summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp3714
1 files changed, 1392 insertions, 2322 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index f8dbd14..48e2cb5 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -15,25 +15,23 @@
#include <time.h>
#endif
-#ifndef WITHOUT_SYSTEMD
-#include <systemd/sd-daemon.h>
-#endif
-
#include <boost/asio/steady_timer.hpp>
#include <vsomeip/constants.hpp>
-#include <vsomeip/message.hpp>
#include <vsomeip/payload.hpp>
#include <vsomeip/runtime.hpp>
+#include <vsomeip/internal/logger.hpp>
#include "../include/event.hpp"
#include "../include/eventgroupinfo.hpp"
+#include "../include/remote_subscription.hpp"
#include "../include/routing_manager_host.hpp"
#include "../include/routing_manager_impl.hpp"
#include "../include/routing_manager_stub.hpp"
#include "../include/serviceinfo.hpp"
#include "../../configuration/include/configuration.hpp"
-#include "../../configuration/include/internal.hpp"
+#include "../../security/include/security.hpp"
+
#include "../../endpoints/include/endpoint_definition.hpp"
#include "../../endpoints/include/local_client_endpoint_impl.hpp"
#include "../../endpoints/include/tcp_client_endpoint_impl.hpp"
@@ -41,32 +39,38 @@
#include "../../endpoints/include/udp_client_endpoint_impl.hpp"
#include "../../endpoints/include/udp_server_endpoint_impl.hpp"
#include "../../endpoints/include/virtual_server_endpoint_impl.hpp"
-#include "../../logging/include/logger.hpp"
#include "../../message/include/deserializer.hpp"
+#include "../../message/include/message_impl.hpp"
#include "../../message/include/serializer.hpp"
#include "../../service_discovery/include/constants.hpp"
#include "../../service_discovery/include/defines.hpp"
#include "../../service_discovery/include/runtime.hpp"
-#include "../../service_discovery/include/service_discovery_impl.hpp"
+#include "../../service_discovery/include/service_discovery.hpp"
#include "../../utility/include/byteorder.hpp"
#include "../../utility/include/utility.hpp"
-#include "../../plugin/include/plugin_manager.hpp"
+#include "../../plugin/include/plugin_manager_impl.hpp"
#ifdef USE_DLT
#include "../../tracing/include/connector_impl.hpp"
#endif
+#ifndef ANDROID
#include "../../e2e_protection/include/buffer/buffer.hpp"
#include "../../e2e_protection/include/e2exf/config.hpp"
-#include "../../e2e_protection/include/e2e/profile/profile01/profile_01.hpp"
-#include "../../e2e_protection/include/e2e/profile/profile01/protector.hpp"
-#include "../../e2e_protection/include/e2e/profile/profile01/checker.hpp"
+#include "../../e2e_protection/include/e2e/profile/e2e_provider.hpp"
+#endif
-#include "../../e2e_protection/include/e2e/profile/profile_custom/profile_custom.hpp"
-#include "../../e2e_protection/include/e2e/profile/profile_custom/protector.hpp"
-#include "../../e2e_protection/include/e2e/profile/profile_custom/checker.hpp"
+#ifdef USE_DLT
+#include "../../tracing/include/connector_impl.hpp"
+#endif
-namespace vsomeip {
+namespace vsomeip_v3 {
+
+#ifdef ANDROID
+namespace sd {
+runtime::~runtime() {}
+}
+#endif
routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
routing_manager_base(_host),
@@ -74,11 +78,9 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
if_state_running_(false),
sd_route_set_(false),
routing_running_(false),
-#ifndef WITHOUT_SYSTEMD
- watchdog_timer_(_host->get_io()),
-#endif
status_log_timer_(_host->get_io()),
memory_log_timer_(_host->get_io()),
+ ep_mgr_impl_(std::make_shared<endpoint_manager_impl>(this, io_, configuration_)),
pending_remote_offer_id_(0),
last_resume_(std::chrono::steady_clock::now().min()),
pending_security_update_id_(0)
@@ -86,6 +88,8 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
}
routing_manager_impl::~routing_manager_impl() {
+ utility::remove_lockfile(configuration_);
+ utility::reset_client_ids();
}
boost::asio::io_service & routing_manager_impl::get_io() {
@@ -100,22 +104,23 @@ std::set<client_t> routing_manager_impl::find_local_clients(service_t _service,
return routing_manager_base::find_local_clients(_service, _instance);
}
-bool routing_manager_impl::is_subscribe_to_any_event_allowed(client_t _client,
+client_t routing_manager_impl::find_local_client(service_t _service, instance_t _instance) {
+ return routing_manager_base::find_local_client(_service, _instance);
+}
+
+bool routing_manager_impl::is_subscribe_to_any_event_allowed(credentials_t _credentials, client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
- return routing_manager_base::is_subscribe_to_any_event_allowed(_client,
+ return routing_manager_base::is_subscribe_to_any_event_allowed(_credentials, _client,
_service, _instance, _eventgroup);
}
void routing_manager_impl::init() {
- routing_manager_base::init();
+ routing_manager_base::init(ep_mgr_impl_);
// TODO: Only instantiate the stub if needed
stub_ = std::make_shared<routing_manager_stub>(this, configuration_);
stub_->init();
- // We need to be able to send messages to ourself (for delivering events)
- (void)create_local(VSOMEIP_ROUTING_CLIENT);
-
if (configuration_->is_sd_enabled()) {
VSOMEIP_INFO<< "Service Discovery enabled. Trying to load module.";
auto its_plugin = plugin_manager::get()->get_plugin(
@@ -130,34 +135,29 @@ void routing_manager_impl::init() {
}
}
+#ifndef ANDROID
if( configuration_->is_e2e_enabled()) {
VSOMEIP_INFO << "E2E protection enabled.";
+
+ const char *its_e2e_module = getenv(VSOMEIP_ENV_E2E_PROTECTION_MODULE);
+ std::string plugin_name = its_e2e_module != nullptr ? its_e2e_module : VSOMEIP_E2E_LIBRARY;
+
+ auto its_plugin = plugin_manager::get()->get_plugin(plugin_type_e::APPLICATION_PLUGIN, plugin_name);
+ if (its_plugin) {
+ VSOMEIP_INFO << "E2E module loaded.";
+ e2e_provider_ = std::dynamic_pointer_cast<e2e::e2e_provider>(its_plugin);
+ }
+ }
+
+ if(e2e_provider_) {
std::map<e2exf::data_identifier_t, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration();
for (auto &identifier : its_e2e_configuration) {
- auto its_cfg = identifier.second;
- if(its_cfg->profile == "CRC8") {
- e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id};
- e2e::profile01::profile_config its_profile_config = e2e::profile01::profile_config(its_cfg->crc_offset, its_cfg->data_id,
- (e2e::profile01::p01_data_id_mode) its_cfg->data_id_mode, its_cfg->data_length, its_cfg->counter_offset, its_cfg->data_id_nibble_offset);
- if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) {
- custom_protectors[its_data_identifier] = std::make_shared<e2e::profile01::protector>(its_profile_config);
- }
- if ((its_cfg->variant == "checker") || (its_cfg->variant == "both")) {
- custom_checkers[its_data_identifier] = std::make_shared<e2e::profile01::profile_01_checker>(its_profile_config);
- }
- } else if(its_cfg->profile == "CRC32") {
- e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id};
- e2e::profile_custom::profile_config its_profile_config = e2e::profile_custom::profile_config(its_cfg->crc_offset);
-
- if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) {
- custom_protectors[its_data_identifier] = std::make_shared<e2e::profile_custom::protector>(its_profile_config);
- }
- if ((its_cfg->variant == "checker") || (its_cfg->variant == "both")) {
- custom_checkers[its_data_identifier] = std::make_shared<e2e::profile_custom::profile_custom_checker>(its_profile_config);
- }
+ if(!e2e_provider_->add_configuration(identifier.second)) {
+ VSOMEIP_INFO << "Unknown E2E profile: " << identifier.second->profile << ", skipping ...";
}
}
}
+#endif
}
void routing_manager_impl::start() {
@@ -186,15 +186,6 @@ void routing_manager_impl::start() {
version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
this, std::placeholders::_1));
}
-
-#ifndef WITHOUT_SYSTEMD
- {
- std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
- watchdog_timer_.expires_from_now(std::chrono::seconds(0));
- watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
- this, std::placeholders::_1));
- }
-#endif
#ifndef _WIN32
if (configuration_->log_memory()) {
std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
@@ -216,6 +207,26 @@ void routing_manager_impl::start() {
}
void routing_manager_impl::stop() {
+ // Ensure to StopOffer all services that are offered by the application hosting the rm
+ local_services_map_t its_services;
+ {
+ std::lock_guard<std::mutex> its_lock(local_services_mutex_);
+ for (const auto& s : local_services_) {
+ for (const auto& i : s.second) {
+ if (std::get<2>(i.second) == client_) {
+ its_services[s.first][i.first] = i.second;
+ }
+ }
+ }
+
+ }
+ for (const auto& s : its_services) {
+ for (const auto& i : s.second) {
+ on_stop_offer_service(std::get<2>(i.second), s.first, i.first,
+ std::get<0>(i.second), std::get<1>(i.second));
+ }
+ }
+
{
std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
version_log_timer_.cancel();
@@ -236,37 +247,119 @@ void routing_manager_impl::stop() {
boost::system::error_code ec;
status_log_timer_.cancel(ec);
}
-#ifndef WITHOUT_SYSTEMD
- {
- std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
- watchdog_timer_.cancel();
- }
- sd_notify(0, "STOPPING=1");
- VSOMEIP_INFO << "Sent STOPPING to systemd watchdog";
-#endif
-
host_->on_state(state_type_e::ST_DEREGISTERED);
if (discovery_)
discovery_->stop();
stub_->stop();
- for (auto client: get_connected_clients()) {
+ for (auto client: ep_mgr_->get_connected_clients()) {
if (client != VSOMEIP_ROUTING_CLIENT) {
remove_local(client, true);
}
}
}
-bool routing_manager_impl::offer_service(client_t _client, service_t _service,
- instance_t _instance, major_version_t _major, minor_version_t _minor) {
+bool routing_manager_impl::insert_offer_command(service_t _service, instance_t _instance, uint8_t _command,
+ client_t _client, major_version_t _major, minor_version_t _minor) {
+ std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
+ // flag to indicate whether caller of this function can start directly processing the command
+ bool must_process(false);
+ auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
+ if (found_service_instance != offer_commands_.end()) {
+ // if nothing is queued
+ if (found_service_instance->second.empty()) {
+ must_process = true;
+ }
+ found_service_instance->second.push_back(
+ std::make_tuple(_command, _client, _major, _minor));
+ } else {
+ // nothing is queued -> add command to queue and process command directly
+ offer_commands_[std::make_pair(_service, _instance)].push_back(
+ std::make_tuple(_command, _client, _major, _minor));
+ must_process = true;
+ }
+ return must_process;
+}
+
+bool routing_manager_impl::erase_offer_command(service_t _service, instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
+ auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
+ if (found_service_instance != offer_commands_.end()) {
+ // erase processed command
+ if (!found_service_instance->second.empty()) {
+ found_service_instance->second.pop_front();
+ if (!found_service_instance->second.empty()) {
+ // check for other commands to be processed
+ auto its_command = found_service_instance->second.front();
+ if (std::get<0>(its_command) == VSOMEIP_OFFER_SERVICE) {
+ io_.post([&, its_command, _service, _instance](){
+ offer_service(std::get<1>(its_command), _service, _instance,
+ std::get<2>(its_command), std::get<3>(its_command), false);
+ });
+ } else {
+ io_.post([&, its_command, _service, _instance](){
+ stop_offer_service(std::get<1>(its_command), _service, _instance,
+ std::get<2>(its_command), std::get<3>(its_command), false);
+ });
+ }
+ }
+ }
+ }
+ return true;
+}
+
+bool routing_manager_impl::offer_service(client_t _client,
+ service_t _service, instance_t _instance,
+ major_version_t _major, minor_version_t _minor) {
+
+ return offer_service(_client, _service, _instance, _major, _minor, true);
+}
+
+bool routing_manager_impl::offer_service(client_t _client,
+ service_t _service, instance_t _instance,
+ major_version_t _major, minor_version_t _minor,
+ bool _must_queue) {
+
VSOMEIP_INFO << "OFFER("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance
- << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]";
+ << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]"
+ << " (" << _must_queue << ")";
+
+ // only queue commands if method was NOT called via erase_offer_command()
+ if (_must_queue) {
+ if (!insert_offer_command(_service, _instance, VSOMEIP_OFFER_SERVICE,
+ _client, _major, _minor)) {
+ return false;
+ }
+ }
+
+ // Check if the application hosted by routing manager is allowed to offer
+ // offer_service requests of local proxies are checked in rms::on:message
+ if (_client == get_client()) {
+#ifdef _WIN32
+ std::uint32_t its_routing_uid = ANY_UID;
+ std::uint32_t its_routing_gid = ANY_GID;
+#else
+ std::uint32_t its_routing_uid = getuid();
+ std::uint32_t its_routing_gid = getgid();
+#endif
+ if (!security::get()->is_offer_allowed(its_routing_uid, its_routing_gid,
+ _client, _service, _instance)) {
+ VSOMEIP_WARNING << "routing_manager_impl::offer_service: "
+ << std::hex << "Security: Client 0x" << _client
+ << " isn't allowed to offer the following service/instance "
+ << _service << "/" << _instance
+ << " ~> Skip offer!";
+ erase_offer_command(_service, _instance);
+ return false;
+ }
+ }
- if(!handle_local_offer_service(_client, _service, _instance, _major, _minor)) {
+ if (!handle_local_offer_service(_client, _service, _instance, _major, _minor)) {
+ erase_offer_command(_service, _instance);
return false;
}
@@ -282,7 +375,7 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service,
if (discovery_) {
std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
if (its_info) {
- discovery_->offer_service(_service, _instance, its_info);
+ discovery_->offer_service(its_info);
}
}
@@ -290,16 +383,23 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service,
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
std::set<event_t> its_already_subscribed_events;
for (auto &ps : pending_subscriptions_) {
- if (ps.service_ == _service &&
- ps.instance_ == _instance && ps.major_ == _major) {
+ if (ps.service_ == _service
+ && ps.instance_ == _instance
+ && ps.major_ == _major) {
insert_subscription(ps.service_, ps.instance_,
ps.eventgroup_, ps.event_, client_, &its_already_subscribed_events);
- }
+ VSOMEIP_ERROR << __func__
+ << ": event="
+ << std::hex << ps.service_ << "."
+ << std::hex << ps.instance_ << "."
+ << std::hex << ps.event_; }
}
+
send_pending_subscriptions(_service, _instance, _major);
}
stub_->on_offer_service(_client, _service, _instance, _major, _minor);
on_availability(_service, _instance, true, _major, _minor);
+ erase_offer_command(_service, _instance);
return true;
}
@@ -307,11 +407,28 @@ void routing_manager_impl::stop_offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
+ stop_offer_service(_client, _service, _instance, _major, _minor, true);
+}
+
+void routing_manager_impl::stop_offer_service(client_t _client,
+ service_t _service, instance_t _instance,
+ major_version_t _major, minor_version_t _minor,
+ bool _must_queue) {
+
VSOMEIP_INFO << "STOP OFFER("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance
- << ":" << std::dec << int(_major) << "." << _minor << "]";
+ << ":" << std::dec << int(_major) << "." << _minor << "]"
+ << " (" << _must_queue << ")";
+
+ if (_must_queue) {
+ if (!insert_offer_command(_service, _instance, VSOMEIP_STOP_OFFER_SERVICE,
+ _client, _major, _minor)) {
+ return;
+ }
+ }
+
bool is_local(false);
{
std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
@@ -340,12 +457,12 @@ void routing_manager_impl::stop_offer_service(client_t _client,
<< std::hex << std::setw(4) << std::setfill('0') << _instance
<< ":" << std::dec << int(_major) << "." << _minor << "] "
<< "for remote service --> ignore";
+ erase_offer_command(_service, _instance);
}
}
void routing_manager_impl::request_service(client_t _client, service_t _service,
- instance_t _instance, major_version_t _major, minor_version_t _minor,
- bool _use_exclusive_proxy) {
+ instance_t _instance, major_version_t _major, minor_version_t _minor) {
VSOMEIP_INFO << "REQUEST("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
@@ -353,8 +470,8 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
<< std::hex << std::setw(4) << std::setfill('0') << _instance << ":"
<< std::dec << int(_major) << "." << std::dec << _minor << "]";
- routing_manager_base::request_service(_client, _service, _instance, _major,
- _minor, _use_exclusive_proxy);
+ routing_manager_base::request_service(_client,
+ _service, _instance, _major, _minor);
auto its_info = find_service(_service, _instance);
if (!its_info) {
@@ -381,27 +498,22 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
|| _minor == ANY_MINOR)) {
if(!its_info->is_local()) {
requested_service_add(_client, _service, _instance, _major, _minor);
- its_info->add_client(_client);
- find_or_create_remote_client(_service, _instance, true, VSOMEIP_ROUTING_CLIENT);
- if (_use_exclusive_proxy) {
- std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(true);
- if(its_endpoint) {
- find_or_create_remote_client(_service, _instance, true, _client);
- }
+ if (discovery_) {
+ // Non local service instance ~> tell SD to find it!
+ discovery_->request_service(_service, _instance, _major,
+ _minor, DEFAULT_TTL);
}
+ its_info->add_client(_client);
+ ep_mgr_impl_->find_or_create_remote_client(
+ _service, _instance, true, VSOMEIP_ROUTING_CLIENT);
}
}
}
- if (_use_exclusive_proxy) {
- std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
- specific_endpoint_clients_[_service][_instance].insert(_client);
- }
-
if (_client == get_client()) {
stub_->create_local_receiver();
- service_data_t request = { _service, _instance, _major, _minor, _use_exclusive_proxy };
+ service_data_t request = { _service, _instance, _major, _minor };
std::set<service_data_t> requests;
requests.insert(request);
stub_->handle_requests(_client, requests);
@@ -424,36 +536,28 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
requested_service_remove(_client, _service, _instance);
std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
- if(its_info && !its_info->is_local()) {
- unsubscribe_specific_client_at_sd(_service, _instance, _client);
- if(!its_info->get_requesters_size()) {
- if(discovery_) {
+ if (its_info && !its_info->is_local()) {
+ if (!its_info->get_requesters_size()) {
+ if (discovery_) {
discovery_->release_service(_service, _instance);
- discovery_->unsubscribe_client(_service, _instance, VSOMEIP_ROUTING_CLIENT);
+ discovery_->unsubscribe_all(_service, _instance);
}
- clear_client_endpoints(_service, _instance, true);
- clear_client_endpoints(_service, _instance, false);
+ ep_mgr_impl_->clear_client_endpoints(_service, _instance, true);
+ ep_mgr_impl_->clear_client_endpoints(_service, _instance, false);
its_info->set_endpoint(nullptr, true);
its_info->set_endpoint(nullptr, false);
- clear_identified_clients(_service, _instance);
- clear_identifying_clients( _service, _instance);
unset_all_eventpayloads(_service, _instance);
- } else {
- remove_identified_client(_service, _instance, _client);
- remove_identifying_client(_service, _instance, _client);
- remove_specific_client_endpoint(_service, _instance, _client, true);
- remove_specific_client_endpoint(_service, _instance, _client, false);
}
} else {
- if(discovery_) {
+ if (discovery_) {
discovery_->release_service(_service, _instance);
}
}
}
-void routing_manager_impl::subscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
- event_t _event, subscription_type_e _subscription_type) {
+void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ major_version_t _major, event_t _event) {
VSOMEIP_INFO << "SUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
@@ -464,14 +568,16 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
<< std::dec << (uint16_t)_major << "]";
const client_t its_local_client = find_local_client(_service, _instance);
if (get_client() == its_local_client) {
+#ifdef VSOMEIP_ENABLE_COMPAT
routing_manager_base::set_incoming_subscription_state(_client, _service, _instance,
_eventgroup, _event, subscription_state_e::IS_SUBSCRIBING);
+#endif
auto self = shared_from_this();
- host_->on_subscription(_service, _instance, _eventgroup, _client, true,
- [this, self, _client, _service, _instance, _eventgroup,
- _event, _major, _subscription_type]
+ host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, true,
+ [this, self, _client, _uid, _gid, _service, _instance, _eventgroup,
+ _event, _major]
(const bool _subscription_accepted) {
- (void) find_or_create_local(_client);
+ (void) ep_mgr_->find_or_create_local(_client);
if (!_subscription_accepted) {
stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event);
VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex
@@ -481,58 +587,52 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
} else {
stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
}
- routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type);
+ routing_manager_base::subscribe(_client, _uid, _gid, _service, _instance, _eventgroup, _major, _event);
+#ifdef VSOMEIP_ENABLE_COMPAT
send_pending_notify_ones(_service, _instance, _eventgroup, _client);
routing_manager_base::erase_incoming_subscription_state(_client, _service, _instance,
_eventgroup, _event);
+#endif
});
} else {
if (discovery_) {
- client_t subscriber = VSOMEIP_ROUTING_CLIENT;
- if (0 == its_local_client) {
- subscriber = is_specific_endpoint_client(_client, _service, _instance);
- if (subscriber != VSOMEIP_ROUTING_CLIENT) {
- if (supports_selective(_service, _instance)) {
- identify_for_subscribe(_client, _service, _instance,
- _major, _subscription_type);
- } else {
- VSOMEIP_INFO << "Subcribe to legacy selective service: " << std::hex
- << _service << ":" << _instance << ".";
- }
- }
- }
- std::unique_lock<std::mutex> eventgroup_lock;
- auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
- if (its_eventgroup) {
- eventgroup_lock = its_eventgroup->get_subscription_lock();
- }
std::set<event_t> its_already_subscribed_events;
+
+ // Note: The calls to insert_subscription & handle_subscription_state must not
+ // run concurrently to a call to on_subscribe_ack. Therefore the lock is acquired
+ // before calling insert_subscription and released after the call to
+ // handle_subscription_state.
+ std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_);
bool inserted = insert_subscription(_service, _instance, _eventgroup,
_event, _client, &its_already_subscribed_events);
if (inserted) {
if (0 == its_local_client) {
handle_subscription_state(_client, _service, _instance, _eventgroup, _event);
- if (its_eventgroup) {
- eventgroup_lock.unlock();
- }
+ its_critical.unlock();
static const ttl_t configured_ttl(configuration_->get_sd_ttl());
notify_one_current_value(_client, _service, _instance,
_eventgroup, _event, its_already_subscribed_events);
- discovery_->subscribe(_service, _instance, _eventgroup,
- _major, configured_ttl, subscriber, _subscription_type);
+
+ auto its_info = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_info) {
+ discovery_->subscribe(_service, _instance, _eventgroup,
+ _major, configured_ttl,
+ its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT);
+ }
} else {
+ its_critical.unlock();
if (is_available(_service, _instance, _major)) {
- stub_->send_subscribe(find_local(_service, _instance),
- _client, _service, _instance, _eventgroup, _major, _event, DEFAULT_SUBSCRIPTION);
+ stub_->send_subscribe(ep_mgr_->find_local(_service, _instance),
+ _client, _service, _instance, _eventgroup, _major, _event,
+ PENDING_SUBSCRIPTION_ID);
}
}
- } else if (its_eventgroup) {
- eventgroup_lock.unlock();
}
if (get_client() == _client) {
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
- subscription_data_t subscription = { _service, _instance, _eventgroup, _major,
- _event, _subscription_type};
+ subscription_data_t subscription = {
+ _service, _instance, _eventgroup, _major, _event, _uid, _gid
+ };
pending_subscriptions_.insert(subscription);
}
} else {
@@ -541,8 +641,8 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
}
}
-void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
+void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid,
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
VSOMEIP_INFO << "UNSUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
@@ -552,14 +652,16 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
<< std::hex << std::setw(4) << std::setfill('0') << _event << "]";
bool last_subscriber_removed(true);
+
+ auto its_event = find_event(_service, _instance, _event);
std::shared_ptr<eventgroupinfo> its_info
= find_eventgroup(_service, _instance, _eventgroup);
if (its_info) {
- for (auto e : its_info->get_events()) {
+ for (const auto& e : its_info->get_events()) {
if (e->get_event() == _event || ANY_EVENT == _event)
e->remove_subscriber(_eventgroup, _client);
}
- for (auto e : its_info->get_events()) {
+ for (const auto& e : its_info->get_events()) {
if (e->has_subscriber(_eventgroup, ANY_CLIENT)) {
last_subscriber_removed = false;
break;
@@ -568,40 +670,36 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
}
if (discovery_) {
- host_->on_subscription(_service, _instance, _eventgroup, _client, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; });
+ host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false,
+ [](const bool _subscription_accepted){ (void)_subscription_accepted; });
if (0 == find_local_client(_service, _instance)) {
- client_t subscriber = is_specific_endpoint_client(_client, _service, _instance);
if (last_subscriber_removed) {
unset_all_eventpayloads(_service, _instance, _eventgroup);
- }
- if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) {
{
- auto tuple = std::make_tuple(_service, _instance, _eventgroup, subscriber);
+ auto tuple = std::make_tuple(_service, _instance, _eventgroup, _client);
std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
remote_subscription_state_.erase(tuple);
}
- // for normal subscribers only unsubscribe via SD if last
- // subscriber was removed
- discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber);
- } else if (subscriber != VSOMEIP_ROUTING_CLIENT) {
- {
- auto tuple = std::make_tuple(_service, _instance, _eventgroup, subscriber);
- std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
- remote_subscription_state_.erase(tuple);
+ }
+
+ if (last_subscriber_removed
+ || (its_event && its_event->get_type() == event_type_e::ET_SELECTIVE_EVENT)) {
+ if (its_info) {
+ discovery_->unsubscribe(_service, _instance, _eventgroup,
+ its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT);
}
- // for selective subscribers always unsubscribe at the SD
- discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber);
}
} else {
if (get_client() == _client) {
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
remove_pending_subscription(_service, _instance, _eventgroup, _event);
- stub_->send_unsubscribe(find_local(_service, _instance),
+ stub_->send_unsubscribe(
+ ep_mgr_->find_local(_service, _instance),
_client, _service, _instance, _eventgroup, _event,
- DEFAULT_SUBSCRIPTION);
+ PENDING_SUBSCRIPTION_ID);
}
}
- clear_multicast_endpoints(_service, _instance);
+ ep_mgr_impl_->clear_multicast_endpoints(_service, _instance);
} else {
VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
@@ -609,18 +707,21 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
}
bool routing_manager_impl::send(client_t _client,
- std::shared_ptr<message> _message, bool _flush) {
- return routing_manager_base::send(_client, _message, _flush);
+ std::shared_ptr<message> _message) {
+ return routing_manager_base::send(_client, _message);
}
bool routing_manager_impl::send(client_t _client, const byte_t *_data,
- length_t _size, instance_t _instance, bool _flush, bool _reliable,
- client_t _bound_client, bool _is_valid_crc, bool _sent_from_remote) {
+ length_t _size, instance_t _instance, bool _reliable,
+ client_t _bound_client,
+ credentials_t _credentials,
+ uint8_t _status_check, bool _sent_from_remote) {
bool is_sent(false);
if (_size > VSOMEIP_MESSAGE_TYPE_POS) {
std::shared_ptr<endpoint> its_target;
bool is_request = utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS]);
bool is_notification = utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS]);
+ bool is_response = utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS]);
client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
_data[VSOMEIP_CLIENT_POS_MAX]);
@@ -629,21 +730,14 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
method_t its_method = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
- bool is_service_discovery = (its_service == vsomeip::sd::service
- && its_method == vsomeip::sd::method);
-
-#ifdef USE_DLT
- bool is_response(false);
-#endif
+ bool is_service_discovery
+ = (its_service == sd::service && its_method == sd::method);
if (is_request) {
- its_target = find_local(its_service, _instance);
+ its_target = ep_mgr_->find_local(its_service, _instance);
} else if (!is_notification) {
-#ifdef USE_DLT
- is_response = true;
-#endif
its_target = find_local(its_client);
- } else if (is_notification && _client) { // Selective notifications!
+ } else if (is_notification && _client && !is_service_discovery) { // Selective notifications!
if (_client == get_client()) {
#ifdef USE_DLT
const uint16_t its_data_size
@@ -654,7 +748,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
#endif
- deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote);
+ deliver_message(_data, _size, _instance, _reliable, _bound_client, _credentials, _status_check, _sent_from_remote);
return true;
}
its_target = find_local(_client);
@@ -673,36 +767,39 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
_data, its_data_size);
}
#endif
- is_sent = send_local(its_target, get_client(), _data, _size, _instance, _flush, _reliable, VSOMEIP_SEND, _is_valid_crc);
+ is_sent = send_local(its_target, get_client(), _data, _size, _instance, _reliable, VSOMEIP_SEND, _status_check);
} else {
// Check whether hosting application should get the message
// If not, check routes to external
- if ((its_client == host_->get_client() && !is_request)
+ if ((its_client == host_->get_client() && is_response)
|| (find_local_client(its_service, _instance)
== host_->get_client() && is_request)) {
- // TODO: find out how to handle session id here
- is_sent = deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote);
+ // TODO: Find out how to handle session id here
+ is_sent = deliver_message(_data, _size, _instance, _reliable, VSOMEIP_ROUTING_CLIENT, _credentials, _status_check);
} else {
e2e_buffer outputBuffer;
- if( configuration_->is_e2e_enabled()) {
+ if (e2e_provider_) {
if ( !is_service_discovery) {
service_t its_service = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
method_t its_method = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
- if( custom_protectors.count({its_service, its_method})) {
+#ifndef ANDROID
+ if (e2e_provider_->is_protected({its_service, its_method})) {
outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS);
e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size);
- custom_protectors[{its_service, its_method}]->protect( inputBuffer);
+ e2e_provider_->protect({its_service, its_method}, inputBuffer);
outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS);
std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS);
_data = outputBuffer.data();
}
+#endif
}
}
if (is_request) {
- client_t client = is_specific_endpoint_client(its_client, its_service, _instance);
- its_target = find_or_create_remote_client(its_service, _instance, _reliable, client);
+ client_t client = VSOMEIP_ROUTING_CLIENT;
+ its_target = ep_mgr_impl_->find_or_create_remote_client(
+ its_service, _instance, _reliable, client);
if (its_target) {
#ifdef USE_DLT
const uint16_t its_data_size
@@ -713,7 +810,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
#endif
- is_sent = its_target->send(_data, _size, _flush);
+ is_sent = its_target->send(_data, _size);
} else {
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN],
@@ -729,7 +826,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance));
if (its_info || is_service_discovery) {
if (is_notification && !is_service_discovery) {
- send_local_notification(get_client(), _data, _size, _instance, _flush, _reliable, _is_valid_crc);
+ send_local_notification(get_client(), _data, _size, _instance, _reliable, _status_check);
method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
std::shared_ptr<event> its_event = find_event(its_service, _instance, its_method);
@@ -741,42 +838,49 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
// we need both endpoints as clients can subscribe to events via TCP and UDP
std::shared_ptr<endpoint> its_udp_server_endpoint = its_info->get_endpoint(false);
std::shared_ptr<endpoint> its_tcp_server_endpoint = its_info->get_endpoint(true);
- bool is_offered_both(false);
- if (its_udp_server_endpoint && its_tcp_server_endpoint) {
- is_offered_both = true;
- }
+
if (its_udp_server_endpoint || its_tcp_server_endpoint) {
+ const auto its_reliability = its_event->get_reliability();
for (auto its_group : its_event->get_eventgroups()) {
auto its_eventgroup = find_eventgroup(its_service, _instance, its_group);
if (its_eventgroup) {
// Unicast targets
- for (const auto &its_remote : its_eventgroup->get_targets()) {
- if(its_remote.endpoint_->is_reliable() && its_tcp_server_endpoint) {
- if (!is_offered_both || (is_offered_both && its_event->is_reliable())) {
- its_targets.insert(its_remote.endpoint_);
+ for (const auto &its_remote : its_eventgroup->get_unicast_targets()) {
+ if (its_remote->is_reliable() && its_tcp_server_endpoint) {
+ if (its_reliability == reliability_type_e::RT_RELIABLE
+ || its_reliability == reliability_type_e::RT_BOTH) {
+ its_targets.insert(its_remote);
}
} else if (its_udp_server_endpoint && !its_eventgroup->is_sending_multicast()) {
- if (!is_offered_both || (is_offered_both && !its_event->is_reliable())) {
- its_targets.insert(its_remote.endpoint_);
+ if (its_reliability == reliability_type_e::RT_UNRELIABLE
+ || its_reliability == reliability_type_e::RT_BOTH) {
+ its_targets.insert(its_remote);
}
}
}
// Send to multicast targets if subscribers are still interested
if (its_eventgroup->is_sending_multicast()) {
- for (auto its_multicast_target : its_eventgroup->get_multicast_targets()) {
- if (!is_offered_both || (is_offered_both && !its_event->is_reliable())) {
- its_targets.insert(its_multicast_target.endpoint_);
+ if (its_reliability == reliability_type_e::RT_UNRELIABLE
+ || its_reliability == reliability_type_e::RT_BOTH) {
+ boost::asio::ip::address its_address;
+ uint16_t its_port;
+ if (its_eventgroup->get_multicast(its_address, its_port)) {
+ std::shared_ptr<endpoint_definition> its_multicast_target;
+ its_multicast_target = endpoint_definition::get(its_address,
+ its_port, false, its_service, _instance);
+ its_targets.insert(its_multicast_target);
}
}
}
}
}
}
+
for (auto const &target : its_targets) {
if (target->is_reliable()) {
- its_tcp_server_endpoint->send_to(target, _data, _size, _flush);
+ its_tcp_server_endpoint->send_to(target, _data, _size);
} else {
- its_udp_server_endpoint->send_to(target, _data, _size, _flush);
+ its_udp_server_endpoint->send_to(target, _data, _size);
}
#ifdef USE_DLT
has_sent = true;
@@ -798,7 +902,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
if ((utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS])
|| utility::is_error(_data[VSOMEIP_MESSAGE_TYPE_POS]))
&& !its_info->is_local()) {
- // we received a response/error but neither the hosting application
+ // We received a response/error but neither the hosting application
// nor another local client could be found --> drop
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN],
@@ -824,7 +928,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
#endif
- is_sent = its_target->send(_data, _size, _flush);
+ is_sent = its_target->send(_data, _size);
} else {
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN],
@@ -860,30 +964,43 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
}
bool routing_manager_impl::send_to(
+ const client_t _client,
const std::shared_ptr<endpoint_definition> &_target,
- std::shared_ptr<message> _message, bool _flush) {
+ std::shared_ptr<message> _message) {
bool is_sent(false);
- std::lock_guard<std::mutex> its_lock(serialize_mutex_);
- if (serializer_->serialize(_message.get())) {
- const byte_t *_data = serializer_->get_data();
- length_t _size = serializer_->get_size();
- e2e_buffer outputBuffer;
- if( configuration_->is_e2e_enabled()) {
+
+ std::shared_ptr<serializer> its_serializer(get_serializer());
+ if (its_serializer->serialize(_message.get())) {
+ const byte_t *its_data = its_serializer->get_data();
+ length_t its_size = its_serializer->get_size();
+ e2e_buffer its_output_buffer;
+ if (e2e_provider_) {
service_t its_service = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
+ its_data[VSOMEIP_SERVICE_POS_MIN],
+ its_data[VSOMEIP_SERVICE_POS_MAX]);
method_t its_method = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
- if( custom_protectors.count({its_service, its_method})) {
- outputBuffer.assign(_data, _data + VSOMEIP_PAYLOAD_POS);
- e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data +_size);
- custom_protectors[{its_service, its_method}]->protect( inputBuffer);
- outputBuffer.resize(inputBuffer.size() + VSOMEIP_PAYLOAD_POS);
- std::copy(inputBuffer.begin(), inputBuffer.end(), outputBuffer.begin() + VSOMEIP_PAYLOAD_POS);
- _data = outputBuffer.data();
+ its_data[VSOMEIP_METHOD_POS_MIN],
+ its_data[VSOMEIP_METHOD_POS_MAX]);
+#ifndef ANDROID
+ if(e2e_provider_->is_protected({its_service, its_method})) {
+ its_output_buffer.assign(its_data, its_data + VSOMEIP_PAYLOAD_POS);
+ e2e_buffer its_input_buffer(its_data + VSOMEIP_PAYLOAD_POS, its_data + its_size);
+ e2e_provider_->protect({its_service, its_method}, its_input_buffer);
+ its_output_buffer.resize(its_input_buffer.size() + VSOMEIP_PAYLOAD_POS);
+ std::copy(its_input_buffer.begin(), its_input_buffer.end(),
+ its_output_buffer.begin() + VSOMEIP_PAYLOAD_POS);
+ its_data = its_output_buffer.data();
}
+#endif
}
- is_sent = send_to(_target, _data, _size, _message->get_instance(), _flush);
- serializer_->reset();
+
+ const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MIN] = VSOMEIP_WORD_BYTE1(_client);
+ const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MAX] = VSOMEIP_WORD_BYTE0(_client);
+
+ is_sent = send_to(_target, its_data, its_size, _message->get_instance());
+
+ its_serializer->reset();
+ put_serializer(its_serializer);
} else {
VSOMEIP_ERROR<< "routing_manager_impl::send_to: serialization failed.";
}
@@ -892,10 +1009,10 @@ bool routing_manager_impl::send_to(
bool routing_manager_impl::send_to(
const std::shared_ptr<endpoint_definition> &_target,
- const byte_t *_data, uint32_t _size, instance_t _instance,
- bool _flush) {
- std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(
- _target->get_remote_port(), _target->is_reliable());
+ const byte_t *_data, uint32_t _size, instance_t _instance) {
+ std::shared_ptr<endpoint> its_endpoint =
+ ep_mgr_impl_->find_server_endpoint(
+ _target->get_remote_port(), _target->is_reliable());
if (its_endpoint) {
#ifdef USE_DLT
@@ -909,26 +1026,30 @@ bool routing_manager_impl::send_to(
#else
(void) _instance;
#endif
- return its_endpoint->send_to(_target, _data, _size, _flush);
+ return its_endpoint->send_to(_target, _data, _size);
}
return false;
}
-bool routing_manager_impl::send_to(
+bool routing_manager_impl::send_via_sd(
const std::shared_ptr<endpoint_definition> &_target,
const byte_t *_data, uint32_t _size, uint16_t _sd_port) {
- std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(
- _sd_port, _target->is_reliable());
+ std::shared_ptr<endpoint> its_endpoint =
+ ep_mgr_impl_->find_server_endpoint(_sd_port,
+ _target->is_reliable());
if (its_endpoint) {
#ifdef USE_DLT
- const uint16_t its_data_size
- = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
+ if (tc_->is_sd_enabled()) {
+ const uint16_t its_data_size
+ = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- trace::header its_header;
- if (its_header.prepare(its_endpoint, true, 0x0))
- tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
- _data, its_data_size);
+ trace::header its_header;
+ if (its_header.prepare(its_endpoint, true, 0x0))
+ tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
+ _data, its_data_size);
+
+ }
#endif
return its_endpoint->send_to(_target, _data, _size);
}
@@ -936,13 +1057,16 @@ bool routing_manager_impl::send_to(
return false;
}
-void routing_manager_impl::register_event(
- client_t _client, service_t _service, instance_t _instance,
- event_t _event, const std::set<eventgroup_t> &_eventgroups,
- bool _is_field, std::chrono::milliseconds _cycle,
- bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func,
+void routing_manager_impl::register_event(client_t _client,
+ service_t _service, instance_t _instance,
+ event_t _notifier,
+ const std::set<eventgroup_t> &_eventgroups, const event_type_e _type,
+ reliability_type_e _reliability,
+ std::chrono::milliseconds _cycle, bool _change_resets_cycle,
+ bool _update_on_change,
+ epsilon_change_func_t _epsilon_change_func,
bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
- auto its_event = find_event(_service, _instance, _event);
+ auto its_event = find_event(_service, _instance, _notifier);
bool is_first(false);
if (its_event) {
if (!its_event->has_ref(_client, _is_provided)) {
@@ -952,8 +1076,11 @@ void routing_manager_impl::register_event(
is_first = true;
}
if (is_first) {
- routing_manager_base::register_event(_client, _service, _instance,
- _event, _eventgroups, _is_field, _cycle, _change_resets_cycle,
+ routing_manager_base::register_event(_client,
+ _service, _instance,
+ _notifier,
+ _eventgroups, _type, _reliability,
+ _cycle, _change_resets_cycle, _update_on_change,
_epsilon_change_func, _is_provided, _is_shadow,
_is_cache_placeholder);
}
@@ -961,17 +1088,20 @@ void routing_manager_impl::register_event(
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _event
+ << std::hex << std::setw(4) << std::setfill('0') << _notifier
<< ":is_provider=" << _is_provided << "]";
}
void routing_manager_impl::register_shadow_event(client_t _client,
service_t _service, instance_t _instance,
- event_t _event, const std::set<eventgroup_t> &_eventgroups,
- bool _is_field, bool _is_provided) {
- routing_manager_base::register_event(_client, _service, _instance,
- _event, _eventgroups, _is_field,
- std::chrono::milliseconds::zero(), false,
+ event_t _notifier,
+ const std::set<eventgroup_t> &_eventgroups, event_type_e _type,
+ reliability_type_e _reliability, bool _is_provided) {
+ routing_manager_base::register_event(_client,
+ _service, _instance,
+ _notifier,
+ _eventgroups, _type, _reliability,
+ std::chrono::milliseconds::zero(), false, true,
nullptr,
_is_provided, true);
}
@@ -985,56 +1115,49 @@ void routing_manager_impl::unregister_shadow_event(client_t _client,
void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload, client_t _client,
- bool _force, bool _flush, bool _remote_subscriber) {
+ bool _force
+#ifdef VSOMEIP_ENABLE_COMPAT
+ , bool _remote_subscriber
+#endif
+ ) {
if (find_local(_client)) {
routing_manager_base::notify_one(_service, _instance, _event, _payload,
- _client, _force, _flush, _remote_subscriber);
+ _client, _force
+#ifdef VSOMEIP_ENABLE_COMPAT
+ , _remote_subscriber
+#endif
+ );
} else {
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
- // Event is valid for service/instance
- bool found_eventgroup(false);
- // Iterate over all groups of the event to ensure at least
- // one valid eventgroup for service/instance exists.
- for (auto its_group : its_event->get_eventgroups()) {
- auto its_eventgroup = find_eventgroup(_service, _instance, its_group);
+ std::set<std::shared_ptr<endpoint_definition> > its_targets;
+ const auto its_reliability = its_event->get_reliability();
+ for (const auto g : its_event->get_eventgroups()) {
+ const auto its_eventgroup = find_eventgroup(_service, _instance, g);
if (its_eventgroup) {
- // Eventgroup is valid for service/instance
- found_eventgroup = true;
- break;
- }
- }
- if (found_eventgroup) {
- const bool is_offered_both =
- (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT &&
- configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT);
-
- std::set<std::shared_ptr<endpoint_definition>> its_targets;
- // Now set event's payload!
- // Either with endpoint_definition (remote) or with client (local).
- {
- std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
- auto its_service = remote_subscribers_.find(_service);
- if (its_service != remote_subscribers_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto its_subscriber = its_instance->second.find(_client);
- if (its_subscriber != its_instance->second.end()) {
- its_targets = its_subscriber->second;
+ const auto its_subscriptions = its_eventgroup->get_remote_subscriptions();
+ for (const auto &s : its_subscriptions) {
+ if (s->has_client(_client)) {
+ if (its_reliability == reliability_type_e::RT_RELIABLE
+ || its_reliability == reliability_type_e::RT_BOTH) {
+ const auto its_reliable = s->get_reliable();
+ if (its_reliable)
+ its_targets.insert(its_reliable);
+ }
+ if (its_reliability == reliability_type_e::RT_UNRELIABLE
+ || its_reliability == reliability_type_e::RT_BOTH) {
+ const auto its_unreliable = s->get_unreliable();
+ if (its_unreliable)
+ its_targets.insert(its_unreliable);
}
}
}
}
+ }
+
+ if (its_targets.size() > 0) {
for (const auto &its_target : its_targets) {
- if (!is_offered_both) {
- its_event->set_payload(_payload, its_target, _force, _flush);
- } else {
- if (its_event->is_reliable() && its_target->is_reliable()) {
- its_event->set_payload(_payload, its_target, _force, _flush);
- } else if (!its_event->is_reliable() && !its_target->is_reliable()) {
- its_event->set_payload(_payload, its_target, _force, _flush);
- }
- }
+ its_event->set_payload(_payload, _client, its_target, _force);
}
}
} else {
@@ -1050,24 +1173,6 @@ void routing_manager_impl::on_availability(service_t _service, instance_t _insta
host_->on_availability(_service, _instance, _is_available, _major, _minor);
}
-void routing_manager_impl::on_error(
- const byte_t *_data, length_t _length, endpoint *_receiver,
- const boost::asio::ip::address &_remote_address,
- std::uint16_t _remote_port) {
- instance_t its_instance = 0;
- if (_length >= VSOMEIP_SERVICE_POS_MAX) {
- service_t its_service = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
- its_instance = find_instance(its_service, _receiver);
- }
- send_error(return_code_e::E_MALFORMED_MESSAGE, _data, _length, its_instance,
- _receiver->is_reliable(), _receiver, _remote_address, _remote_port);
-}
-
-void routing_manager_impl::release_port(uint16_t _port, bool _reliable) {
- std::lock_guard<std::mutex> its_lock(used_client_ports_mutex_);
- used_client_ports_[_reliable].erase(_port);
-}
bool routing_manager_impl::offer_service_remotely(service_t _service,
instance_t _instance,
@@ -1156,7 +1261,7 @@ bool routing_manager_impl::stop_offer_service_remotely(service_t _service,
clear_remote_subscriber(_service, _instance);
if (discovery_ && its_info) {
- discovery_->stop_offer_service(_service, _instance, its_info);
+ discovery_->stop_offer_service(its_info);
its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
}
} else {
@@ -1168,7 +1273,7 @@ bool routing_manager_impl::stop_offer_service_remotely(service_t _service,
// ensure to not send StopOffer for endpoint on which the service is
// still offered
its_copied_info->set_endpoint(std::shared_ptr<endpoint>(), !_reliable);
- discovery_->stop_offer_service(_service, _instance, its_copied_info);
+ discovery_->stop_offer_service(its_copied_info);
}
}
@@ -1178,7 +1283,7 @@ bool routing_manager_impl::stop_offer_service_remotely(service_t _service,
void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
endpoint *_receiver, const boost::asio::ip::address &_destination,
- client_t _bound_client,
+ client_t _bound_client, credentials_t _credentials,
const boost::asio::ip::address &_remote_address,
std::uint16_t _remote_port) {
#if 0
@@ -1191,7 +1296,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
(void)_bound_client;
service_t its_service;
method_t its_method;
- bool its_is_crc_valid(true);
+ uint8_t its_check_status = e2e::profile_interface::generic_check_status::E2E_OK;
instance_t its_instance(0x0);
#ifdef USE_DLT
bool is_forwarded(true);
@@ -1215,7 +1320,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
}
}
} else {
- its_instance = find_instance(its_service, _receiver);
+ its_instance = ep_mgr_impl_->find_instance(its_service, _receiver);
if (its_instance == 0xFFFF) {
its_method = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_METHOD_POS_MIN],
@@ -1256,7 +1361,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
}
// Security checks if enabled!
- if (configuration_->is_security_enabled()) {
+ if (security::get()->is_enabled()) {
if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
client_t requester = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_CLIENT_POS_MIN],
@@ -1276,7 +1381,7 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
<< " which is already used locally ~> Skip message!";
return;
}
- if (!configuration_->is_remote_client_allowed()) {
+ if (!security::get()->is_remote_client_allowed()) {
// check if policy allows remote requests.
VSOMEIP_WARNING << "routing_manager_impl::on_message: "
<< std::hex << "Security: Remote client with client ID 0x" << requester
@@ -1287,37 +1392,28 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
}
}
}
- if( configuration_->is_e2e_enabled()) {
+ if (e2e_provider_) {
its_method = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
- if( custom_checkers.count({its_service, its_method})) {
+#ifndef ANDROID
+ if( e2e_provider_->is_checked({its_service, its_method})) {
e2e_buffer inputBuffer(_data + VSOMEIP_PAYLOAD_POS, _data + _size);
- e2e::profile_interface::generic_check_status check_status;
- custom_checkers[{its_service, its_method}]->check( inputBuffer, check_status);
+ e2e_provider_->check({its_service, its_method}, inputBuffer, its_check_status);
- if ( check_status != e2e::profile_interface::generic_check_status::E2E_OK ) {
+ if ( its_check_status != e2e::profile_interface::generic_check_status::E2E_OK ) {
VSOMEIP_INFO << std::hex << "E2E protection: CRC check failed for service: " << its_service << " method: " << its_method;
- its_is_crc_valid = false;
}
}
+#endif
}
- if (!deliver_specific_endpoint_message(
- its_service, its_instance, _data, _size, _receiver)) {
- // set client ID to zero for all messages
- if( utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
- byte_t *its_data = const_cast<byte_t *>(_data);
- its_data[VSOMEIP_CLIENT_POS_MIN] = 0x0;
- its_data[VSOMEIP_CLIENT_POS_MAX] = 0x0;
- }
- // Common way of message handling
+
+ // Common way of message handling
#ifdef USE_DLT
- is_forwarded =
+ is_forwarded =
#endif
- on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(),
- _bound_client, its_is_crc_valid, true);
-
- }
+ on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(),
+ _bound_client, _credentials, its_check_status, true);
}
}
#ifdef USE_DLT
@@ -1345,7 +1441,8 @@ bool routing_manager_impl::on_message(
service_t _service, instance_t _instance,
const byte_t *_data, length_t _size,
bool _reliable, client_t _bound_client,
- bool _is_valid_crc,
+ credentials_t _credentials,
+ uint8_t _check_status,
bool _is_from_remote) {
#if 0
std::stringstream msg;
@@ -1367,15 +1464,15 @@ bool routing_manager_impl::on_message(
_data[VSOMEIP_CLIENT_POS_MAX]);
}
- if (its_client == VSOMEIP_ROUTING_CLIENT
- && utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
+ if (utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
is_forwarded = deliver_notification(_service, _instance, _data, _size,
- _reliable, _bound_client, _is_valid_crc, _is_from_remote);
+ _reliable, _bound_client, _credentials, _check_status, _is_from_remote);
} else if (its_client == host_->get_client()) {
deliver_message(_data, _size, _instance,
- _reliable, _bound_client, _is_valid_crc, _is_from_remote);
+ _reliable, _bound_client, _credentials, _check_status, _is_from_remote);
} else {
- send(its_client, _data, _size, _instance, true, _reliable, _bound_client, _is_valid_crc, _is_from_remote); //send to proxy
+ send(its_client, _data, _size, _instance, _reliable,
+ _bound_client, _credentials, _check_status, _is_from_remote); //send to proxy
}
return is_forwarded;
}
@@ -1384,9 +1481,7 @@ void routing_manager_impl::on_notification(client_t _client,
service_t _service, instance_t _instance,
const byte_t *_data, length_t _size, bool _notify_one) {
event_t its_event_id = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_METHOD_POS_MIN],
- _data[VSOMEIP_METHOD_POS_MAX]);
-
+ _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id);
if (its_event) {
uint32_t its_length = utility::get_payload_size(_data, _size);
@@ -1396,197 +1491,37 @@ void routing_manager_impl::on_notification(client_t _client,
its_length);
if (_notify_one) {
- notify_one(_service, _instance, its_event->get_event(), its_payload, _client, true, true, false);
+ notify_one(_service, _instance, its_event->get_event(),
+ its_payload, _client, true
+#ifdef VSOMEIP_ENABLE_COMPAT
+ , false
+#endif
+ );
} else {
if (its_event->is_field()) {
- if (its_event->is_set()) {
- its_event->set_payload(its_payload, false, true);
- } else {
- // new subscribers will be notified by SD via sent_initiale_events;
- if (its_event->get_remote_notification_pending()) {
- // Set payload first time ~> notify all remote subscriber per unicast (initial field)
- std::vector<std::unique_lock<std::mutex>> its_locks;
- std::vector<std::shared_ptr<eventgroupinfo>> its_eventgroupinfos;
- for (auto its_group : its_event->get_eventgroups()) {
- auto its_eventgroup = find_eventgroup(_service, _instance, its_group);
- if (its_eventgroup) {
- its_locks.push_back(its_eventgroup->get_subscription_lock());
- its_eventgroupinfos.push_back(its_eventgroup);
- }
- }
-
- bool is_offered_both(false);
- if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT &&
- configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) {
- is_offered_both = true;
- }
- for (const auto &its_eventgroup : its_eventgroupinfos) {
- //Unicast targets
- for (auto its_remote : its_eventgroup->get_targets()) {
- if (!is_offered_both) {
- its_event->set_payload(its_payload, its_remote.endpoint_, true, true);
- } else {
- bool was_set(false);
- if (its_event->is_reliable() && its_remote.endpoint_->is_reliable()) {
- its_event->set_payload(its_payload, its_remote.endpoint_, true, true);
- was_set = true;
- }
- if (!its_event->is_reliable() && !its_remote.endpoint_->is_reliable()) {
- its_event->set_payload(its_payload, its_remote.endpoint_, true, true);
- was_set = true;
- }
- if (!was_set) {
- its_event->set_payload_dont_notify(its_payload);
- }
- }
- }
- }
- its_event->set_remote_notification_pending(false);
- } else {
- its_event->set_payload_dont_notify(its_payload);
- }
+ if (!its_event->set_payload_notify_pending(its_payload)) {
+ its_event->set_payload(its_payload, false);
}
} else {
- its_event->set_payload(its_payload, false, true);
+ its_event->set_payload(its_payload, false, true);
}
}
}
}
-void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
- // Is called when endpoint->connect succeeded!
- struct service_info {
- service_t service_id_;
- instance_t instance_id_;
- major_version_t major_;
- minor_version_t minor_;
- bool reliable_;
- std::shared_ptr<endpoint> endpoint_;
- };
-
- // Set to state CONNECTED as connection is not yet fully established in remote side POV
- // but endpoint is ready to send / receive. Set to ESTABLISHED after timer expires
- // to prevent inserting subscriptions twice or send out subscription before remote side
- // is finished with TCP 3 way handshake
- _endpoint->set_connected(true);
-
- std::forward_list<struct service_info> services_to_report_;
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- for (auto &its_service : remote_services_) {
- for (auto &its_instance : its_service.second) {
- for (auto &its_client : its_instance.second) {
- if (its_client.first == VSOMEIP_ROUTING_CLIENT ||
- its_client.first == get_client()) {
- auto found_endpoint = its_client.second.find(false);
- if (found_endpoint != its_client.second.end()) {
- if (found_endpoint->second.get() == _endpoint.get()) {
- std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first));
- if (!its_info) {
- _endpoint->set_established(true);
- return;
- }
- services_to_report_.push_front(
- { its_service.first,
- its_instance.first,
- its_info->get_major(),
- its_info->get_minor(),
- false, _endpoint });
- }
- }
- found_endpoint = its_client.second.find(true);
- if (found_endpoint != its_client.second.end()) {
- if (found_endpoint->second.get() == _endpoint.get()) {
- std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first));
- if (!its_info) {
- _endpoint->set_established(true);
- return;
- }
- services_to_report_.push_front(
- { its_service.first,
- its_instance.first,
- its_info->get_major(),
- its_info->get_minor(),
- true, _endpoint });
- }
- }
- }
- }
- }
+bool routing_manager_impl::is_last_stop_callback(const uint32_t _callback_id) {
+ bool last_callback(false);
+ auto found_id = callback_counts_.find(_callback_id);
+ if (found_id != callback_counts_.end()) {
+ found_id->second--;
+ if (found_id->second == 0) {
+ last_callback = true;
}
}
-
- for (const auto &s : services_to_report_) {
- on_availability(s.service_id_, s.instance_id_, true, s.major_, s.minor_);
- if (s.reliable_) {
- stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, s.service_id_,
- s.instance_id_, s.major_, s.minor_);
- }
- std::shared_ptr<boost::asio::steady_timer> its_timer =
- std::make_shared<boost::asio::steady_timer>(io_);
- boost::system::error_code ec;
- its_timer->expires_from_now(std::chrono::milliseconds(3), ec);
- if (!ec) {
- its_timer->async_wait(
- std::bind(
- &routing_manager_impl::call_sd_endpoint_connected,
- std::static_pointer_cast<routing_manager_impl>(
- shared_from_this()),
- std::placeholders::_1, s.service_id_,
- s.instance_id_, s.endpoint_, its_timer));
- } else {
- VSOMEIP_ERROR<< "routing_manager_impl::on_connect: " << ec.message();
- }
- }
- if (services_to_report_.empty()) {
- _endpoint->set_established(true);
- }
-}
-
-void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
- // Is called when endpoint->connect fails!
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- for (auto &its_service : remote_services_) {
- for (auto &its_instance : its_service.second) {
- for (auto &its_client : its_instance.second) {
- if (its_client.first == VSOMEIP_ROUTING_CLIENT ||
- its_client.first == get_client()) {
- auto found_endpoint = its_client.second.find(false);
- if (found_endpoint != its_client.second.end()) {
- if (found_endpoint->second.get() == _endpoint.get()) {
-
- std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first));
- if(!its_info){
- return;
- }
- on_availability(its_service.first, its_instance.first,
- false, its_info->get_major(), its_info->get_minor());
- }
- }
- found_endpoint = its_client.second.find(true);
- if (found_endpoint != its_client.second.end()) {
- if (found_endpoint->second.get() == _endpoint.get()) {
-
- std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first));
- if(!its_info){
- return;
- }
- on_availability(its_service.first, its_instance.first,
- false, its_info->get_major(), its_info->get_minor());
- stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT,
- its_service.first, its_instance.first,
- its_info->get_major(),
- its_info->get_minor());
- VSOMEIP_WARNING << __func__
- << ": lost connection to remote service: "
- << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_instance.first;
- }
- }
- }
- }
- }
+ if (last_callback) {
+ callback_counts_.erase(_callback_id);
}
+ return last_callback;
}
void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _service,
@@ -1643,47 +1578,133 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
if (its_info) {
its_reliable_endpoint = its_info->get_endpoint(true);
its_unreliable_endpoint = its_info->get_endpoint(false);
- }
+ static std::atomic<uint32_t> callback_id(0);
+ const uint32_t its_callback_id = ++callback_id;
+
+ struct ready_to_stop_t {
+ ready_to_stop_t() : reliable_(false), unreliable_(false){}
+ std::atomic<bool> reliable_;
+ std::atomic<bool> unreliable_;
+ };
+ auto ready_to_stop = std::make_shared<ready_to_stop_t>();
+ auto ptr = shared_from_this();
+
+ auto callback = [&, its_callback_id, ptr, its_info, its_reliable_endpoint, its_unreliable_endpoint,
+ ready_to_stop, _service, _instance, _major, _minor]
+ (std::shared_ptr<endpoint> _endpoint, service_t _stopped_service) {
+ (void)_stopped_service;
+ if (its_reliable_endpoint && its_reliable_endpoint == _endpoint) {
+ ready_to_stop->reliable_ = true;
+ }
+ if (its_unreliable_endpoint && its_unreliable_endpoint == _endpoint) {
+ ready_to_stop->unreliable_ = true;
+ }
+ if ((its_unreliable_endpoint && !ready_to_stop->unreliable_) ||
+ (its_reliable_endpoint && !ready_to_stop->reliable_)) {
+ {
+ std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
+ if (is_last_stop_callback(its_callback_id)) {
+ erase_offer_command(_service, _instance);
+ }
+ }
+ return;
+ }
- if (discovery_) {
- if (its_info) {
- if (its_info->get_major() == _major && its_info->get_minor() == _minor) {
- discovery_->stop_offer_service(_service, _instance, its_info);
+ if (discovery_) {
+ if (its_info->get_major() == _major && its_info->get_minor() == _minor) {
+ discovery_->stop_offer_service(its_info);
+ }
}
- }
- }
- del_routing_info(_service, _instance, (its_reliable_endpoint != nullptr),
- (its_unreliable_endpoint != nullptr));
+ del_routing_info(_service, _instance, (its_reliable_endpoint != nullptr),
+ (its_unreliable_endpoint != nullptr));
- // Cleanup reliable & unreliable server endpoints hold before
- if (its_info) {
- if (its_unreliable_endpoint) {
- cleanup_server_endpoint(_service, its_unreliable_endpoint);
- // Clear service info and service group
- clear_service_info(_service, _instance, false);
+ for (const auto& ep: {its_reliable_endpoint, its_unreliable_endpoint}) {
+ if (ep) {
+ if (ep_mgr_impl_->remove_instance(_service, ep.get())) {
+ {
+ std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
+ callback_counts_[its_callback_id]++;
+ }
+ // last instance -> pass ANY_INSTANCE and shutdown completely
+ ep->prepare_stop(
+ [&, _service, _instance, its_callback_id, ptr, its_reliable_endpoint, its_unreliable_endpoint]
+ (std::shared_ptr<endpoint> _endpoint,
+ service_t _stopped_service) {
+ (void)_stopped_service;
+ if (ep_mgr_impl_->remove_server_endpoint(
+ _endpoint->get_local_port(),
+ _endpoint->is_reliable())) {
+ _endpoint->stop();
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
+ if (is_last_stop_callback(its_callback_id)) {
+ erase_offer_command(_service, _instance);
+ }
+ }
+ }, ANY_SERVICE);
+ }
+ // Clear service info and service group
+ clear_service_info(_service, _instance, ep->is_reliable());
+ }
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
+ if (is_last_stop_callback(its_callback_id)) {
+ erase_offer_command(_service, _instance);
+ }
+ }
+ };
+
+ // determine callback count
+ for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) {
+ if (ep) {
+ std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
+ auto found_id = callback_counts_.find(its_callback_id);
+ if (found_id != callback_counts_.end()) {
+ found_id->second++;
+ } else {
+ callback_counts_[its_callback_id] = 1;
+ }
+ }
}
- if (its_reliable_endpoint) {
- cleanup_server_endpoint(_service, its_reliable_endpoint);
- // Clear service info and service group
- clear_service_info(_service, _instance, true);
+ for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) {
+ if (ep) {
+ ep->prepare_stop(callback, _service);
+ }
+ }
+
+ if (!its_reliable_endpoint && !its_unreliable_endpoint) {
+ {
+ std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
+ callback_counts_.erase(its_callback_id);
+ }
+ erase_offer_command(_service, _instance);
}
+ } else {
+ erase_offer_command(_service, _instance);
}
}
bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
- instance_t _instance, bool _reliable, client_t _bound_client, bool _is_valid_crc, bool _is_from_remote) {
+ instance_t _instance, bool _reliable, client_t _bound_client, credentials_t _credentials,
+ uint8_t _status_check, bool _is_from_remote) {
bool is_delivered(false);
+ std::uint32_t its_sender_uid = std::get<0>(_credentials);
+ std::uint32_t its_sender_gid = std::get<1>(_credentials);
- auto a_deserializer = get_deserializer();
- a_deserializer->set_data(_data, _size);
- std::shared_ptr<message> its_message(a_deserializer->deserialize_message());
- a_deserializer->reset();
- put_deserializer(a_deserializer);
+ auto its_deserializer = get_deserializer();
+ its_deserializer->set_data(_data, _size);
+ std::shared_ptr<message_impl> its_message(its_deserializer->deserialize_message());
+ its_deserializer->reset();
+ put_deserializer(its_deserializer);
if (its_message) {
its_message->set_instance(_instance);
its_message->set_reliable(_reliable);
- its_message->set_is_valid_crc(_is_valid_crc);
+ its_message->set_check_result(_status_check);
+ its_message->set_uid(std::get<0>(_credentials));
+ its_message->set_gid(std::get<1>(_credentials));
if (!_is_from_remote) {
if (utility::is_notification(its_message->get_message_type())) {
@@ -1698,7 +1719,8 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
<< " ~> Skip message!";
return false;
} else {
- if (!configuration_->is_client_allowed(get_client(), its_message->get_service(),
+ if (!security::get()->is_client_allowed(own_uid_, own_gid_,
+ get_client(), its_message->get_service(),
its_message->get_instance(), its_message->get_method())) {
VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
<< " : routing_manager_impl::deliver_message: "
@@ -1711,7 +1733,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
}
}
} else if (utility::is_request(its_message->get_message_type())) {
- if (configuration_->is_security_enabled()
+ if (security::get()->is_enabled()
&& its_message->get_client() != _bound_client) {
VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
<< " : routing_manager_impl::deliver_message:"
@@ -1720,12 +1742,13 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
<< its_message->get_service() << "/" << its_message->get_instance()
<< "/" << its_message->get_method() << " which doesn't match the bound client 0x"
<< std::setw(4) << std::setfill('0') << _bound_client
- << " ~> skip message!";
+ << " ~> Skip message!";
return false;
}
- if (!configuration_->is_client_allowed(its_message->get_client(),
- its_message->get_service(), its_message->get_instance(), its_message->get_method())) {
+ if (!security::get()->is_client_allowed(its_sender_uid, its_sender_gid,
+ its_message->get_client(), its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
<< " : routing_manager_impl::deliver_message: "
<< " isn't allowed to send a request to service/instance/method "
@@ -1746,8 +1769,9 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
<< " ~> Skip message!";
return false;
} else {
- if (!configuration_->is_client_allowed(get_client(), its_message->get_service(),
- its_message->get_instance(), its_message->get_method())) {
+ if (!security::get()->is_client_allowed(own_uid_, own_gid_,
+ get_client(), its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
<< " : routing_manager_impl::deliver_message: "
<< " isn't allowed to receive a response from service/instance/method "
@@ -1760,7 +1784,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
}
}
} else {
- if (!configuration_->is_remote_client_allowed()) {
+ if (!security::get()->is_remote_client_allowed()) {
// if the message is from remote, check if
// policy allows remote requests.
VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
@@ -1773,7 +1797,8 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
<< " ~> Skip message!";
return false;
} else if (utility::is_notification(its_message->get_message_type())) {
- if (!configuration_->is_client_allowed(get_client(), its_message->get_service(),
+ if (!security::get()->is_client_allowed(own_uid_, own_gid_,
+ get_client(), its_message->get_service(),
its_message->get_instance(), its_message->get_method())) {
VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
<< " : routing_manager_impl::deliver_message: "
@@ -1800,11 +1825,14 @@ bool routing_manager_impl::deliver_notification(
service_t _service, instance_t _instance,
const byte_t *_data, length_t _length,
bool _reliable, client_t _bound_client,
- bool _is_valid_crc, bool _is_from_remote) {
- method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
- _data[VSOMEIP_METHOD_POS_MAX]);
+ credentials_t _credentials,
+ uint8_t _status_check, bool _is_from_remote) {
+ event_t its_event_id = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
+ client_t its_client_id = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
- std::shared_ptr<event> its_event = find_event(_service, _instance, its_method);
+ std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id);
if (its_event) {
if (!its_event->is_provided()) {
if (its_event->get_subscribers().size() == 0) {
@@ -1843,17 +1871,46 @@ bool routing_manager_impl::deliver_notification(
}
}
- for (const auto its_local_client : its_event->get_subscribers()) {
- if (its_local_client == host_->get_client()) {
- deliver_message(_data, _length, _instance, _reliable, _bound_client, _is_valid_crc, _is_from_remote);
- } else {
- std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
- if (its_local_target) {
- send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
- _data, _length, _instance, true, _reliable, VSOMEIP_SEND, _is_valid_crc);
+ if (its_event->get_type() != event_type_e::ET_SELECTIVE_EVENT) {
+ for (const auto its_local_client : its_event->get_subscribers()) {
+ if (its_local_client == host_->get_client()) {
+ deliver_message(_data, _length, _instance, _reliable,
+ _bound_client, _credentials, _status_check, _is_from_remote);
+ } else {
+ std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
+ if (its_local_target) {
+ send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
+ _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check);
+ }
+ }
+ }
+ } else {
+ // TODO: Check whether it makes more sense to set the client id
+ // for internal selective events. This would create some extra
+ // effort but we could avoid this hack.
+ if (its_client_id == VSOMEIP_ROUTING_CLIENT)
+ its_client_id = get_client();
+
+ auto its_subscribers = its_event->get_subscribers();
+ if (its_subscribers.find(its_client_id) != its_subscribers.end()) {
+ if (its_client_id == host_->get_client()) {
+ deliver_message(_data, _length, _instance, _reliable,
+ _bound_client, _credentials, _status_check, _is_from_remote);
+ } else {
+ std::shared_ptr<endpoint> its_local_target = find_local(its_client_id);
+ if (its_local_target) {
+ send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
+ _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check);
+ }
}
}
}
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Event ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event_id << "]"
+ << " is not registered. The message is dropped.";
}
return true;
@@ -1865,34 +1922,40 @@ std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup(
return routing_manager_base::find_eventgroup(_service, _instance, _eventgroup);
}
-const std::shared_ptr<configuration> routing_manager_impl::get_configuration() const {
- return (host_->get_configuration());
-}
-
std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoint(
const std::string &_address, uint16_t _port, bool _reliable) {
- std::shared_ptr<endpoint> its_service_endpoint = find_server_endpoint(_port,
- _reliable);
+ std::shared_ptr<endpoint> its_service_endpoint =
+ ep_mgr_impl_->find_server_endpoint(_port, _reliable);
if (!its_service_endpoint) {
try {
- its_service_endpoint = create_server_endpoint(_port, _reliable,
- true);
+ its_service_endpoint =
+ ep_mgr_impl_->create_server_endpoint(_port,
+ _reliable, true);
if (its_service_endpoint) {
- sd_info_ = std::make_shared<serviceinfo>(ANY_MAJOR, ANY_MINOR,
- DEFAULT_TTL, false); // false, because we do _not_ want to announce it...
+ sd_info_ = std::make_shared<serviceinfo>(
+ VSOMEIP_SD_SERVICE, VSOMEIP_SD_INSTANCE,
+ ANY_MAJOR, ANY_MINOR, DEFAULT_TTL,
+ false); // false, because we do _not_ want to announce it...
sd_info_->set_endpoint(its_service_endpoint, _reliable);
its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
_address, _port);
- its_service_endpoint->join(_address);
+ if (!_reliable) {
+#if defined(_WIN32) || defined(ANDROID)
+ dynamic_cast<udp_server_endpoint_impl*>(
+ its_service_endpoint.get())->join(_address);
+#else
+ reinterpret_cast<udp_server_endpoint_impl*>(
+ its_service_endpoint.get())->join(_address);
+#endif
+ }
} else {
VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. "
"Please check your network configuration.";
}
} catch (const std::exception &e) {
- host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED);
- VSOMEIP_ERROR << "Service Discovery endpoint could not be created: "
- << e.what();
+ VSOMEIP_ERROR << "Server endpoint creation failed: Service "
+ "Discovery endpoint could not be created: " << e.what();
}
}
return its_service_endpoint;
@@ -1900,8 +1963,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin
services_t routing_manager_impl::get_offered_services() const {
services_t its_services;
- for (auto s : get_services()) {
- for (auto i : s.second) {
+ for (const auto& s : get_services()) {
+ for (const auto& i : s.second) {
if (i.second->is_local()) {
its_services[s.first][i.first] = i.second;
}
@@ -1926,7 +1989,7 @@ routing_manager_impl::get_offered_service_instances(service_t _service) const {
const services_t its_services(get_services());
const auto found_service = its_services.find(_service);
if (found_service != its_services.end()) {
- for (const auto i : found_service->second) {
+ for (const auto& i : found_service->second) {
if (i.second->is_local()) {
its_instances[i.first] = i.second;
}
@@ -1935,25 +1998,6 @@ routing_manager_impl::get_offered_service_instances(service_t _service) const {
return its_instances;
}
-std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client(
- service_t _service, instance_t _instance, bool _reliable, client_t _client) {
- std::shared_ptr<endpoint> its_endpoint;
- bool start_endpoint(false);
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- its_endpoint = find_remote_client(_service, _instance, _reliable, _client);
- if (!its_endpoint) {
- its_endpoint = create_remote_client(_service, _instance, _reliable, _client);
- start_endpoint = true;
- }
- }
- if (start_endpoint && its_endpoint
- && configuration_->is_someip(_service, _instance)) {
- its_endpoint->start();
- }
- return its_endpoint;
-}
-
///////////////////////////////////////////////////////////////////////////////
// PRIVATE
///////////////////////////////////////////////////////////////////////////////
@@ -1969,39 +2013,29 @@ void routing_manager_impl::init_service_info(
return;
}
if (configuration_) {
- std::shared_ptr<endpoint> its_reliable_endpoint;
- std::shared_ptr<endpoint> its_unreliable_endpoint;
-
- uint16_t its_reliable_port = configuration_->get_reliable_port(_service,
- _instance);
- uint16_t its_unreliable_port = configuration_->get_unreliable_port(
- _service, _instance);
-
- bool is_someip = configuration_->is_someip(_service, _instance);
-
// Create server endpoints for local services only
if (_is_local_service) {
+ const bool is_someip = configuration_->is_someip(_service, _instance);
+ uint16_t its_reliable_port = configuration_->get_reliable_port(
+ _service, _instance);
if (ILLEGAL_PORT != its_reliable_port) {
- its_reliable_endpoint = find_or_create_server_endpoint(
- its_reliable_port, true, is_someip);
+ std::shared_ptr<endpoint> its_reliable_endpoint =
+ ep_mgr_impl_->find_or_create_server_endpoint(
+ its_reliable_port, true, is_someip, _service,
+ _instance);
if (its_reliable_endpoint) {
its_info->set_endpoint(its_reliable_endpoint, true);
- its_reliable_endpoint->increment_use_count();
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- service_instances_[_service][its_reliable_endpoint.get()] =
- _instance;
}
}
-
+ uint16_t its_unreliable_port = configuration_->get_unreliable_port(
+ _service, _instance);
if (ILLEGAL_PORT != its_unreliable_port) {
- its_unreliable_endpoint = find_or_create_server_endpoint(
- its_unreliable_port, false, is_someip);
+ std::shared_ptr<endpoint> its_unreliable_endpoint =
+ ep_mgr_impl_->find_or_create_server_endpoint(
+ its_unreliable_port, false, is_someip, _service,
+ _instance);
if (its_unreliable_endpoint) {
its_info->set_endpoint(its_unreliable_endpoint, false);
- its_unreliable_endpoint->increment_use_count();
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- service_instances_[_service][its_unreliable_endpoint.get()] =
- _instance;
}
}
@@ -2013,143 +2047,8 @@ void routing_manager_impl::init_service_info(
}
}
} else {
- host_->on_error(error_code_e::CONFIGURATION_MISSING);
- }
-}
-
-std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint(
- const boost::asio::ip::address &_address,
- uint16_t _local_port, uint16_t _remote_port,
- bool _reliable, client_t _client) {
- (void)_client;
-
- std::shared_ptr<endpoint> its_endpoint;
- try {
- if (_reliable) {
- its_endpoint = std::make_shared<tcp_client_endpoint_impl>(
- shared_from_this(),
- boost::asio::ip::tcp::endpoint(
- (_address.is_v4() ?
- boost::asio::ip::tcp::v4() :
- boost::asio::ip::tcp::v6()),
- _local_port),
- boost::asio::ip::tcp::endpoint(_address, _remote_port),
- io_,
- configuration_->get_max_message_size_reliable(
- _address.to_string(), _remote_port),
- configuration_->get_buffer_shrink_threshold(),
- // send timeout after 2/3 of configured ttl, warning after 1/3
- std::chrono::milliseconds(configuration_->get_sd_ttl() * 666),
- configuration_->get_endpoint_queue_limit(
- _address.to_string(), _remote_port),
- configuration_->get_max_tcp_restart_aborts(),
- configuration_->get_max_tcp_connect_time());
-
- if (configuration_->has_enabled_magic_cookies(_address.to_string(),
- _remote_port)) {
- its_endpoint->enable_magic_cookies();
- }
- } else {
- its_endpoint = std::make_shared<udp_client_endpoint_impl>(
- shared_from_this(),
- boost::asio::ip::udp::endpoint(
- (_address.is_v4() ?
- boost::asio::ip::udp::v4() :
- boost::asio::ip::udp::v6()),
- _local_port),
- boost::asio::ip::udp::endpoint(_address, _remote_port),
- io_, configuration_->get_endpoint_queue_limit(
- _address.to_string(), _remote_port),
- configuration_->get_udp_receive_buffer_size());
- }
- } catch (...) {
- host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED);
+ VSOMEIP_ERROR << "Missing vsomeip configuration.";
}
-
- return (its_endpoint);
-}
-
-std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
- uint16_t _port, bool _reliable, bool _start) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- std::shared_ptr<endpoint> its_endpoint;
- try {
- boost::asio::ip::address its_unicast = configuration_->get_unicast_address();
- if (_start) {
- if (_reliable) {
- its_endpoint = std::make_shared<tcp_server_endpoint_impl>(
- shared_from_this(),
- boost::asio::ip::tcp::endpoint(its_unicast, _port), io_,
- configuration_->get_max_message_size_reliable(
- its_unicast.to_string(), _port),
- configuration_->get_buffer_shrink_threshold(),
- // send timeout after 2/3 of configured ttl, warning after 1/3
- std::chrono::milliseconds(configuration_->get_sd_ttl() * 666),
- configuration_->get_endpoint_queue_limit(
- its_unicast.to_string(), _port));
- if (configuration_->has_enabled_magic_cookies(
- its_unicast.to_string(), _port) ||
- configuration_->has_enabled_magic_cookies(
- "local", _port)) {
- its_endpoint->enable_magic_cookies();
- }
- } else {
- configuration::endpoint_queue_limit_t its_limit =
- configuration_->get_endpoint_queue_limit(
- its_unicast.to_string(), _port);
-#ifndef _WIN32
- if (its_unicast.is_v4()) {
- its_unicast = boost::asio::ip::address_v4::any();
- } else if (its_unicast.is_v6()) {
- its_unicast = boost::asio::ip::address_v6::any();
- }
-#endif
- boost::asio::ip::udp::endpoint ep(its_unicast, _port);
- its_endpoint = std::make_shared<udp_server_endpoint_impl>(
- shared_from_this(), ep, io_, its_limit,
- configuration_->get_udp_receive_buffer_size());
- }
-
- } else {
- its_endpoint = std::make_shared<virtual_server_endpoint_impl>(
- its_unicast.to_string(), _port, _reliable);
- }
-
- if (its_endpoint) {
- server_endpoints_[_port][_reliable] = its_endpoint;
- its_endpoint->start();
- }
- } catch (const std::exception &e) {
- host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED);
- VSOMEIP_ERROR << e.what();
- }
-
- return (its_endpoint);
-}
-
-std::shared_ptr<endpoint> routing_manager_impl::find_server_endpoint(
- uint16_t _port, bool _reliable) const {
- std::shared_ptr<endpoint> its_endpoint;
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- auto found_port = server_endpoints_.find(_port);
- if (found_port != server_endpoints_.end()) {
- auto found_endpoint = found_port->second.find(_reliable);
- if (found_endpoint != found_port->second.end()) {
- its_endpoint = found_endpoint->second;
- }
- }
-
- return (its_endpoint);
-}
-
-std::shared_ptr<endpoint> routing_manager_impl::find_or_create_server_endpoint(
- uint16_t _port, bool _reliable, bool _start) {
- std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(_port,
- _reliable);
- if (!its_endpoint) {
- its_endpoint = create_server_endpoint(_port, _reliable, _start);
- }
- return (its_endpoint);
}
void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) {
@@ -2167,8 +2066,8 @@ void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) {
std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
auto its_client = requested_services_.find(_client);
if (its_client != requested_services_.end()) {
- for (auto its_service : its_client->second) {
- for (auto its_instance : its_service.second) {
+ for (const auto& its_service : its_client->second) {
+ for (const auto& its_instance : its_service.second) {
services_to_release_.push_front(
{ its_service.first, its_instance.first });
}
@@ -2180,162 +2079,6 @@ void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) {
}
}
-instance_t routing_manager_impl::find_instance(service_t _service,
- endpoint * _endpoint) {
- instance_t its_instance(0xFFFF);
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- auto found_service = service_instances_.find(_service);
- if (found_service != service_instances_.end()) {
- auto found_endpoint = found_service->second.find(_endpoint);
- if (found_endpoint != found_service->second.end()) {
- its_instance = found_endpoint->second;
- }
- }
- }
- return (its_instance);
-}
-
-std::shared_ptr<endpoint> routing_manager_impl::create_remote_client(
- service_t _service, instance_t _instance, bool _reliable, client_t _client) {
- std::shared_ptr<endpoint> its_endpoint;
- std::shared_ptr<endpoint_definition> its_endpoint_def;
- uint16_t its_local_port;
- uint16_t its_remote_port = ILLEGAL_PORT;
-
- auto found_service = remote_service_info_.find(_service);
- if (found_service != remote_service_info_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_reliability = found_instance->second.find(_reliable);
- if (found_reliability != found_instance->second.end()) {
- its_endpoint_def = found_reliability->second;
- its_remote_port = its_endpoint_def->get_port();
- }
- }
- }
-
- if( its_remote_port != ILLEGAL_PORT) {
- // if client port range for remote service port range is configured
- // and remote port is in range, determine unused client port
- std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_);
- if (configuration_->get_client_port(_service, _instance, its_remote_port, _reliable,
- used_client_ports_, its_local_port)) {
- if(its_endpoint_def) {
- its_endpoint = create_client_endpoint(
- its_endpoint_def->get_address(),
- its_local_port,
- its_endpoint_def->get_port(),
- _reliable, _client
- );
- }
-
- if (its_endpoint) {
- used_client_ports_[_reliable].insert(its_local_port);
- its_lock.unlock();
- service_instances_[_service][its_endpoint.get()] = _instance;
- remote_services_[_service][_instance][_client][_reliable] = its_endpoint;
- if (_client == VSOMEIP_ROUTING_CLIENT) {
- client_endpoints_by_ip_[its_endpoint_def->get_address()]
- [its_endpoint_def->get_port()]
- [_reliable] = its_endpoint;
- // Set the basic route to the service in the service info
- auto found_service_info = find_service(_service, _instance);
- if (found_service_info) {
- found_service_info->set_endpoint(its_endpoint, _reliable);
- }
- }
- }
- }
- }
- return its_endpoint;
-}
-
-
-std::shared_ptr<endpoint> routing_manager_impl::find_remote_client(
- service_t _service, instance_t _instance, bool _reliable, client_t _client) {
- std::shared_ptr<endpoint> its_endpoint;
- auto found_service = remote_services_.find(_service);
- if (found_service != remote_services_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_client = found_instance->second.find(_client);
- if (found_client != found_instance->second.end()) {
- auto found_reliability = found_client->second.find(_reliable);
- if (found_reliability != found_client->second.end()) {
- its_endpoint = found_reliability->second;
- }
- }
- }
- }
- if (its_endpoint || _client != VSOMEIP_ROUTING_CLIENT) {
- return its_endpoint;
- }
-
- // If another service is hosted on the same server_endpoint
- // reuse the existing client_endpoint.
- auto found_service_info = remote_service_info_.find(_service);
- if(found_service_info != remote_service_info_.end()) {
- auto found_instance = found_service_info->second.find(_instance);
- if(found_instance != found_service_info->second.end()) {
- auto found_reliable = found_instance->second.find(_reliable);
- if(found_reliable != found_instance->second.end()) {
- std::shared_ptr<endpoint_definition> its_ep_def =
- found_reliable->second;
- auto found_address = client_endpoints_by_ip_.find(
- its_ep_def->get_address());
- if(found_address != client_endpoints_by_ip_.end()) {
- auto found_port = found_address->second.find(
- its_ep_def->get_remote_port());
- if(found_port != found_address->second.end()) {
- auto found_reliable2 = found_port->second.find(
- _reliable);
- if(found_reliable2 != found_port->second.end()) {
- its_endpoint = found_reliable2->second;
- // store the endpoint under this service/instance id
- // as well - needed for later cleanup
- remote_services_[_service][_instance][_client][_reliable] =
- its_endpoint;
- service_instances_[_service][its_endpoint.get()] = _instance;
- // add endpoint to serviceinfo object
- auto found_service_info = find_service(_service,_instance);
- if (found_service_info) {
- found_service_info->set_endpoint(its_endpoint, _reliable);
- }
- }
- }
- }
- }
- }
- }
- return its_endpoint;
-}
-
-client_t routing_manager_impl::find_client(
- service_t _service, instance_t _instance,
- const std::shared_ptr<eventgroupinfo> &_eventgroup,
- const std::shared_ptr<endpoint_definition> &_target) const {
- client_t its_client = VSOMEIP_ROUTING_CLIENT;
- if (!_eventgroup->is_multicast()) {
- if (!_target->is_reliable()) {
- uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance);
- auto endpoint = find_server_endpoint(unreliable_port, false);
- if (endpoint) {
- its_client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)->
- get_client(_target);
- }
- } else {
- uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance);
- auto endpoint = find_server_endpoint(reliable_port, true);
- if (endpoint) {
- its_client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)->
- get_client(_target);
- }
- }
- }
- return its_client;
-}
-
bool routing_manager_impl::is_field(service_t _service, instance_t _instance,
event_t _event) const {
std::lock_guard<std::mutex> its_lock(events_mutex_);
@@ -2398,60 +2141,12 @@ void routing_manager_impl::add_routing_info(
// Check whether remote services are unchanged
bool is_reliable_known(false);
bool is_unreliable_known(false);
-
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- auto found_service = remote_service_info_.find(_service);
- if (found_service != remote_service_info_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- std::shared_ptr<endpoint_definition> its_definition;
- if (_reliable_port != ILLEGAL_PORT) {
- auto found_reliable = found_instance->second.find(true);
- if (found_reliable != found_instance->second.end()) {
- its_definition = found_reliable->second;
- if (its_definition->get_address() == _reliable_address
- && its_definition->get_port() == _reliable_port) {
- is_reliable_known = true;
- } else {
- VSOMEIP_WARNING << "Reliable service endpoint has changed: ["
- << std::hex << std::setfill('0') << std::setw(4) << _service << "."
- << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
- << std::dec << static_cast<std::uint32_t>(_major) << "."
- << std::dec << _minor << "] old: "
- << its_definition->get_address().to_string() << ":"
- << its_definition->get_port() << " new: "
- << _reliable_address.to_string() << ":"
- << _reliable_port;
- }
- }
- }
- if (_unreliable_port != ILLEGAL_PORT) {
- auto found_unreliable = found_instance->second.find(false);
- if (found_unreliable != found_instance->second.end()) {
- its_definition = found_unreliable->second;
- if (its_definition->get_address() == _unreliable_address
- && its_definition->get_port() == _unreliable_port) {
- is_unreliable_known = true;
- } else {
- VSOMEIP_WARNING << "Unreliable service endpoint has changed: ["
- << std::hex << std::setfill('0') << std::setw(4) << _service << "."
- << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
- << std::dec << static_cast<std::uint32_t>(_major) << "."
- << std::dec << _minor << "] old: "
- << its_definition->get_address().to_string() << ":"
- << its_definition->get_port() << " new: "
- << _unreliable_address.to_string() << ":"
- << _unreliable_port;
- }
- }
- }
- }
- }
- }
+ ep_mgr_impl_->is_remote_service_known(_service, _instance, _major,
+ _minor, _reliable_address, _reliable_port, &is_reliable_known,
+ _unreliable_address, _unreliable_port, &is_unreliable_known);
bool udp_inserted(false);
-
+ bool tcp_inserted(false);
// Add endpoint(s) if necessary
if (_reliable_port != ILLEGAL_PORT && !is_reliable_known) {
std::shared_ptr<endpoint_definition> endpoint_def_tcp
@@ -2459,15 +2154,14 @@ void routing_manager_impl::add_routing_info(
if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) {
std::shared_ptr<endpoint_definition> endpoint_def_udp
= endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance);
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- remote_service_info_[_service][_instance][false] = endpoint_def_udp;
- remote_service_info_[_service][_instance][true] = endpoint_def_tcp;
- }
+ ep_mgr_impl_->add_remote_service_info(_service, _instance,
+ endpoint_def_tcp, endpoint_def_udp);
udp_inserted = true;
+ tcp_inserted = true;
} else {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- remote_service_info_[_service][_instance][true] = endpoint_def_tcp;
+ ep_mgr_impl_->add_remote_service_info(_service, _instance,
+ endpoint_def_tcp);
+ tcp_inserted = true;
}
// check if service was requested and establish TCP connection if necessary
@@ -2489,11 +2183,13 @@ void routing_manager_impl::add_routing_info(
// SWS_SD_00376 establish TCP connection to service
// service is marked as available later in on_connect()
if(!connected) {
- find_or_create_remote_client(_service, _instance,
- true, VSOMEIP_ROUTING_CLIENT);
if (udp_inserted) {
- find_or_create_remote_client(_service, _instance,
- false, VSOMEIP_ROUTING_CLIENT);
+ // atomically create reliable and unreliable endpoint
+ ep_mgr_impl_->find_or_create_remote_client(
+ _service, _instance, VSOMEIP_ROUTING_CLIENT);
+ } else {
+ ep_mgr_impl_->find_or_create_remote_client(
+ _service, _instance, true, VSOMEIP_ROUTING_CLIENT);
}
connected = true;
}
@@ -2505,10 +2201,6 @@ void routing_manager_impl::add_routing_info(
}
}
}
- auto specific_endpoint_clients = get_specific_endpoint_clients(_service, _instance);
- for (const client_t& c : specific_endpoint_clients) {
- find_or_create_remote_client(_service, _instance, true, c);
- }
} else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) {
std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
for(const auto &client_id : requested_services_) {
@@ -2523,7 +2215,9 @@ void routing_manager_impl::add_routing_info(
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
- if (!stub_->contained_in_routing_info(
+ std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
+ if (ep && ep->is_established() &&
+ !stub_->contained_in_routing_info(
VSOMEIP_ROUTING_CLIENT, _service, _instance,
its_info->get_major(),
its_info->get_minor())) {
@@ -2534,12 +2228,8 @@ void routing_manager_impl::add_routing_info(
its_info->get_major(),
its_info->get_minor());
if (discovery_) {
- std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
- if (ep && ep->is_established()) {
- discovery_->on_endpoint_connected(
- _service, _instance,
- ep);
- }
+ discovery_->on_endpoint_connected(
+ _service, _instance, ep);
}
}
break;
@@ -2554,10 +2244,7 @@ void routing_manager_impl::add_routing_info(
if (!udp_inserted) {
std::shared_ptr<endpoint_definition> endpoint_def
= endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance);
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- remote_service_info_[_service][_instance][false] = endpoint_def;
- }
+ ep_mgr_impl_->add_remote_service_info(_service, _instance, endpoint_def);
// check if service was requested and increase requester count if necessary
{
bool connected(false);
@@ -2577,7 +2264,7 @@ void routing_manager_impl::add_routing_info(
|| major_minor_pair.second
== ANY_MINOR)) {
if(!connected) {
- find_or_create_remote_client(_service, _instance,
+ ep_mgr_impl_->find_or_create_remote_client(_service, _instance,
false, VSOMEIP_ROUTING_CLIENT);
connected = true;
}
@@ -2590,7 +2277,8 @@ void routing_manager_impl::add_routing_info(
}
}
}
- if (!is_reliable_known) {
+ if (!is_reliable_known && !tcp_inserted) {
+ // UDP only service can be marked as available instantly
on_availability(_service, _instance, true, _major, _minor);
stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor);
}
@@ -2649,26 +2337,81 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
if(!its_info)
return;
- on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor());
- stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor());
+ on_availability(_service, _instance, false,
+ its_info->get_major(), its_info->get_minor());
+ stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
+ its_info->get_major(), its_info->get_minor());
// Implicit unsubscribe
- clear_targets_and_pending_sub_from_eventgroups(_service, _instance);
- clear_identified_clients( _service, _instance);
- clear_identifying_clients( _service, _instance);
+ std::vector<std::shared_ptr<event>> its_events;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ for (auto &its_eventgroup : found_instance->second) {
+ // As the service is gone, all subscriptions to its events
+ // do no longer exist and the last received payload is no
+ // longer valid.
+ for (auto &its_event : its_eventgroup.second->get_events()) {
+ const auto its_subscribers = its_event->get_subscribers();
+ for (const auto its_subscriber : its_subscribers) {
+ if (its_subscriber != get_client()) {
+ its_event->remove_subscriber(
+ its_eventgroup.first, its_subscriber);
+ }
+ }
+ its_events.push_back(its_event);
+ remove_pending_subscription(_service, _instance,
+ its_eventgroup.first, its_event->get_event());
+ }
+
+ }
+ }
+ }
+ }
+ for (const auto& e : its_events) {
+ e->unset_payload(true);
+ }
- clear_remote_subscriber(_service, _instance);
+ {
+ std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
+ std::set<std::tuple<
+ service_t, instance_t, eventgroup_t, client_t> > its_invalid;
+
+ for (const auto its_state : remote_subscription_state_) {
+ if (std::get<0>(its_state.first) == _service
+ && std::get<1>(its_state.first) == _instance) {
+ its_invalid.insert(its_state.first);
+ }
+ }
+
+ for (const auto its_key : its_invalid)
+ remote_subscription_state_.erase(its_key);
+ }
+
+ {
+ std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
+ auto found_service = remote_subscribers_.find(_service);
+ if (found_service != remote_subscribers_.end()) {
+ if (found_service->second.erase(_instance) > 0 &&
+ !found_service->second.size()) {
+ remote_subscribers_.erase(found_service);
+ }
+ }
+ }
if (_has_reliable) {
- clear_client_endpoints(_service, _instance, true);
- clear_remote_service_info(_service, _instance, true);
+ ep_mgr_impl_->clear_client_endpoints(_service, _instance, true);
+ ep_mgr_impl_->clear_remote_service_info(_service, _instance, true);
}
if (_has_unreliable) {
- clear_client_endpoints(_service, _instance, false);
- clear_remote_service_info(_service, _instance, false);
+ ep_mgr_impl_->clear_client_endpoints(_service, _instance, false);
+ ep_mgr_impl_->clear_remote_service_info(_service, _instance, false);
}
- clear_multicast_endpoints(_service, _instance);
+ ep_mgr_impl_->clear_multicast_endpoints(_service, _instance);
if (_has_reliable)
clear_service_info(_service, _instance, true);
@@ -2677,8 +2420,8 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
// For expired services using only unreliable endpoints that have never been created before
if (!_has_reliable && !_has_unreliable) {
- clear_remote_service_info(_service, _instance, true);
- clear_remote_service_info(_service, _instance, false);
+ ep_mgr_impl_->clear_remote_service_info(_service, _instance, true);
+ ep_mgr_impl_->clear_remote_service_info(_service, _instance, false);
clear_service_info(_service, _instance, true);
clear_service_info(_service, _instance, false);
}
@@ -2764,83 +2507,57 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr
}
}
-void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &_address) {
- struct subscriptions_info {
- service_t service_id_;
- instance_t instance_id_;
- eventgroup_t eventgroup_id_;
- std::shared_ptr<endpoint_definition> invalid_endpoint_;
- client_t client_;
- std::set<std::shared_ptr<event>> events_;
- std::shared_ptr<eventgroupinfo> eventgroupinfo_;
- };
- std::vector<struct subscriptions_info> subscriptions_to_expire_;
+void
+routing_manager_impl::expire_subscriptions(
+ const boost::asio::ip::address &_address) {
{
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- for (auto &its_service : eventgroups_) {
- for (auto &its_instance : its_service.second) {
- for (auto &its_eventgroup : its_instance.second) {
- std::set<std::shared_ptr<endpoint_definition>> its_invalid_endpoints;
- for (auto &its_target : its_eventgroup.second->get_targets()) {
- if (its_target.endpoint_->get_address() == _address)
- its_invalid_endpoints.insert(its_target.endpoint_);
- }
+ for (const auto &its_service : eventgroups_) {
+ for (const auto &its_instance : its_service.second) {
+ for (const auto &its_eventgroup : its_instance.second) {
+ const auto its_info = its_eventgroup.second;
+ for (auto its_subscription
+ : its_info->get_remote_subscriptions()) {
+ // Note: get_remote_subscription delivers a copied
+ // set of subscriptions. Thus, its is possible to
+ // to remove them within the loop.
+ const auto its_reliable = its_subscription->get_reliable();
+ const auto its_unreliable = its_subscription->get_unreliable();
+ if ((its_reliable && its_reliable->get_address() == _address)
+ || (its_unreliable && its_unreliable->get_address() == _address)) {
+
+ // TODO: Check whether subscriptions to different hosts are valid.
+ // IF yes, we probably need to simply reset the corresponding
+ // endpoint instead of removing the subscription...
+
+ if (its_reliable) {
+ VSOMEIP_ERROR << __func__
+ << ": removing subscription to "
+ << std::hex << its_info->get_service() << "."
+ << std::hex << its_info->get_instance() << "."
+ << std::hex << its_info->get_eventgroup()
+ << " from target "
+ << its_reliable->get_address().to_string() << ":"
+ << std::dec << its_reliable->get_port();
+ }
+ if (its_unreliable) {
+ VSOMEIP_ERROR << __func__
+ << ": removing subscription to "
+ << std::hex << its_info->get_service() << "."
+ << std::hex << its_info->get_instance() << "."
+ << std::hex << its_info->get_eventgroup()
+ << " from target "
+ << its_unreliable->get_address().to_string() << ":"
+ << std::dec << its_unreliable->get_port();
+ }
- for (auto &its_endpoint : its_invalid_endpoints) {
- its_eventgroup.second->remove_target(its_endpoint);
- client_t its_client = find_client(its_service.first,
- its_instance.first, its_eventgroup.second,
- its_endpoint);
- clear_remote_subscriber(its_service.first,
- its_instance.first, its_client, its_endpoint);
-
- std::set<std::shared_ptr<event> > its_events;
- if (its_eventgroup.second->get_targets().size() == 0) {
- its_events = its_eventgroup.second->get_events();
+ on_remote_unsubscribe(its_subscription);
}
- subscriptions_to_expire_.push_back({its_service.first,
- its_instance.first,
- its_eventgroup.first,
- its_endpoint,
- its_client,
- its_events,
- its_eventgroup.second});
- }
- if(its_eventgroup.second->is_multicast() && its_invalid_endpoints.size() &&
- 0 == its_eventgroup.second->get_unreliable_target_count() ) {
- //clear multicast targets if no subscriber is left for multicast eventgroup
- its_eventgroup.second->clear_multicast_targets();
}
}
}
}
}
-
- for (const auto &s : subscriptions_to_expire_) {
- if (s.invalid_endpoint_) {
- for (const auto e: s.events_) {
- if (e->is_shadow()) {
- e->unset_payload();
- }
- }
- const client_t its_hosting_client = find_local_client(
- s.service_id_, s.instance_id_);
- if (its_hosting_client != VSOMEIP_ROUTING_CLIENT) {
- const pending_subscription_t its_pending_unsubscription(
- std::shared_ptr<sd_message_identifier_t>(),
- s.invalid_endpoint_, s.invalid_endpoint_,
- 0, s.client_);
- pending_subscription_id_t its_pending_unsubscription_id =
- s.eventgroupinfo_->add_pending_subscription(
- its_pending_unsubscription);
- if (its_pending_unsubscription_id != DEFAULT_SUBSCRIPTION) {
- send_unsubscription(its_hosting_client, s.client_,
- s.service_id_, s.instance_id_, s.eventgroup_id_,
- its_pending_unsubscription_id);
- }
- }
- }
- }
}
void routing_manager_impl::init_routing_info() {
@@ -2863,200 +2580,194 @@ void routing_manager_impl::init_routing_info() {
its_address, its_unreliable_port);
if(its_reliable_port != ILLEGAL_PORT) {
- find_or_create_remote_client(i.first, i.second, true, VSOMEIP_ROUTING_CLIENT);
+ ep_mgr_impl_->find_or_create_remote_client(
+ i.first, i.second, true, VSOMEIP_ROUTING_CLIENT);
}
if(its_unreliable_port != ILLEGAL_PORT) {
- find_or_create_remote_client(i.first, i.second, false, VSOMEIP_ROUTING_CLIENT);
+ ep_mgr_impl_->find_or_create_remote_client(
+ i.first, i.second, false, VSOMEIP_ROUTING_CLIENT);
}
}
}
}
-void routing_manager_impl::on_remote_subscription(
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- const std::shared_ptr<endpoint_definition> &_subscriber,
- const std::shared_ptr<endpoint_definition> &_target, ttl_t _ttl,
- const std::shared_ptr<sd_message_identifier_t> &_sd_message_id,
- const std::function<void(remote_subscription_state_e, client_t)>& _callback) {
- std::shared_ptr<eventgroupinfo> its_eventgroup
- = find_eventgroup(_service, _instance, _eventgroup);
- client_t its_subscribing_client(ILLEGAL_CLIENT);
- if (!its_eventgroup) {
- VSOMEIP_ERROR << "REMOTE SUBSCRIBE: attempt to subscribe to unknown eventgroup ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
- << " from " << _subscriber->get_address().to_string()
- << ":" << std::dec << _subscriber->get_port()
- << (_subscriber->is_reliable() ? " reliable" : " unreliable");
- _callback(remote_subscription_state_e::SUBSCRIPTION_ERROR, its_subscribing_client);
+void routing_manager_impl::on_remote_subscribe(
+ std::shared_ptr<remote_subscription> &_subscription,
+ const remote_subscription_callback_t &_callback) {
+ auto its_eventgroupinfo = _subscription->get_eventgroupinfo();
+ if (!its_eventgroupinfo) {
+ VSOMEIP_ERROR << __func__ << " eventgroupinfo is invalid";
return;
}
- // find out client id for selective subscriber
- if (!_subscriber->is_reliable()) {
- uint16_t unreliable_port = configuration_->get_unreliable_port(_service, _instance);
- _subscriber->set_remote_port(unreliable_port);
- if (!its_eventgroup->is_multicast()) {
- auto endpoint = find_server_endpoint(unreliable_port, false);
- if (endpoint) {
- its_subscribing_client = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint)->
- get_client(_subscriber);
- }
- }
- } else {
- uint16_t reliable_port = configuration_->get_reliable_port(_service, _instance);
- _subscriber->set_remote_port(reliable_port);
- auto endpoint = find_server_endpoint(reliable_port, true);
- if (endpoint) {
- its_subscribing_client = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint)->
- get_client(_subscriber);
- }
- }
- std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock());
- const std::chrono::steady_clock::time_point its_expiration =
- std::chrono::steady_clock::now() + std::chrono::seconds(_ttl);
- if (its_eventgroup->update_target(_subscriber, its_expiration)) {
- _callback(remote_subscription_state_e::SUBSCRIPTION_ACKED,its_subscribing_client);
- return;
- } else {
- const pending_subscription_id_t its_subscription_id =
- its_eventgroup->add_pending_subscription(
- pending_subscription_t(_sd_message_id, _subscriber,
- _target, _ttl, its_subscribing_client));
- if (its_subscription_id != DEFAULT_SUBSCRIPTION) {
- // only sent subscription to rm_proxy / hosting application if there's
- // no subscription for this eventgroup from the same remote subscriber
- // already pending
- const client_t its_offering_client = find_local_client(_service, _instance);
- send_subscription(its_offering_client, its_subscribing_client,
- _service, _instance, _eventgroup,
- its_eventgroup->get_major(), its_subscription_id);
- } else {
- VSOMEIP_WARNING << __func__ << " a remote subscription is already pending ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
- << " from " << _subscriber->get_address().to_string()
- << ":" << std::dec << _subscriber->get_port()
- << (_subscriber->is_reliable() ? " reliable" : " unreliable");
- }
- _callback(remote_subscription_state_e::SUBSCRIPTION_PENDING, its_subscribing_client);
+ const ttl_t its_ttl = _subscription->get_ttl();
+
+ const auto its_service = its_eventgroupinfo->get_service();
+ const auto its_instance = its_eventgroupinfo->get_instance();
+ const auto its_eventgroup = its_eventgroupinfo->get_eventgroup();
+ const auto its_major = its_eventgroupinfo->get_major();
+
+ // Get remote port(s)
+ auto its_reliable = _subscription->get_reliable();
+ if (its_reliable) {
+ uint16_t its_port
+ = configuration_->get_reliable_port(its_service, its_instance);
+ its_reliable->set_remote_port(its_port);
+ }
+
+ auto its_unreliable = _subscription->get_unreliable();
+ if (its_unreliable) {
+ uint16_t its_port
+ = configuration_->get_unreliable_port(its_service, its_instance);
+ its_unreliable->set_remote_port(its_port);
+ }
+
+ // Calculate expiration time
+ const std::chrono::steady_clock::time_point its_expiration
+ = std::chrono::steady_clock::now() + std::chrono::seconds(its_ttl);
+
+ // Try to update the subscription. This will fail, if the subscription does
+ // not exist or is still (partly) pending.
+ remote_subscription_id_t its_id;
+ std::set<client_t> its_added;
+ auto its_result = its_eventgroupinfo->update_remote_subscription(
+ _subscription, its_expiration, its_added, its_id, true);
+ if (its_result) {
+ if (!_subscription->is_pending()) { // resubscription without change
+ _callback(_subscription);
+ } else if (!its_added.empty()) { // new clients for a selective subscription
+ const client_t its_offering_client
+ = find_local_client(its_service, its_instance);
+ send_subscription(its_offering_client,
+ its_service, its_instance, its_eventgroup, its_major,
+ its_added, _subscription->get_id());
+ } else { // identical subscription is not yet processed
+ std::stringstream its_warning;
+ its_warning << __func__ << " a remote subscription is already pending ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"
+ << " from ";
+ if (its_reliable && its_unreliable)
+ its_warning << "[";
+ if (its_reliable)
+ its_warning << its_reliable->get_address().to_string()
+ << ":" << std::dec << its_reliable->get_port();
+ if (its_reliable && its_unreliable)
+ its_warning << ", ";
+ if (its_unreliable)
+ its_warning << its_unreliable->get_address().to_string()
+ << ":" << std::dec << its_unreliable->get_port();
+ if (its_reliable && its_unreliable)
+ its_warning << "]";
+ VSOMEIP_WARNING << its_warning.str();
+ }
+ } else { // new subscription
+ auto its_id
+ = its_eventgroupinfo->add_remote_subscription(_subscription);
+
+ const client_t its_offering_client
+ = find_local_client(its_service, its_instance);
+ send_subscription(its_offering_client,
+ its_service, its_instance, its_eventgroup, its_major,
+ _subscription->get_clients(), its_id);
+ }
+}
+
+void routing_manager_impl::on_remote_unsubscribe(
+ std::shared_ptr<remote_subscription> &_subscription) {
+ std::shared_ptr<eventgroupinfo> its_info
+ = _subscription->get_eventgroupinfo();
+ if (!its_info) {
+ VSOMEIP_ERROR << __func__
+ << ": Received Unsubscribe for unregistered eventgroup.";
return;
}
- _callback(remote_subscription_state_e::SUBSCRIPTION_ERROR, its_subscribing_client);
-}
-void routing_manager_impl::on_unsubscribe(service_t _service,
- instance_t _instance, eventgroup_t _eventgroup,
- std::shared_ptr<endpoint_definition> _target) {
- std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service,
- _instance, _eventgroup);
- if (its_eventgroup) {
- client_t its_client = find_client(_service, _instance, its_eventgroup, _target);
- const pending_subscription_t its_pending_unsubscription(
- std::shared_ptr<sd_message_identifier_t>(), _target, _target,
- 0, its_client);
- pending_subscription_id_t its_pending_unsubscription_id =
- its_eventgroup->add_pending_subscription(
- its_pending_unsubscription);
-
- its_eventgroup->remove_target(_target);
- clear_remote_subscriber(_service, _instance, its_client, _target);
-
- if (its_pending_unsubscription_id != DEFAULT_SUBSCRIPTION) {
- // there are no pending (un)subscriptions
- const client_t its_offering_client = find_local_client(_service, _instance);
- send_unsubscription(its_offering_client, its_client, _service,
- _instance, _eventgroup, its_pending_unsubscription_id);
- }
+ const auto its_service = its_info->get_service();
+ const auto its_instance = its_info->get_instance();
+ const auto its_eventgroup = its_info->get_eventgroup();
+ const auto its_major = its_info->get_major();
- if (its_eventgroup->get_targets().size() == 0) {
- std::set<std::shared_ptr<event> > its_events
- = its_eventgroup->get_events();
- for (auto e : its_events) {
- if (e->is_shadow()) {
- e->unset_payload();
- }
- }
- }
- VSOMEIP_INFO << "REMOTE UNSUBSCRIBE("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
- << " from " << _target->get_address().to_string()
- << ":" << std::dec <<_target->get_port()
- << (_target->is_reliable() ? " reliable" : " unreliable");
+ auto its_subscriber = _subscription->get_subscriber();
- } else {
- VSOMEIP_ERROR << "REMOTE UNSUBSCRIBE: attempt to subscribe to unknown eventgroup ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
- << " from " << _target->get_address().to_string()
- << ":" << std::dec <<_target->get_port()
- << (_target->is_reliable() ? " reliable" : " unreliable");
+ // Get remote port(s)
+ auto its_reliable = _subscription->get_reliable();
+ if (its_reliable) {
+ uint16_t its_port
+ = configuration_->get_reliable_port(its_service, its_instance);
+ its_reliable->set_remote_port(its_port);
}
-}
-
-void routing_manager_impl::on_subscribe_ack(service_t _service,
- instance_t _instance, const boost::asio::ip::address &_address,
- uint16_t _port) {
- bool multicast_known(false);
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- const auto found_service = multicast_info.find(_service);
- if (found_service != multicast_info.end()) {
- const auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- const auto& endpoint_def = found_instance->second;
- if (endpoint_def->get_address() == _address &&
- endpoint_def->get_port() == _port) {
- // Multicast info and endpoint already created before
- // This can happen when more than one client subscribe on the same instance!
- multicast_known = true;
- }
- }
- }
- if (!multicast_known) {
- // Save multicast info to be able to delete the endpoint
- // as soon as the instance stops offering its service
- std::shared_ptr<endpoint_definition> endpoint_def =
- endpoint_definition::get(_address, _port, false, _service, _instance);
- multicast_info[_service][_instance] = endpoint_def;
- }
+ auto its_unreliable = _subscription->get_unreliable();
+ if (its_unreliable) {
+ uint16_t its_port
+ = configuration_->get_unreliable_port(its_service, its_instance);
+ its_unreliable->set_remote_port(its_port);
}
- const bool is_someip = configuration_->is_someip(_service, _instance);
- // Create multicast endpoint & join multicase group
- std::shared_ptr<endpoint> its_endpoint
- = find_or_create_server_endpoint(_port, false, is_someip);
- if (its_endpoint) {
- if (!multicast_known) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- service_instances_[_service][its_endpoint.get()] = _instance;
- }
- its_endpoint->join(_address.to_string());
- } else {
- VSOMEIP_ERROR<<"Could not find/create multicast endpoint!";
+ remote_subscription_id_t its_id(0);
+ std::set<client_t> its_removed;
+ auto its_result = its_info->update_remote_subscription(
+ _subscription, std::chrono::steady_clock::now(),
+ its_removed, its_id, false);
+
+ if (its_result) {
+ const client_t its_offering_client
+ = find_local_client(its_service, its_instance);
+ send_unsubscription(its_offering_client,
+ its_service, its_instance, its_eventgroup, its_major,
+ its_removed, its_id);
}
}
+void routing_manager_impl::on_subscribe_ack_with_multicast(
+ service_t _service, instance_t _instance,
+ const boost::asio::ip::address &_address, uint16_t _port) {
+ ep_mgr_impl_->find_or_create_multicast_endpoint(_service,
+ _instance, _address, _port);
+}
+
void routing_manager_impl::on_subscribe_ack(client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event, pending_subscription_id_t _subscription_id) {
- client_t its_client = is_specific_endpoint_client(_client, _service, _instance);
- bool specific_endpoint_client = its_client != VSOMEIP_ROUTING_CLIENT;
+ event_t _event, remote_subscription_id_t _id) {
+ std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
if (its_eventgroup) {
- std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock());
- if (_subscription_id == DEFAULT_SUBSCRIPTION) {
- // ACK coming in via SD from remote or as answer to a subscription
- // of the application hosting the rm_impl to a local service
- auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, its_client);
- std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
- auto its_state = remote_subscription_state_.find(its_tuple);
+ auto its_subscription = its_eventgroup->get_remote_subscription(_id);
+ if (its_subscription) {
+ its_subscription->set_client_state(_client,
+ remote_subscription_state_e::SUBSCRIPTION_ACKED);
+
+ auto its_parent = its_subscription->get_parent();
+ if (its_parent) {
+ its_parent->set_client_state(_client,
+ remote_subscription_state_e::SUBSCRIPTION_ACKED);
+ if (!its_subscription->is_pending()) {
+ its_eventgroup->remove_remote_subscription(_id);
+ }
+ }
+
+ if (discovery_) {
+ std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
+ remote_subscribers_[_service][_instance][VSOMEIP_ROUTING_CLIENT].insert(
+ its_subscription->get_subscriber());
+ discovery_->update_remote_subscription(its_subscription);
+
+ VSOMEIP_INFO << "REMOTE SUBSCRIBE("
+ << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " from " << its_subscription->get_subscriber()->get_address()
+ << ":" << std::dec << its_subscription->get_subscriber()->get_port()
+ << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable")
+ << " was accepted";
+
+ return;
+ }
+ } else {
+ const auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _client);
+ const auto its_state = remote_subscription_state_.find(its_tuple);
if (its_state != remote_subscription_state_.end()) {
if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
// Already notified!
@@ -3064,385 +2775,79 @@ void routing_manager_impl::on_subscribe_ack(client_t _client,
}
}
remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED;
- } else { // ACK sent back from local client as answer to a remote subscription
- const client_t its_offering_client = find_local_client(_service, _instance);
- if (its_offering_client == VSOMEIP_ROUTING_CLIENT) {
- // service was stopped while subscription was pending
- // send subscribe_nack back instead
- eventgroup_lock.unlock();
- on_subscribe_nack(_client, _service, _instance, _eventgroup,
- _event, _subscription_id);
- return;
- }
- if (discovery_) {
- std::vector<pending_subscription_t> its_pending_subscriptions =
- its_eventgroup->remove_pending_subscription(_subscription_id);
- for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) {
- if (its_sd_message_id.ttl_ > 0) {
- if (its_sd_message_id.sd_message_identifier_
- && its_sd_message_id.subscriber_
- && its_sd_message_id.target_) {
- {
- std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
- remote_subscribers_[_service][_instance][_client].insert(its_sd_message_id.subscriber_);
- }
- const std::chrono::steady_clock::time_point its_expiration =
- std::chrono::steady_clock::now()
- + std::chrono::seconds(
- its_sd_message_id.ttl_);
- // IP address of target is a multicast address if the event is in a multicast eventgroup
- if (its_eventgroup->is_multicast()
- && !its_sd_message_id.subscriber_->is_reliable()) {
- // Event is in multicast eventgroup and subscribe for UDP
- its_eventgroup->add_target(
- { its_sd_message_id.target_, its_expiration },
- { its_sd_message_id.subscriber_, its_expiration });
- } else {
- // subscribe for TCP or UDP
- its_eventgroup->add_target(
- { its_sd_message_id.subscriber_, its_expiration });
- }
- discovery_->remote_subscription_acknowledge(_service,
- _instance, _eventgroup, _client, true,
- its_sd_message_id.sd_message_identifier_);
+ }
- VSOMEIP_INFO << "REMOTE SUBSCRIBE("
- << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
- << " from " << its_sd_message_id.subscriber_->get_address().to_string()
- << ":" << std::dec << its_sd_message_id.subscriber_->get_port()
- << (its_sd_message_id.subscriber_->is_reliable() ? " reliable" : " unreliable")
- << " was accepted";
- }
- } else { // unsubscription was queued while subscription was pending -> send it to client
- send_unsubscription(its_offering_client,
- its_sd_message_id.subscribing_client_,
- _service, _instance, _eventgroup,
- its_sd_message_id.pending_subscription_id_);
- }
+ std::set<client_t> subscribed_clients;
+ if (_client == VSOMEIP_ROUTING_CLIENT) {
+ for (const auto &its_event : its_eventgroup->get_events()) {
+ if (_event == ANY_EVENT || _event == its_event->get_event()) {
+ const auto &its_subscribers = its_event->get_subscribers();
+ subscribed_clients.insert(its_subscribers.begin(), its_subscribers.end());
}
}
- return;
+ } else {
+ subscribed_clients.insert(_client);
}
- if (specific_endpoint_client) {
- if (_client == get_client()) {
- host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/);
+ for (const auto &its_subscriber : subscribed_clients) {
+ if (its_subscriber == get_client()) {
if (_event == ANY_EVENT) {
- for (const auto &its_event : its_eventgroup->get_events())
- host_->on_subscription_status(_service, _instance,
- _eventgroup, its_event->get_event(), 0x0 /*OK*/);
- } else {
- host_->on_subscription_status(_service, _instance,
- _eventgroup, _event, 0x0 /*OK*/);
- }
- } else {
- stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup,
- _event);
- }
- } else {
- std::set<client_t> subscribed_clients;
- for (auto its_event : its_eventgroup->get_events()) {
- for (auto its_client : its_event->get_subscribers()) {
- subscribed_clients.insert(its_client);
- }
- }
- for (auto its_subscriber : subscribed_clients) {
- if (its_subscriber == get_client()) {
- host_->on_subscription_error(_service, _instance,
- _eventgroup, 0x0 /*OK*/);
- for (auto its_event : its_eventgroup->get_events()) {
+ for (const auto &its_event : its_eventgroup->get_events()) {
host_->on_subscription_status(_service, _instance,
_eventgroup, its_event->get_event(),
0x0 /*OK*/);
}
} else {
- stub_->send_subscribe_ack(its_subscriber, _service,
- _instance, _eventgroup, _event);
- }
- }
- }
- }
-}
-
-void routing_manager_impl::on_subscribe_nack(client_t _client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event, pending_subscription_id_t _subscription_id) {
- client_t its_client = is_specific_endpoint_client(_client, _service, _instance);
- bool specific_endpoint_client = its_client != VSOMEIP_ROUTING_CLIENT;
- auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
- if (its_eventgroup) {
- std::unique_lock<std::mutex> eventgroup_lock(its_eventgroup->get_subscription_lock());
- if (_subscription_id == DEFAULT_SUBSCRIPTION) {
- // NACK coming in via SD from remote or as answer to a subscription
- // of the application hosting the rm_impl to a local service
- auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, its_client);
- std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
- auto its_state = remote_subscription_state_.find(its_tuple);
- if (its_state != remote_subscription_state_.end()) {
- if (its_state->second == subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED) {
- // Already notified!
- return;
- }
- }
- remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED;
- } else { // NACK sent back from local client as answer to a remote subscription
- if (discovery_) {
- std::vector<pending_subscription_t> its_pending_subscriptions =
- its_eventgroup->remove_pending_subscription(_subscription_id);
- for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) {
- if (its_sd_message_id.ttl_ > 0) {
- if (its_sd_message_id.sd_message_identifier_ && its_sd_message_id.subscriber_) {
- discovery_->remote_subscription_acknowledge(_service, _instance,
- _eventgroup, _client, false, its_sd_message_id.sd_message_identifier_);
- VSOMEIP_INFO << "REMOTE SUBSCRIBE("
- << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
- << " from " << its_sd_message_id.subscriber_->get_address().to_string()
- << ":" << std::dec <<its_sd_message_id.subscriber_->get_port()
- << (its_sd_message_id.subscriber_->is_reliable() ? " reliable" : " unreliable")
- << " was not accepted";
- }
- } else { // unsubscription was queued while subscription was pending -> send it to client
- const client_t its_offering_client = find_local_client(_service, _instance);
- send_unsubscription(its_offering_client,
- its_sd_message_id.subscribing_client_, _service,
- _instance, _eventgroup,
- its_sd_message_id.pending_subscription_id_);
- }
+ host_->on_subscription_status(_service, _instance,
+ _eventgroup, _event, 0x0 /*OK*/);
}
- }
- return;
- }
- if (specific_endpoint_client) {
- if (_client == get_client()) {
- host_->on_subscription_error(_service, _instance, _eventgroup, 0x7 /*Rejected*/);
- host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x7 /*Rejected*/);
} else {
- stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup,
- _event);
- }
- } else {
- std::set<client_t> subscribed_clients;
- for (auto its_event : its_eventgroup->get_events()) {
- for (auto its_client : its_event->get_subscribers()) {
- subscribed_clients.insert(its_client);
- }
- }
- for (auto its_subscriber : subscribed_clients) {
- if (its_subscriber == get_client()) {
- host_->on_subscription_error(_service, _instance,
- _eventgroup, 0x7 /*Rejected*/);
- for (auto its_event : its_eventgroup->get_events()) {
- host_->on_subscription_status(_service, _instance,
- _eventgroup, its_event->get_event(),
- 0x7 /*Rejected*/);
- }
- } else {
- stub_->send_subscribe_nack(its_subscriber, _service,
- _instance, _eventgroup, _event);
- }
+ stub_->send_subscribe_ack(its_subscriber, _service,
+ _instance, _eventgroup, _event);
}
}
- }
+ }
}
-bool routing_manager_impl::deliver_specific_endpoint_message(service_t _service,
- instance_t _instance, const byte_t *_data, length_t _size, endpoint *_receiver) {
- client_t its_client(0x0);
-
- // Try to deliver specific endpoint message (for selective subscribers)
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- auto found_servic = remote_services_.find(_service);
- if (found_servic != remote_services_.end()) {
- auto found_instance = found_servic->second.find(_instance);
- if (found_instance != found_servic->second.end()) {
- for (auto client_entry : found_instance->second) {
- if (!client_entry.first) {
- continue;
- }
- auto found_reliability = client_entry.second.find(_receiver->is_reliable());
- if (found_reliability != client_entry.second.end()) {
- auto found_enpoint = found_reliability->second;
- if (found_enpoint.get() == _receiver) {
- its_client = client_entry.first;
- break;
- }
- }
- }
- }
- }
- }
- if (its_client) {
- if (its_client != get_client()) {
- auto local_endpoint = find_local(its_client);
- if (local_endpoint) {
- send_local(local_endpoint, its_client, _data, _size, _instance, true,
- _receiver->is_reliable(), VSOMEIP_SEND);
- }
- } else {
- deliver_message(_data, _size, _instance, _receiver->is_reliable(), VSOMEIP_ROUTING_CLIENT, true, true);
- }
- return true;
- }
-
- return false;
+std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client(
+ service_t _service, instance_t _instance, bool _reliable,
+ client_t _client) {
+ return ep_mgr_impl_->find_or_create_remote_client(_service,
+ _instance, _reliable, _client);
}
-void routing_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance,
- bool _reliable) {
- auto its_specific_endpoint_clients = get_specific_endpoint_clients(_service, _instance);
- std::shared_ptr<endpoint> endpoint_to_delete;
- bool other_services_reachable_through_endpoint(false);
- std::vector<std::shared_ptr<endpoint>> its_specific_endpoints;
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- // Clear client endpoints for remote services (generic and specific ones)
- if (remote_services_.find(_service) != remote_services_.end()) {
- if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
- auto endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT][_reliable];
- if (endpoint) {
- service_instances_[_service].erase(endpoint.get());
- endpoint_to_delete = endpoint;
- }
- remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].erase(_reliable);
- auto found_endpoint = remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].find(
- !_reliable);
- if (found_endpoint == remote_services_[_service][_instance][VSOMEIP_ROUTING_CLIENT].end()) {
- remote_services_[_service][_instance].erase(VSOMEIP_ROUTING_CLIENT);
- }
- // erase specific client endpoints
- for (const client_t &client : its_specific_endpoint_clients) {
- auto endpoint = remote_services_[_service][_instance][client][_reliable];
- if (endpoint) {
- service_instances_[_service].erase(endpoint.get());
- its_specific_endpoints.push_back(endpoint);
- }
- remote_services_[_service][_instance][client].erase(_reliable);
- auto found_endpoint = remote_services_[_service][_instance][client].find(!_reliable);
- if (found_endpoint == remote_services_[_service][_instance][client].end()) {
- remote_services_[_service][_instance].erase(client);
- }
- }
- }
- }
- if (remote_services_.find(_service) != remote_services_.end()) {
- if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
- if (!remote_services_[_service][_instance].size()) {
- remote_services_[_service].erase(_instance);
- if (0 >= remote_services_[_service].size()) {
- remote_services_.erase(_service);
- }
- }
- }
- }
-
- if (!service_instances_[_service].size()) {
- service_instances_.erase(_service);
- }
-
- // Only stop and delete the endpoint if none of the services
- // reachable through it is online anymore.
- if (endpoint_to_delete) {
- for (const auto& service : remote_services_) {
- for (const auto& instance : service.second) {
- const auto& client = instance.second.find(VSOMEIP_ROUTING_CLIENT);
- if (client != instance.second.end()) {
- for (const auto& reliable : client->second) {
- if (reliable.second == endpoint_to_delete) {
- other_services_reachable_through_endpoint = true;
- break;
- }
- }
- }
- if (other_services_reachable_through_endpoint) { break; }
- }
- if (other_services_reachable_through_endpoint) { break; }
- }
+void routing_manager_impl::on_subscribe_nack(client_t _client,
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event, remote_subscription_id_t _id) {
+ (void)_event; // TODO: Remove completely?
- if (!other_services_reachable_through_endpoint) {
- std::uint16_t its_port(0);
- boost::asio::ip::address its_address;
- if (_reliable) {
- std::shared_ptr<tcp_client_endpoint_impl> ep =
- std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete);
- if (ep) {
- its_port = ep->get_remote_port();
- ep->get_remote_address(its_address);
- }
- } else {
- std::shared_ptr<udp_client_endpoint_impl> ep =
- std::dynamic_pointer_cast<udp_client_endpoint_impl>(endpoint_to_delete);
- if (ep) {
- its_port = ep->get_remote_port();
- ep->get_remote_address(its_address);
- }
- }
- const auto found_ip = client_endpoints_by_ip_.find(its_address);
- if (found_ip != client_endpoints_by_ip_.end()) {
- const auto found_port = found_ip->second.find(its_port);
- if (found_port != found_ip->second.end()) {
- const auto found_reliable = found_port->second.find(_reliable);
- if (found_reliable != found_port->second.end()) {
- if (found_reliable->second == endpoint_to_delete) {
- found_port->second.erase(_reliable);
- // delete if necessary
- if (!found_port->second.size()) {
- found_ip->second.erase(found_port);
- if (!found_ip->second.size()) {
- client_endpoints_by_ip_.erase(found_ip);
- }
- }
- }
- }
- }
+ auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_eventgroup) {
+ auto its_subscription = its_eventgroup->get_remote_subscription(_id);
+ if (its_subscription) {
+ its_subscription->set_client_state(_client,
+ remote_subscription_state_e::SUBSCRIPTION_NACKED);
+
+ auto its_parent = its_subscription->get_parent();
+ if (its_parent) {
+ its_parent->set_client_state(_client,
+ remote_subscription_state_e::SUBSCRIPTION_NACKED);
+ if (!its_subscription->is_pending()) {
+ its_eventgroup->remove_remote_subscription(_id);
}
}
- }
- }
- if (!other_services_reachable_through_endpoint && endpoint_to_delete) {
- endpoint_to_delete->stop();
- }
- for (const auto &specific_endpoint : its_specific_endpoints) {
- specific_endpoint->stop();
- }
-}
-void routing_manager_impl::clear_multicast_endpoints(service_t _service, instance_t _instance) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- // Clear multicast info and endpoint and multicast instance (remote service)
- if (multicast_info.find(_service) != multicast_info.end()) {
- if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
- std::string address = multicast_info[_service][_instance]->get_address().to_string();
- uint16_t port = multicast_info[_service][_instance]->get_port();
- std::shared_ptr<endpoint> multicast_endpoint;
- auto found_port = server_endpoints_.find(port);
- if (found_port != server_endpoints_.end()) {
- auto found_unreliable = found_port->second.find(false);
- if (found_unreliable != found_port->second.end()) {
- multicast_endpoint = found_unreliable->second;
- multicast_endpoint->leave(address);
- multicast_endpoint->stop();
- server_endpoints_[port].erase(false);
- }
- if (found_port->second.find(true) == found_port->second.end()) {
- server_endpoints_.erase(port);
- }
- }
- multicast_info[_service].erase(_instance);
- if (0 >= multicast_info[_service].size()) {
- multicast_info.erase(_service);
- }
- // Clear service_instances_ for multicase endpoint
- if (1 >= service_instances_[_service].size()) {
- service_instances_.erase(_service);
- } else if (multicast_endpoint) {
- service_instances_[_service].erase(multicast_endpoint.get());
+ if (discovery_) {
+ discovery_->update_remote_subscription(its_subscription);
+ VSOMEIP_INFO << "REMOTE SUBSCRIBE("
+ << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " from " << its_subscription->get_subscriber()->get_address()
+ << ":" << std::dec << its_subscription->get_subscriber()->get_port()
+ << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable")
+ << " was not accepted";
}
}
}
@@ -3495,7 +2900,7 @@ return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _s
void routing_manager_impl::send_error(return_code_e _return_code,
const byte_t *_data, length_t _size,
instance_t _instance, bool _reliable,
- endpoint *_receiver,
+ endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
std::uint16_t _remote_port) {
@@ -3535,150 +2940,39 @@ void routing_manager_impl::send_error(return_code_e _return_code,
error_message->set_service(its_service);
error_message->set_session(its_session);
{
- std::lock_guard<std::mutex> its_lock(serialize_mutex_);
- if (serializer_->serialize(error_message.get())) {
+ std::shared_ptr<serializer> its_serializer(get_serializer());
+ if (its_serializer->serialize(error_message.get())) {
if (_receiver) {
auto its_endpoint_def = std::make_shared<endpoint_definition>(
_remote_address, _remote_port,
_receiver->is_reliable());
its_endpoint_def->set_remote_port(_receiver->get_local_port());
- send_to(its_endpoint_def, serializer_->get_data(),
- serializer_->get_size(), _instance, true);
- }
- serializer_->reset();
- } else {
- VSOMEIP_ERROR<< "Failed to serialize error message.";
- }
- }
-}
-
-void routing_manager_impl::on_identify_response(client_t _client, service_t _service,
- instance_t _instance, bool _reliable) {
- {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identifying_clients_.find(_service);
- if (its_service != identifying_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto its_reliable = its_instance->second.find(_reliable);
- if (its_reliable != its_instance->second.end()) {
- its_reliable->second.erase(_client);
- }
- }
- }
- identified_clients_[_service][_instance][_reliable].insert(_client);
- }
- discovery_->send_subscriptions(_service, _instance, _client, _reliable);
-}
-
-void routing_manager_impl::identify_for_subscribe(client_t _client,
- service_t _service, instance_t _instance, major_version_t _major,
- subscription_type_e _subscription_type) {
- (void)_subscription_type;
- if (!has_identified(_client, _service, _instance, false)
- && !is_identifying(_client, _service, _instance, false)) {
- send_identify_message(_client, _service, _instance, _major, false);
- }
- if (!has_identified(_client, _service, _instance, true)
- && !is_identifying(_client, _service, _instance, true)) {
- send_identify_message(_client, _service, _instance, _major, true);
- }
-}
-
-bool routing_manager_impl::send_identify_message(client_t _client,
- service_t _service,
- instance_t _instance,
- major_version_t _major,
- bool _reliable) {
- auto its_endpoint = find_or_create_remote_client(_service, _instance,
- _reliable, _client);
- if (!its_endpoint) {
- VSOMEIP_WARNING << "routing_manager_impl::send_identify_message: "
- << "No " << (_reliable ? "reliable" : "unreliable")
- << " route for identify message to service/instance "
- << std::hex << _service << "/" << _instance << " for client "
- << _client;
- return false;
- }
- {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- identifying_clients_[_service][_instance][_reliable].insert(_client);
- }
-
- if (_client == get_client()) {
- send_identify_request(_service, _instance, _major, _reliable);
- } else {
- stub_->send_identify_request_command(find_local(_client),
- _service, _instance, _major, _reliable);
- }
-
- return true;
-}
-
-
-bool routing_manager_impl::supports_selective(service_t _service, instance_t _instance) {
- bool supports_selective(false);
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- auto its_service = remote_service_info_.find(_service);
- if (its_service != remote_service_info_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- for (auto its_reliable : its_instance->second) {
- supports_selective |= configuration_->
- supports_selective_broadcasts(
- its_reliable.second->get_address());
- }
- }
- }
- return supports_selective;
-}
-
-bool routing_manager_impl::is_identifying(client_t _client, service_t _service,
- instance_t _instance, bool _reliable) {
- if (!supports_selective(_service, _instance)) {
- // For legacy selective services clients can't be identified!
- return false;
- }
- bool is_identifieing(false);
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identifying_clients_.find(_service);
- if (its_service != identifying_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto its_reliable = its_instance->second.find(_reliable);
- if (its_reliable != its_instance->second.end()) {
- auto its_client = its_reliable->second.find(_client);
- if (its_client != its_reliable->second.end()) {
- is_identifieing = true;
- }
- }
- }
- }
- return is_identifieing;
-}
+ std::shared_ptr<endpoint> its_endpoint =
+ ep_mgr_impl_->find_server_endpoint(
+ its_endpoint_def->get_remote_port(),
+ its_endpoint_def->is_reliable());
+ if (its_endpoint) {
+ #ifdef USE_DLT
+ const uint16_t its_data_size
+ = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
-bool routing_manager_impl::has_identified(client_t _client, service_t _service,
- instance_t _instance, bool _reliable) {
- if (!supports_selective(_service, _instance)) {
- // For legacy selective services clients can't be identified!
- return true;
- }
- bool has_identified(false);
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identified_clients_.find(_service);
- if (its_service != identified_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto its_reliable = its_instance->second.find(_reliable);
- if (its_reliable != its_instance->second.end()) {
- auto its_client = its_reliable->second.find(_client);
- if (its_client != its_reliable->second.end()) {
- has_identified = true;
+ trace::header its_header;
+ if (its_header.prepare(its_endpoint, true, _instance))
+ tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
+ _data, its_data_size);
+ #else
+ (void) _instance;
+ #endif
+ its_endpoint->send_error(its_endpoint_def,
+ its_serializer->get_data(), its_serializer->get_size());
}
}
+ its_serializer->reset();
+ put_serializer(its_serializer);
+ } else {
+ VSOMEIP_ERROR<< "Failed to serialize error message.";
}
}
- return has_identified;
}
void routing_manager_impl::clear_remote_subscriber(
@@ -3703,19 +2997,12 @@ void routing_manager_impl::clear_remote_subscriber(
std::chrono::steady_clock::time_point
routing_manager_impl::expire_subscriptions(bool _force) {
- struct subscriptions_info {
- service_t service_id_;
- instance_t instance_id_;
- eventgroup_t eventgroup_id_;
- std::shared_ptr<endpoint_definition> invalid_endpoint_;
- client_t client_;
- std::set<std::shared_ptr<event>> events_;
- std::shared_ptr<eventgroupinfo> eventgroupinfo_;
- };
- std::vector<struct subscriptions_info> subscriptions_to_expire_;
+ std::map<std::shared_ptr<remote_subscription>,
+ std::set<client_t> > its_expired_subscriptions;
+
std::chrono::steady_clock::time_point now
= std::chrono::steady_clock::now();
- std::chrono::steady_clock::time_point next_expiration
+ std::chrono::steady_clock::time_point its_next_expiration
= std::chrono::steady_clock::now() + std::chrono::hours(24);
{
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
@@ -3723,86 +3010,70 @@ routing_manager_impl::expire_subscriptions(bool _force) {
for (auto &its_service : eventgroups_) {
for (auto &its_instance : its_service.second) {
for (auto &its_eventgroup : its_instance.second) {
- std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints;
- for (auto &its_target : its_eventgroup.second->get_targets()) {
- if (_force) {
- its_expired_endpoints.insert(its_target.endpoint_);
- } else {
- if (its_target.expiration_ < now) {
- its_expired_endpoints.insert(its_target.endpoint_);
- } else if (its_target.expiration_ < next_expiration) {
- next_expiration = its_target.expiration_;
+ auto its_subscriptions
+ = its_eventgroup.second->get_remote_subscriptions();
+ for (auto &s : its_subscriptions) {
+ for (auto its_client : s->get_clients()) {
+ if (_force) {
+ its_expired_subscriptions[s].insert(its_client);
+ } else {
+ auto its_expiration = s->get_expiration(its_client);
+ if (its_expiration != std::chrono::steady_clock::time_point()) {
+ if (its_expiration < now) {
+ its_expired_subscriptions[s].insert(its_client);
+ } else if (its_expiration < its_next_expiration) {
+ its_next_expiration = its_expiration;
+ }
+ }
}
}
}
-
- for (auto its_endpoint : its_expired_endpoints) {
- its_eventgroup.second->remove_target(its_endpoint);
-
- client_t its_client
- = find_client(its_service.first, its_instance.first,
- its_eventgroup.second, its_endpoint);
- clear_remote_subscriber(its_service.first, its_instance.first,
- its_client, its_endpoint);
-
- std::set<std::shared_ptr<event> > its_events;
- if (its_eventgroup.second->get_targets().size() == 0) {
- its_events = its_eventgroup.second->get_events();
- }
- subscriptions_to_expire_.push_back({its_service.first,
- its_instance.first,
- its_eventgroup.first,
- its_endpoint,
- its_client,
- its_events,
- its_eventgroup.second});
- }
- if(its_eventgroup.second->is_multicast() && its_expired_endpoints.size() &&
- 0 == its_eventgroup.second->get_unreliable_target_count() ) {
- //clear multicast targets if no unreliable subscriber is left for multicast eventgroup
- its_eventgroup.second->clear_multicast_targets();
- }
}
}
}
}
- for (const auto &s : subscriptions_to_expire_) {
- if (s.invalid_endpoint_) {
- for (const auto e: s.events_) {
- if (e->is_shadow()) {
- e->unset_payload();
- }
+ for (auto &s : its_expired_subscriptions) {
+ auto its_info = s.first->get_eventgroupinfo();
+ if (its_info) {
+ auto its_service = its_info->get_service();
+ auto its_instance = its_info->get_instance();
+ auto its_eventgroup = its_info->get_eventgroup();
+ auto its_major = its_info->get_major();
+
+ remote_subscription_id_t its_id;
+ auto its_result = its_info->update_remote_subscription(
+ s.first, std::chrono::steady_clock::now(),
+ s.second, its_id, false);
+ if (its_result) {
+ const client_t its_offering_client
+ = find_local_client(its_service, its_instance);
+ send_unsubscription(its_offering_client,
+ its_service, its_instance, its_eventgroup, its_major,
+ s.second, s.first->get_id());
}
- const client_t its_hosting_client = find_local_client(s.service_id_,
- s.instance_id_);
-
- if (its_hosting_client != VSOMEIP_ROUTING_CLIENT) {
- const pending_subscription_t its_pending_unsubscription(
- std::shared_ptr<sd_message_identifier_t>(),
- s.invalid_endpoint_, s.invalid_endpoint_,
- 0, s.client_);
- pending_subscription_id_t its_pending_unsubscription_id =
- s.eventgroupinfo_->add_pending_subscription(
- its_pending_unsubscription);
- if (its_pending_unsubscription_id != DEFAULT_SUBSCRIPTION) {
- send_unsubscription(its_hosting_client, s.client_, s.service_id_,
- s.instance_id_, s.eventgroup_id_,
- its_pending_unsubscription_id);
- }
+
+ if (s.first->get_unreliable()) {
+ VSOMEIP_INFO << "Expired subscription ["
+ << std::hex << std::setfill('0') << std::setw(4) << its_service << "."
+ << std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
+ << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from "
+ << s.first->get_unreliable()->get_address() << ":"
+ << std::dec << s.first->get_unreliable()->get_port();
}
- VSOMEIP_INFO << "Expired subscription ["
- << std::hex << std::setfill('0') << std::setw(4) << s.service_id_ << "."
- << std::hex << std::setfill('0') << std::setw(4) << s.instance_id_ << "."
- << std::hex << std::setfill('0') << std::setw(4) << s.eventgroup_id_ << "] from "
- << s.invalid_endpoint_->get_address() << ":"
- << std::dec << s.invalid_endpoint_->get_port()
- << "(" << std::hex << std::setfill('0') << std::setw(4) << s.client_ << ") "
- << _force;
+ if (s.first->get_reliable()) {
+ VSOMEIP_INFO << "Expired subscription ["
+ << std::hex << std::setfill('0') << std::setw(4) << its_service << "."
+ << std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
+ << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from "
+ << s.first->get_reliable()->get_address() << ":"
+ << std::dec << s.first->get_reliable()->get_port();
+ }
}
}
- return next_expiration;
+
+ return its_next_expiration;
}
void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const & _error) {
@@ -3811,6 +3082,9 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const
#ifndef VSOMEIP_VERSION
#define VSOMEIP_VERSION "unknown version"
#endif
+ static int counter(0);
+ static uint32_t its_interval = configuration_->get_log_version_interval();
+
bool is_diag_mode(false);
if (discovery_) {
@@ -3825,62 +3099,24 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const
std::chrono::steady_clock::now() - last_resume_).count() << "s";
}
}
+
VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION << " | ("
<< ((is_diag_mode == true) ? "diagnosis)" : "default)")
<< its_last_resume.str();
- {
- std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
- version_log_timer_.expires_from_now(
- std::chrono::seconds(configuration_->get_log_version_interval()));
- version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
- this, std::placeholders::_1));
- }
- }
-}
-
-#ifndef WITHOUT_SYSTEMD
-void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error) {
- if (!_error) {
- static bool is_ready(false);
- static bool has_interval(false);
- static uint64_t its_interval(0);
-
- if (is_ready) {
- sd_notify(0, "WATCHDOG=1");
- VSOMEIP_INFO << "Triggered systemd watchdog";
- } else {
- is_ready = true;
- sd_notify(0, "READY=1");
- VSOMEIP_INFO << "Sent READY to systemd watchdog";
- if (0 < sd_watchdog_enabled(0, &its_interval)) {
- has_interval = true;
- VSOMEIP_INFO << "systemd watchdog is enabled";
- }
- }
- if (has_interval) {
- std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
- watchdog_timer_.expires_from_now(std::chrono::microseconds(its_interval / 2));
- watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
- this, std::placeholders::_1));
+ counter++;
+ if (counter == 6) {
+ ep_mgr_->log_client_states();
+ ep_mgr_impl_->log_client_states();
+ counter = 0;
}
- }
-}
-#endif
-void routing_manager_impl::clear_remote_service_info(service_t _service, instance_t _instance, bool _reliable) {
- // Clear remote_service_info_
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- if (remote_service_info_.find(_service) != remote_service_info_.end()) {
- if (remote_service_info_[_service].find(_instance) != remote_service_info_[_service].end()) {
- remote_service_info_[_service][_instance].erase(_reliable);
- auto found_endpoint_def = remote_service_info_[_service][_instance].find(!_reliable);
- if (found_endpoint_def == remote_service_info_[_service][_instance].end()) {
- remote_service_info_[_service].erase(_instance);
- if (0 >= remote_service_info_[_service].size()) {
- remote_service_info_.erase(_service);
- }
- }
+ {
+ std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
+ version_log_timer_.expires_from_now(std::chrono::seconds(its_interval));
+ version_log_timer_.async_wait(
+ std::bind(&routing_manager_impl::log_version_timer_cbk,
+ this, std::placeholders::_1));
}
}
}
@@ -4123,135 +3359,28 @@ void routing_manager_impl::handle_client_error(client_t _client) {
}
for (const auto &offer : its_offers) {
offer_service(std::get<0>(offer), std::get<1>(offer), std::get<2>(offer),
- std::get<3>(offer), std::get<4>(offer));
+ std::get<3>(offer), std::get<4>(offer), true);
}
}
-void routing_manager_impl::remove_specific_client_endpoint(client_t _client, service_t _service,
- instance_t _instance, bool _reliable) {
- client_t its_client = is_specific_endpoint_client(_client, _service, _instance);
- if (its_client != VSOMEIP_ROUTING_CLIENT) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- if (remote_services_.find(_service) != remote_services_.end()) {
- if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
- auto endpoint = remote_services_[_service][_instance][_client][_reliable];
- if (endpoint) {
- service_instances_[_service].erase(endpoint.get());
- endpoint->stop();
- }
- remote_services_[_service][_instance][_client].erase(_reliable);
- auto found_endpoint = remote_services_[_service][_instance][_client].find(!_reliable);
- if (found_endpoint == remote_services_[_service][_instance][_client].end()) {
- remote_services_[_service][_instance].erase(_client);
- }
- }
- }
- }
-}
-
-void routing_manager_impl::clear_identified_clients( service_t _service, instance_t _instance) {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identified_clients_.find(_service);
- if (its_service != identified_clients_.end()) {
- auto found_instance = its_service->second.find(_instance);
- if (found_instance != its_service->second.end()) {
- auto found_reliable = found_instance->second.find(true);
- if (found_reliable != found_instance->second.end()) {
- found_reliable->second.clear();
- }
- auto found_unreliable = found_instance->second.find(false);
- if (found_unreliable != found_instance->second.end()) {
- found_unreliable->second.clear();
- }
- }
- }
-}
-
-void routing_manager_impl::clear_identifying_clients( service_t _service, instance_t _instance) {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identifying_clients_.find(_service);
- if (its_service != identifying_clients_.end()) {
- auto found_instance = its_service->second.find(_instance);
- if (found_instance != its_service->second.end()) {
- auto found_reliable = found_instance->second.find(true);
- if (found_reliable != found_instance->second.end()) {
- found_reliable->second.clear();
- }
- auto found_unreliable = found_instance->second.find(false);
- if (found_unreliable != found_instance->second.end()) {
- found_unreliable->second.clear();
- }
- }
- }
-}
-
-void routing_manager_impl::remove_identified_client(service_t _service, instance_t _instance, client_t _client) {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identified_clients_.find(_service);
- if (its_service != identified_clients_.end()) {
- auto found_instance = its_service->second.find(_instance);
- if (found_instance != its_service->second.end()) {
- auto found_reliable = found_instance->second.find(true);
- if (found_reliable != found_instance->second.end()) {
- auto found_client = found_reliable->second.find(_client);
- if(found_client != found_reliable->second.end())
- found_reliable->second.erase(_client);
- }
- auto found_unreliable = found_instance->second.find(false);
- if (found_unreliable != found_instance->second.end()) {
- auto found_client = found_unreliable->second.find(_client);
- if(found_client != found_unreliable->second.end())
- found_unreliable->second.erase(_client);
- }
- }
- }
-}
-
-void routing_manager_impl::remove_identifying_client(service_t _service, instance_t _instance, client_t _client) {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- auto its_service = identifying_clients_.find(_service);
- if (its_service != identifying_clients_.end()) {
- auto found_instance = its_service->second.find(_instance);
- if (found_instance != its_service->second.end()) {
- auto found_reliable = found_instance->second.find(true);
- if (found_reliable != found_instance->second.end()) {
- auto found_client = found_reliable->second.find(_client);
- if(found_client != found_reliable->second.end())
- found_reliable->second.erase(_client);
- }
- auto found_unreliable = found_instance->second.find(false);
- if (found_unreliable != found_instance->second.end()) {
- auto found_client = found_unreliable->second.find(_client);
- if(found_client != found_unreliable->second.end())
- found_unreliable->second.erase(_client);
- }
- }
- }
-}
-
-void routing_manager_impl::unsubscribe_specific_client_at_sd(
- service_t _service, instance_t _instance, client_t _client) {
- client_t subscriber = is_specific_endpoint_client(_client, _service, _instance);
- if (subscriber != VSOMEIP_ROUTING_CLIENT && discovery_) {
- discovery_->unsubscribe_client(_service, _instance, _client);
- }
+std::shared_ptr<endpoint_manager_impl> routing_manager_impl::get_endpoint_manager() const {
+ return ep_mgr_impl_;
}
void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
- event_t _event, subscription_type_e _subscription_type) {
- (void)_subscription_type;
- auto endpoint = find_local(_service, _instance);
+ event_t _event) {
+ auto endpoint = ep_mgr_->find_local(_service, _instance);
if (endpoint) {
stub_->send_subscribe(endpoint, _client,
- _service, _instance, _eventgroup, _major, _event, DEFAULT_SUBSCRIPTION);
+ _service, _instance, _eventgroup, _major, _event, PENDING_SUBSCRIPTION_ID);
}
}
void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
if(discovery_) {
switch (_routing_state) {
- case vsomeip::routing_state_e::RS_SUSPENDED:
+ case routing_state_e::RS_SUSPENDED:
{
VSOMEIP_INFO << "Set routing to suspend mode, diagnosis mode is "
<< ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
@@ -4271,7 +3400,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
<< std::hex << std::setw(4) << std::setfill('0') << its_instance.first << " still offered by "
<< std::hex << std::setw(4) << std::setfill('0') << its_client;
}
- discovery_->stop_offer_service(its_service.first, its_instance.first, its_instance.second);
+ discovery_->stop_offer_service(its_instance.second);
}
}
{
@@ -4289,13 +3418,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
// determine existing subscriptions to remote service and send StopSubscribe
for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) {
discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
- auto specific_endpoint_clients = get_specific_endpoint_clients(s.first, i.first);
- for (auto its_client : specific_endpoint_clients) {
- discovery_->unsubscribe(s.first, i.first, its_eventgroup, its_client);
- }
- for (const auto &e : find_events(s.first, i.first, its_eventgroup)) {
- e->clear_subscribers();
- }
}
const bool has_reliable(i.second->get_endpoint(true));
@@ -4308,7 +3430,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
break;
}
- case vsomeip::routing_state_e::RS_RESUMED:
+ case routing_state_e::RS_RESUMED:
{
VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was "
<< ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
@@ -4337,8 +3459,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
// Trigger initial offer phase for relevant services
for (const auto &its_service : get_offered_services()) {
for (const auto &its_instance : its_service.second) {
- discovery_->offer_service(its_service.first,
- its_instance.first, its_instance.second);
+ discovery_->offer_service(its_instance.second);
}
}
break;
@@ -4353,8 +3474,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
for (const auto &its_instance : its_service.second) {
if (host_->get_configuration()->is_someip(
its_service.first, its_instance.first)) {
- discovery_->stop_offer_service(
- its_service.first, its_instance.first, its_instance.second);
+ discovery_->stop_offer_service(its_instance.second);
}
}
}
@@ -4382,8 +3502,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
for (const auto &its_instance : its_service.second) {
if (host_->get_configuration()->is_someip(
its_service.first, its_instance.first)) {
- discovery_->offer_service(its_service.first,
- its_instance.first, its_instance.second);
+ discovery_->offer_service(its_instance.second);
}
}
}
@@ -4446,6 +3565,10 @@ void routing_manager_impl::on_net_interface_or_route_state_changed(
}
void routing_manager_impl::start_ip_routing() {
+#ifdef _WIN32
+ if_state_running_ = true;
+#endif
+
if (routing_ready_handler_) {
routing_ready_handler_();
}
@@ -4506,8 +3629,8 @@ routing_manager_impl::get_subscribed_eventgroups(
if (found_service != eventgroups_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- for (auto its_group : found_instance->second) {
- for (auto its_event : its_group.second->get_events()) {
+ for (const auto& its_group : found_instance->second) {
+ for (const auto& its_event : its_group.second->get_events()) {
if (its_event->has_subscriber(its_group.first, ANY_CLIENT)) {
its_eventgroups.insert(its_group.first);
}
@@ -4540,7 +3663,7 @@ void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups(
its_eventgroup.first, its_subscriber);
}
- client_t its_client = is_specific_endpoint_client(its_subscriber, _service, _instance);
+ client_t its_client = VSOMEIP_ROUTING_CLIENT; //is_specific_endpoint_client(its_subscriber, _service, _instance);
{
std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
const auto its_tuple =
@@ -4551,8 +3674,9 @@ void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups(
}
its_events.push_back(its_event);
}
- its_eventgroup.second->clear_targets();
- its_eventgroup.second->clear_pending_subscriptions();
+ // TODO dn: find out why this was commented out
+ //its_eventgroup.second->clear_targets();
+ //its_eventgroup.second->clear_pending_subscriptions();
}
}
}
@@ -4578,7 +3702,7 @@ void routing_manager_impl::clear_remote_subscriber(service_t _service,
void routing_manager_impl::call_sd_endpoint_connected(
const boost::system::error_code& _error,
service_t _service, instance_t _instance,
- std::shared_ptr<endpoint> _endpoint,
+ const std::shared_ptr<endpoint>& _endpoint,
std::shared_ptr<boost::asio::steady_timer> _timer) {
(void)_timer;
if (_error) {
@@ -4604,14 +3728,20 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe(
if (its_local_client == host_->get_client()) {
// received subscription for event of a service instance hosted by
// application acting as rm_impl register with own client id and shadow = false
- register_event(host_->get_client(), _service, _instance, _event,
- its_eventgroups, true, std::chrono::milliseconds::zero(), false,
+ register_event(host_->get_client(),
+ _service, _instance,
+ _event,
+ its_eventgroups, event_type_e::ET_UNKNOWN, reliability_type_e::RT_UNKNOWN,
+ std::chrono::milliseconds::zero(), false, true,
nullptr, false, false, true);
} else if (its_local_client != VSOMEIP_ROUTING_CLIENT) {
// received subscription for event of a service instance hosted on
// this node register with client id of local_client and set shadow to true
- register_event(its_local_client, _service, _instance, _event,
- its_eventgroups, true, std::chrono::milliseconds::zero(), false,
+ register_event(its_local_client,
+ _service, _instance,
+ _event, its_eventgroups, event_type_e::ET_UNKNOWN,
+ reliability_type_e::RT_UNKNOWN,
+ std::chrono::milliseconds::zero(), false, true,
nullptr, false, true, true);
} else {
// received subscription for event of a unknown or remote service instance
@@ -4620,9 +3750,12 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe(
if (its_info && !its_info->is_local()) {
// remote service, register shadow event with client ID of subscriber
// which should have called register_event
- register_event(_client, _service, _instance, _event,
- its_eventgroups, true, std::chrono::milliseconds::zero(),
- false, nullptr, false, true, true);
+ register_event(_client,
+ _service, _instance,
+ _event, its_eventgroups, event_type_e::ET_UNKNOWN,
+ reliability_type_e::RT_UNKNOWN,
+ std::chrono::milliseconds::zero(),
+ false, true, nullptr, false, true, true);
} else {
VSOMEIP_WARNING
<< "routing_manager_impl::create_placeholder_event_and_subscribe("
@@ -4642,126 +3775,84 @@ bool routing_manager_impl::create_placeholder_event_and_subscribe(
return is_inserted;
}
-void routing_manager_impl::handle_subscription_state(client_t _client, service_t _service, instance_t _instance,
+void routing_manager_impl::handle_subscription_state(
+ client_t _client, service_t _service, instance_t _instance,
eventgroup_t _eventgroup, event_t _event) {
+#if 0
+ VSOMEIP_ERROR << "routing_manager_impl::" << __func__
+ << "(" << std::hex << _client << "): "
+ << "event="
+ << std::hex << _service << "."
+ << std::hex << _instance << "."
+ << std::hex << _eventgroup << "."
+ << std::hex << _event
+ << " me="
+ << std::hex << get_client();
+#endif
+ // Note: remote_subscription_state_mutex_ is already locked as this
+ // method builds a critical section together with insert_subscription
+ // from routing_manager_base.
+ // Todo: Improve this situation...
+ auto its_event = find_event(_service, _instance, _event);
+ client_t its_client(VSOMEIP_ROUTING_CLIENT);
+ if (its_event &&
+ its_event->get_type() == event_type_e::ET_SELECTIVE_EVENT) {
+ its_client = _client;
+ }
- client_t subscriber = is_specific_endpoint_client(_client, _service, _instance);
- auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, subscriber);
-
- std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
+ auto its_tuple
+ = std::make_tuple(_service, _instance, _eventgroup, its_client);
auto its_state = remote_subscription_state_.find(its_tuple);
if (its_state != remote_subscription_state_.end()) {
+#if 0
+ VSOMEIP_ERROR << "routing_manager_impl::" << __func__
+ << "(" << std::hex << _client << "): "
+ << "event="
+ << std::hex << _service << "."
+ << std::hex << _instance << "."
+ << std::hex << _eventgroup << "."
+ << std::hex << _event
+ << " state=" << std::hex << (int)its_state->second
+ << " me="
+ << std::hex << get_client();
+#endif
if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
// Subscription already acknowledged!
if (_client == get_client()) {
- host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/);
host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x0 /*OK*/);
} else {
stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
}
}
- } else {
- remote_subscription_state_[its_tuple] = subscription_state_e::IS_SUBSCRIBING;
}
}
-client_t routing_manager_impl::is_specific_endpoint_client(client_t _client,
- service_t _service, instance_t _instance) {
- client_t result = VSOMEIP_ROUTING_CLIENT;
- {
- std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
- auto found_service = specific_endpoint_clients_.find(_service);
- if (found_service != specific_endpoint_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_client = found_instance->second.find(_client);
- if(found_client != found_instance->second.end()) {
- result = _client;
- }
- }
- }
- }
- // A client_t != VSOMEIP_ROUTING_CLIENT implies true
- return result;
-}
-
-std::unordered_set<client_t> routing_manager_impl::get_specific_endpoint_clients(
- service_t _service, instance_t _instance) {
- std::unordered_set<client_t> result;
- {
- std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
- auto found_service = specific_endpoint_clients_.find(_service);
- if (found_service != specific_endpoint_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- result = found_instance->second;
- }
- }
- }
- return result;
-}
-
-void routing_manager_impl::send_initial_events(
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- const std::shared_ptr<endpoint_definition> &_subscriber) {
- std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service,
- _instance, _eventgroup);
- if (!its_eventgroup) {
- return;
- }
- bool is_offered_both(false);
- if (configuration_->get_reliable_port(_service, _instance) != ILLEGAL_PORT &&
- configuration_->get_unreliable_port(_service, _instance) != ILLEGAL_PORT) {
- is_offered_both = true;
- }
- // send initial events if we already have a cached field (is_set)
- for (const auto &its_event : its_eventgroup->get_events()) {
- if (its_event->is_field()) {
- if (its_event->is_set()) {
- if (!is_offered_both) {
- its_event->notify_one(_subscriber, true);
- } else {
- if (its_event->is_reliable() && _subscriber->is_reliable()) {
- its_event->notify_one(_subscriber, true);
- }
- if (!its_event->is_reliable() && !_subscriber->is_reliable()) {
- its_event->notify_one(_subscriber, true);
- }
- }
- } else {
- // received a subscription but can't notify due to missing payload
- its_event->set_remote_notification_pending(true);
- }
- }
- }
-}
-
-void routing_manager_impl::register_offer_acceptance_handler(
- vsomeip::offer_acceptance_handler_t _handler) const {
+void routing_manager_impl::register_sd_acceptance_handler(
+ const sd_acceptance_handler_t& _handler) const {
if (discovery_) {
- discovery_->register_offer_acceptance_handler(_handler);
+ discovery_->register_sd_acceptance_handler(_handler);
}
}
void routing_manager_impl::register_reboot_notification_handler(
- vsomeip::reboot_notification_handler_t _handler) const {
+ const reboot_notification_handler_t& _handler) const {
if (discovery_) {
discovery_->register_reboot_notification_handler(_handler);
}
}
void routing_manager_impl::register_routing_ready_handler(
- routing_ready_handler_t _handler) {
+ const routing_ready_handler_t& _handler) {
routing_ready_handler_ = _handler;
}
void routing_manager_impl::register_routing_state_handler(
- routing_state_handler_t _handler) {
+ const routing_state_handler_t& _handler) {
routing_state_handler_ = _handler;
}
-void routing_manager_impl::offer_acceptance_enabled(
- boost::asio::ip::address _address) {
+void routing_manager_impl::sd_acceptance_enabled(
+ const boost::asio::ip::address& _address) {
boost::system::error_code ec;
VSOMEIP_INFO << "ipsec-plugin-mgu: expire subscriptions and services: "
<< _address.to_string(ec);
@@ -4775,7 +3866,7 @@ void routing_manager_impl::memory_log_timer_cbk(
return;
}
#ifndef _WIN32
- static const std::uint32_t its_pagesize = getpagesize() / 1024;
+ static const std::uint32_t its_pagesize = static_cast<std::uint32_t>(getpagesize() / 1024);
#else
static const std::uint32_t its_pagesize = 4096 / 1024;
#endif
@@ -4836,75 +3927,7 @@ void routing_manager_impl::status_log_timer_cbk(
return;
}
- // local client endpoints
- {
- std::map<client_t, std::shared_ptr<endpoint>> lces = get_local_endpoints();
- VSOMEIP_INFO << "status local client endpoints: " << std::dec << lces.size();
- for (const auto lce : lces) {
- lce.second->print_status();
- }
- }
-
- // udp and tcp client endpoints
- {
- client_endpoints_by_ip_t client_endpoints_by_ip;
- remote_services_t remote_services;
- server_endpoints_t server_endpoints;
- {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- client_endpoints_by_ip = client_endpoints_by_ip_;
- remote_services = remote_services_;
- server_endpoints = server_endpoints_;
- }
- VSOMEIP_INFO << "status start remote client endpoints:";
- std::uint32_t num_remote_client_endpoints(0);
- // normal endpoints
- for (const auto &a : client_endpoints_by_ip) {
- for (const auto p : a.second) {
- for (const auto ru : p.second) {
- ru.second->print_status();
- num_remote_client_endpoints++;
- }
- }
- }
- VSOMEIP_INFO << "status end remote client endpoints: " << std::dec
- << num_remote_client_endpoints;
-
- // selective client endpoints
- VSOMEIP_INFO << "status start selective remote client endpoints:";
- std::uint32_t num_remote_selectiv_client_endpoints(0);
- for (const auto s : remote_services) {
- for (const auto i : s.second) {
- for (const auto c : i.second) {
- if (c.first != VSOMEIP_ROUTING_CLIENT) {
- for (const auto ur : c.second) {
- ur.second->print_status();
- num_remote_selectiv_client_endpoints++;
- }
- }
- }
- }
- }
- VSOMEIP_INFO << "status end selective remote client endpoints: "
- << std::dec << num_remote_selectiv_client_endpoints;
-
- VSOMEIP_INFO << "status start server endpoints:";
- std::uint32_t num_server_endpoints(1);
- // local server endpoints
- stub_->print_endpoint_status();
-
- // server endpoints
- for (const auto p : server_endpoints) {
- for (const auto ru : p.second ) {
- ru.second->print_status();
- num_server_endpoints++;
- }
- }
- VSOMEIP_INFO << "status end server endpoints:"
- << std::dec << num_server_endpoints;
- }
-
-
+ ep_mgr_impl_->print_status();
{
std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
boost::system::error_code ec;
@@ -4916,138 +3939,107 @@ void routing_manager_impl::status_log_timer_cbk(
}
}
-void routing_manager_impl::on_unsubscribe_ack(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup,
- pending_subscription_id_t _unsubscription_id) {
- std::shared_ptr<eventgroupinfo> its_eventgroup = find_eventgroup(_service,
- _instance, _eventgroup);
- if (!its_eventgroup) {
- VSOMEIP_ERROR << __func__ << ": Received UNSUBSCRIBE_ACK for unknown "
- << "eventgroup: ("
- << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
- return;
- }
- // there will only be one or zero subscriptions returned here
- std::vector<pending_subscription_t> its_pending_subscriptions =
- its_eventgroup->remove_pending_subscription(_unsubscription_id);
- for (const pending_subscription_t& its_sd_message_id : its_pending_subscriptions) {
- if (its_sd_message_id.pending_subscription_id_ == _unsubscription_id) {
- its_eventgroup->remove_target(its_sd_message_id.target_);
- clear_remote_subscriber(_service, _instance,
- its_sd_message_id.subscribing_client_,
- its_sd_message_id.target_);
- if (its_eventgroup->get_targets().size() == 0) {
- for (auto e : its_eventgroup->get_events()) {
- if (e->is_shadow()) {
- e->unset_payload();
+void
+routing_manager_impl::on_unsubscribe_ack(client_t _client,
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ remote_subscription_id_t _id) {
+ std::shared_ptr<eventgroupinfo> its_info
+ = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_info) {
+ const auto its_subscription = its_info->get_remote_subscription(_id);
+ if (its_subscription) {
+ its_info->remove_remote_subscription(_id);
+
+ std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
+ remote_subscribers_[_service][_instance].erase(_client);
+
+ if (its_info->get_remote_subscriptions().size() == 0) {
+ for (const auto &its_event : its_info->get_events()) {
+ bool has_remote_subscriber(false);
+ for (const auto &its_eventgroup : its_event->get_eventgroups()) {
+ const auto its_eventgroup_info
+ = find_eventgroup(_service, _instance, its_eventgroup);
+ if (its_eventgroup_info
+ && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
+ has_remote_subscriber = true;
+ }
+ }
+
+ if (!has_remote_subscriber && its_event->is_shadow()) {
+ its_event->unset_payload();
}
}
}
} else {
- const pending_subscription_id_t its_subscription_id =
- its_sd_message_id.pending_subscription_id_;
- const client_t its_subscribing_client = its_sd_message_id.subscribing_client_;
- const client_t its_offering_client = find_local_client(_service, _instance);
- send_subscription(its_offering_client, its_subscribing_client, _service,
- _instance, _eventgroup, its_eventgroup->get_major(),
- its_subscription_id);
+ VSOMEIP_ERROR << __func__
+ << ": Unknown StopSubscribe " << std::dec << _id << " for eventgroup ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
+ } else {
+ VSOMEIP_ERROR << __func__
+ << ": Received StopSubscribe for unknown eventgroup: ("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
}
-void routing_manager_impl::send_unsubscription(
- client_t _offering_client, client_t _subscribing_client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- pending_subscription_id_t _pending_unsubscription_id) {
+void routing_manager_impl::on_connect(const std::shared_ptr<endpoint>& _endpoint) {
+ (void)_endpoint;
+}
+void routing_manager_impl::on_disconnect(const std::shared_ptr<endpoint>& _endpoint) {
+ (void)_endpoint;
+}
+void routing_manager_impl::send_subscription(
+ const client_t _offering_client,
+ const service_t _service, const instance_t _instance,
+ const eventgroup_t _eventgroup, const major_version_t _major,
+ const std::set<client_t> &_clients,
+ const remote_subscription_id_t _id) {
if (host_->get_client() == _offering_client) {
auto self = shared_from_this();
- host_->on_subscription(_service, _instance, _eventgroup,
- _subscribing_client, false,
- [this, self, _service, _instance, _eventgroup,
- _subscribing_client, _pending_unsubscription_id, _offering_client]
- (const bool _subscription_accepted) {
- (void)_subscription_accepted;
+ for (const auto its_client : _clients) {
+ host_->on_subscription(_service, _instance, _eventgroup, its_client, own_uid_, own_gid_, true,
+ [this, self, _service, _instance, _eventgroup, its_client, _id]
+ (const bool _is_accepted) {
try {
- const auto its_callback = std::bind(
- &routing_manager_stub_host::on_unsubscribe_ack,
- std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
- _offering_client, _service, _instance,
- _eventgroup, _pending_unsubscription_id);
- io_.post(its_callback);
+ if (!_is_accepted) {
+ const auto its_callback = std::bind(
+ &routing_manager_stub_host::on_subscribe_nack,
+ std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
+ its_client, _service, _instance,
+ _eventgroup, ANY_EVENT, _id);
+ io_.post(its_callback);
+ } else {
+ const auto its_callback = std::bind(
+ &routing_manager_stub_host::on_subscribe_ack,
+ std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
+ its_client, _service, _instance,
+ _eventgroup, ANY_EVENT, _id);
+ io_.post(its_callback);
+ }
} catch (const std::exception &e) {
VSOMEIP_ERROR << __func__ << e.what();
}
- }
- );
- } else {
- if (!stub_->send_unsubscribe(find_local(_offering_client),
- _subscribing_client,
- _service, _instance, _eventgroup, ANY_EVENT,
- _pending_unsubscription_id)) {
- try {
- const auto its_callback = std::bind(
- &routing_manager_stub_host::on_unsubscribe_ack,
- std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
- _offering_client, _service, _instance,
- _eventgroup, _pending_unsubscription_id);
- io_.post(its_callback);
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << __func__ << e.what();
- }
+ });
}
- }
-}
-
-void routing_manager_impl::send_subscription(
- client_t _offering_client, client_t _subscribing_client,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major,
- pending_subscription_id_t _pending_subscription_id) {
- if (host_->get_client() == _offering_client) {
- auto self = shared_from_this();
- host_->on_subscription(_service, _instance, _eventgroup,
- _subscribing_client, true,
- [this, self, _service, _instance,
- _eventgroup, _subscribing_client, _pending_subscription_id]
- (const bool _subscription_accepted) {
- try {
- if (!_subscription_accepted) {
+ } else { // service hosted by local client
+ for (const auto its_client : _clients) {
+ if (!stub_->send_subscribe(find_local(_offering_client), its_client,
+ _service, _instance, _eventgroup, _major, ANY_EVENT, _id)) {
+ try {
const auto its_callback = std::bind(
&routing_manager_stub_host::on_subscribe_nack,
std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
- _subscribing_client, _service, _instance,
- _eventgroup, ANY_EVENT, _pending_subscription_id);
- io_.post(its_callback);
- } else {
- const auto its_callback = std::bind(
- &routing_manager_stub_host::on_subscribe_ack,
- std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
- _subscribing_client, _service, _instance,
- _eventgroup, ANY_EVENT, _pending_subscription_id);
+ its_client, _service, _instance, _eventgroup,
+ ANY_EVENT, _id);
io_.post(its_callback);
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << __func__ << e.what();
}
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << __func__ << e.what();
- }
- });
- } else { // service hosted by local client
- if (!stub_->send_subscribe(find_local(_offering_client),
- _subscribing_client,
- _service, _instance, _eventgroup,
- _major, ANY_EVENT,
- _pending_subscription_id)) {
- try {
- const auto its_callback = std::bind(
- &routing_manager_stub_host::on_subscribe_nack,
- std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
- _subscribing_client, _service, _instance,
- _eventgroup, ANY_EVENT, _pending_subscription_id);
- io_.post(its_callback);
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << __func__ << e.what();
}
}
}
@@ -5056,31 +4048,15 @@ void routing_manager_impl::send_subscription(
void routing_manager_impl::cleanup_server_endpoint(
service_t _service, const std::shared_ptr<endpoint>& _endpoint) {
if (_endpoint) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- bool reliable = _endpoint->is_reliable();
- // Check whether any service still uses this endpoint
- _endpoint->decrement_use_count();
- bool isLastService = (_endpoint->get_use_count() == 0);
-
- // Clear service_instances_
- if (1 >= service_instances_[_service].size()) {
- service_instances_.erase(_service);
- } else {
- service_instances_[_service].erase(_endpoint.get());
- }
-
- // Clear server endpoint if no service remains using it
- if (isLastService) {
- const uint16_t port = _endpoint->get_local_port();
- if (server_endpoints_.find(port) != server_endpoints_.end()) {
- server_endpoints_[port].erase(reliable);
- if (server_endpoints_[port].find(!reliable) == server_endpoints_[port].end()) {
- server_endpoints_.erase(port);
- }
+ // Clear service_instances_, check whether any service still
+ // uses this endpoint and clear server endpoint if no service
+ // remains using it
+ if (ep_mgr_impl_->remove_instance(_service, _endpoint.get())) {
+ if (ep_mgr_impl_->remove_server_endpoint(
+ _endpoint->get_local_port(), _endpoint->is_reliable())) {
+ // Stop endpoint (close socket) to release its async_handlers!
+ _endpoint->stop();
}
-
- // Stop endpoint (close socket) to release its async_handlers!
- _endpoint->stop();
}
}
}
@@ -5182,16 +4158,18 @@ void routing_manager_impl::on_security_update_timeout(
bool routing_manager_impl::update_security_policy_configuration(
uint32_t _uid, uint32_t _gid,
- ::std::shared_ptr<policy> _policy, std::shared_ptr<payload> _payload, security_update_handler_t _handler) {
+ const std::shared_ptr<policy>& _policy,
+ const std::shared_ptr<payload>& _payload,
+ const security_update_handler_t& _handler) {
bool ret(true);
// cache security policy payload for later distribution to new registering clients
stub_->policy_cache_add(_uid, _payload);
// update security policy from configuration
- configuration_->update_security_policy(_uid, _gid, _policy);
+ security::get()->update_security_policy(_uid, _gid, _policy);
// determine currently connected clients
- std::unordered_set<client_t> its_clients_to_inform = get_connected_clients();
+ std::unordered_set<client_t> its_clients_to_inform = ep_mgr_impl_->get_connected_clients();
// add handler
pending_security_update_id_t its_id;
@@ -5254,13 +4232,13 @@ bool routing_manager_impl::update_security_policy_configuration(
}
bool routing_manager_impl::remove_security_policy_configuration(
- uint32_t _uid, uint32_t _gid, security_update_handler_t _handler) {
+ uint32_t _uid, uint32_t _gid, const security_update_handler_t& _handler) {
bool ret(true);
// remove security policy from configuration (only if there was a updateACL call before)
if (stub_->is_policy_cached(_uid)) {
- if (!configuration_->remove_security_policy(_uid, _gid)) {
- _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID);
+ if (!security::get()->remove_security_policy(_uid, _gid)) {
+ _handler(security_update_state_e::SU_UNKNOWN_USER_ID);
ret = false;
} else {
// remove policy from cache to prevent sending it to registering clients
@@ -5270,7 +4248,7 @@ bool routing_manager_impl::remove_security_policy_configuration(
pending_security_update_id_t its_id;
// determine currently connected clients
- std::unordered_set<client_t> its_clients_to_inform = get_connected_clients();
+ std::unordered_set<client_t> its_clients_to_inform = ep_mgr_impl_->get_connected_clients();
if (!its_clients_to_inform.empty()) {
its_id = pending_security_update_add(its_clients_to_inform);
@@ -5330,14 +4308,14 @@ bool routing_manager_impl::remove_security_policy_configuration(
}
}
else {
- _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID);
+ _handler(security_update_state_e::SU_UNKNOWN_USER_ID);
ret = false;
}
return ret;
}
pending_security_update_id_t routing_manager_impl::pending_security_update_add(
- std::unordered_set<client_t> _clients) {
+ const std::unordered_set<client_t>& _clients) {
std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_);
if (++pending_security_update_id_ == 0) {
pending_security_update_id_++;
@@ -5423,4 +4401,96 @@ void routing_manager_impl::on_security_update_response(
}
}
-} // namespace vsomeip
+void routing_manager_impl::print_stub_status() const {
+ stub_->print_endpoint_status();
+}
+
+void routing_manager_impl::service_endpoint_connected(
+ service_t _service, instance_t _instance, major_version_t _major,
+ minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint,
+ bool _unreliable_only) {
+
+ if (!_unreliable_only) {
+ // Mark only TCP-only and TCP+UDP services available here
+ // UDP-only services are already marked as available in add_routing_info
+ on_availability(_service, _instance, true, _major, _minor);
+ stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
+ _major, _minor);
+ }
+
+ std::shared_ptr<boost::asio::steady_timer> its_timer =
+ std::make_shared<boost::asio::steady_timer>(io_);
+ boost::system::error_code ec;
+ its_timer->expires_from_now(std::chrono::milliseconds(3), ec);
+ if (!ec) {
+ its_timer->async_wait(
+ std::bind(&routing_manager_impl::call_sd_endpoint_connected,
+ std::static_pointer_cast<routing_manager_impl>(
+ shared_from_this()), std::placeholders::_1,
+ _service, _instance, _endpoint, its_timer));
+ } else {
+ VSOMEIP_ERROR << __func__ << " " << ec.message();
+ }
+}
+
+void routing_manager_impl::service_endpoint_disconnected(
+ service_t _service, instance_t _instance, major_version_t _major,
+ minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint) {
+ (void)_endpoint;
+ on_availability(_service, _instance, false, _major, _minor);
+ stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
+ _major, _minor);
+ VSOMEIP_WARNING << __func__ << ": lost connection to remote service: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
+}
+
+void
+routing_manager_impl::send_unsubscription(client_t _offering_client,
+ service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, major_version_t _major,
+ const std::set<client_t> &_removed,
+ remote_subscription_id_t _id) {
+
+ (void)_major; // TODO: Remove completely?
+
+ if (host_->get_client() == _offering_client) {
+ auto self = shared_from_this();
+ for (const auto its_client : _removed) {
+ host_->on_subscription(_service, _instance,
+ _eventgroup, its_client, own_uid_, own_gid_, false,
+ [this, self, _service, _instance, _eventgroup,
+ its_client, _id, _offering_client]
+ (const bool _is_accepted) {
+ (void)_is_accepted;
+ try {
+ const auto its_callback = std::bind(
+ &routing_manager_stub_host::on_unsubscribe_ack,
+ std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
+ its_client, _service, _instance, _eventgroup, _id);
+ io_.post(its_callback);
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << __func__ << e.what();
+ }
+ }
+ );
+ }
+ } else {
+ for (const auto its_client : _removed) {
+ if (!stub_->send_unsubscribe(find_local(_offering_client), its_client,
+ _service, _instance, _eventgroup, ANY_EVENT, _id)) {
+ try {
+ const auto its_callback = std::bind(
+ &routing_manager_stub_host::on_unsubscribe_ack,
+ std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
+ its_client, _service, _instance, _eventgroup, _id);
+ io_.post(its_callback);
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << __func__ << e.what();
+ }
+ }
+ }
+ }
+}
+
+} // namespace vsomeip_v3