summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp1012
1 files changed, 864 insertions, 148 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index faf5282..f8dbd14 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -51,6 +51,9 @@
#include "../../utility/include/byteorder.hpp"
#include "../../utility/include/utility.hpp"
#include "../../plugin/include/plugin_manager.hpp"
+#ifdef USE_DLT
+#include "../../tracing/include/connector_impl.hpp"
+#endif
#include "../../e2e_protection/include/buffer/buffer.hpp"
#include "../../e2e_protection/include/e2exf/config.hpp"
@@ -75,7 +78,10 @@ routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
watchdog_timer_(_host->get_io()),
#endif
status_log_timer_(_host->get_io()),
- memory_log_timer_(_host->get_io())
+ memory_log_timer_(_host->get_io()),
+ pending_remote_offer_id_(0),
+ last_resume_(std::chrono::steady_clock::now().min()),
+ pending_security_update_id_(0)
{
}
@@ -90,6 +96,16 @@ client_t routing_manager_impl::get_client() const {
return routing_manager_base::get_client();
}
+std::set<client_t> routing_manager_impl::find_local_clients(service_t _service, instance_t _instance) {
+ return routing_manager_base::find_local_clients(_service, _instance);
+}
+
+bool routing_manager_impl::is_subscribe_to_any_event_allowed(client_t _client,
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
+ return routing_manager_base::is_subscribe_to_any_event_allowed(_client,
+ _service, _instance, _eventgroup);
+}
+
void routing_manager_impl::init() {
routing_manager_base::init();
@@ -106,7 +122,7 @@ void routing_manager_impl::init() {
plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY);
if (its_plugin) {
VSOMEIP_INFO << "Service Discovery module loaded.";
- discovery_ = std::dynamic_pointer_cast<sd::runtime>(its_plugin)->create_service_discovery(this);
+ discovery_ = std::dynamic_pointer_cast<sd::runtime>(its_plugin)->create_service_discovery(this, configuration_);
discovery_->init();
} else {
VSOMEIP_ERROR << "Service Discovery module could not be loaded!";
@@ -116,11 +132,11 @@ void routing_manager_impl::init() {
if( configuration_->is_e2e_enabled()) {
VSOMEIP_INFO << "E2E protection enabled.";
- std::map<e2exf::data_identifier, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration();
+ std::map<e2exf::data_identifier_t, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration();
for (auto &identifier : its_e2e_configuration) {
auto its_cfg = identifier.second;
if(its_cfg->profile == "CRC8") {
- e2exf::data_identifier its_data_identifier = {its_cfg->service_id, its_cfg->event_id};
+ e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id};
e2e::profile01::profile_config its_profile_config = e2e::profile01::profile_config(its_cfg->crc_offset, its_cfg->data_id,
(e2e::profile01::p01_data_id_mode) its_cfg->data_id_mode, its_cfg->data_length, its_cfg->counter_offset, its_cfg->data_id_nibble_offset);
if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) {
@@ -130,7 +146,7 @@ void routing_manager_impl::init() {
custom_checkers[its_data_identifier] = std::make_shared<e2e::profile01::profile_01_checker>(its_profile_config);
}
} else if(its_cfg->profile == "CRC32") {
- e2exf::data_identifier its_data_identifier = {its_cfg->service_id, its_cfg->event_id};
+ e2exf::data_identifier_t its_data_identifier = {its_cfg->service_id, its_cfg->event_id};
e2e::profile_custom::profile_config its_profile_config = e2e::profile_custom::profile_config(its_cfg->crc_offset);
if ((its_cfg->variant == "protector") || (its_cfg->variant == "both")) {
@@ -237,7 +253,7 @@ void routing_manager_impl::stop() {
for (auto client: get_connected_clients()) {
if (client != VSOMEIP_ROUTING_CLIENT) {
- remove_local(client);
+ remove_local(client, true);
}
}
}
@@ -296,22 +312,35 @@ void routing_manager_impl::stop_offer_service(client_t _client,
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance
<< ":" << std::dec << int(_major) << "." << _minor << "]";
-
+ bool is_local(false);
{
- std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
- for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
- if (it->first == _service && it->second == _instance) {
- it = pending_sd_offers_.erase(it);
- break;
- } else {
- ++it;
+ std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
+ is_local = (its_info && its_info->is_local());
+ }
+ if (is_local) {
+ {
+ std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
+ for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
+ if (it->first == _service && it->second == _instance) {
+ it = pending_sd_offers_.erase(it);
+ break;
+ } else {
+ ++it;
+ }
}
}
- }
- on_stop_offer_service(_client, _service, _instance, _major, _minor);
- stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor);
- on_availability(_service, _instance, false, _major, _minor);
+ on_stop_offer_service(_client, _service, _instance, _major, _minor);
+ stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor);
+ on_availability(_service, _instance, false, _major, _minor);
+ } else {
+ VSOMEIP_WARNING << __func__ << " received STOP_OFFER("
+ << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance
+ << ":" << std::dec << int(_major) << "." << _minor << "] "
+ << "for remote service --> ignore";
+ }
}
void routing_manager_impl::request_service(client_t _client, service_t _service,
@@ -435,6 +464,8 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
<< std::dec << (uint16_t)_major << "]";
const client_t its_local_client = find_local_client(_service, _instance);
if (get_client() == its_local_client) {
+ routing_manager_base::set_incoming_subscription_state(_client, _service, _instance,
+ _eventgroup, _event, subscription_state_e::IS_SUBSCRIBING);
auto self = shared_from_this();
host_->on_subscription(_service, _instance, _eventgroup, _client, true,
[this, self, _client, _service, _instance, _eventgroup,
@@ -452,6 +483,8 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
}
routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _event, _subscription_type);
send_pending_notify_ones(_service, _instance, _eventgroup, _client);
+ routing_manager_base::erase_incoming_subscription_state(_client, _service, _instance,
+ _eventgroup, _event);
});
} else {
if (discovery_) {
@@ -581,8 +614,8 @@ bool routing_manager_impl::send(client_t _client,
}
bool routing_manager_impl::send(client_t _client, const byte_t *_data,
- length_t _size, instance_t _instance,
- bool _flush, bool _reliable, bool _is_valid_crc) {
+ length_t _size, instance_t _instance, bool _flush, bool _reliable,
+ client_t _bound_client, bool _is_valid_crc, bool _sent_from_remote) {
bool is_sent(false);
if (_size > VSOMEIP_MESSAGE_TYPE_POS) {
std::shared_ptr<endpoint> its_target;
@@ -616,12 +649,12 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(its_target, true, _instance))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
#endif
- deliver_message(_data, _size, _instance, _reliable, _is_valid_crc);
+ deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote);
return true;
}
its_target = find_local(_client);
@@ -634,7 +667,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(its_target, true, _instance))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
@@ -648,7 +681,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
|| (find_local_client(its_service, _instance)
== host_->get_client() && is_request)) {
// TODO: find out how to handle session id here
- is_sent = deliver_message(_data, _size, _instance, _reliable, _is_valid_crc);
+ is_sent = deliver_message(_data, _size, _instance, _reliable, _bound_client, _is_valid_crc, _sent_from_remote);
} else {
e2e_buffer outputBuffer;
if( configuration_->is_e2e_enabled()) {
@@ -675,7 +708,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(its_target, true, _instance))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
@@ -754,7 +787,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(nullptr, true, _instance))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
@@ -786,7 +819,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(its_target, true, _instance))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
@@ -869,7 +902,7 @@ bool routing_manager_impl::send_to(
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(its_endpoint, true, _instance))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
@@ -892,7 +925,7 @@ bool routing_manager_impl::send_to(
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
if (its_header.prepare(its_endpoint, true, 0x0))
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
_data, its_data_size);
@@ -911,8 +944,10 @@ void routing_manager_impl::register_event(
bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
auto its_event = find_event(_service, _instance, _event);
bool is_first(false);
- if (its_event && !its_event->has_ref(_client, _is_provided)) {
- is_first = true;
+ if (its_event) {
+ if (!its_event->has_ref(_client, _is_provided)) {
+ is_first = true;
+ }
} else {
is_first = true;
}
@@ -950,10 +985,10 @@ void routing_manager_impl::unregister_shadow_event(client_t _client,
void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload, client_t _client,
- bool _force, bool _flush) {
+ bool _force, bool _flush, bool _remote_subscriber) {
if (find_local(_client)) {
routing_manager_base::notify_one(_service, _instance, _event, _payload,
- _client, _force, _flush);
+ _client, _force, _flush, _remote_subscriber);
} else {
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
@@ -1034,6 +1069,113 @@ void routing_manager_impl::release_port(uint16_t _port, bool _reliable) {
used_client_ports_[_reliable].erase(_port);
}
+bool routing_manager_impl::offer_service_remotely(service_t _service,
+ instance_t _instance,
+ std::uint16_t _port,
+ bool _reliable,
+ bool _magic_cookies_enabled) {
+ bool ret = true;
+
+ if(!is_available(_service, _instance, ANY_MAJOR)) {
+ VSOMEIP_ERROR << __func__ << ": Service ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance
+ << "] is not offered locally! Won't offer it remotely.";
+ ret = false;
+ } else {
+ // update service info in configuration
+ if (!configuration_->remote_offer_info_add(_service, _instance, _port,
+ _reliable, _magic_cookies_enabled)) {
+ ret = false;
+ } else {
+ // trigger event registration again to create shadow events
+ const client_t its_offering_client = find_local_client(_service, _instance);
+ if (its_offering_client == VSOMEIP_ROUTING_CLIENT) {
+ VSOMEIP_ERROR << __func__ << " didn't find offering client for service ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance
+ << "]";
+ ret = false;
+ } else {
+ if (!stub_->send_provided_event_resend_request(its_offering_client,
+ pending_remote_offer_add(_service, _instance))) {
+ VSOMEIP_ERROR << __func__ << ": Couldn't send event resend"
+ << "request to client 0x" << std::hex << std::setw(4)
+ << std::setfill('0') << its_offering_client << " providing service ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance
+ << "]";
+
+ ret = false;
+ }
+ }
+ }
+ }
+ return ret;
+}
+
+bool routing_manager_impl::stop_offer_service_remotely(service_t _service,
+ instance_t _instance,
+ std::uint16_t _port,
+ bool _reliable,
+ bool _magic_cookies_enabled) {
+ bool ret = true;
+ bool service_still_offered_remote(false);
+ // update service configuration
+ if (!configuration_->remote_offer_info_remove(_service, _instance, _port,
+ _reliable, _magic_cookies_enabled, &service_still_offered_remote)) {
+ VSOMEIP_ERROR << __func__ << " couldn't remove remote offer info for service ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance
+ << "] from configuration";
+ ret = false;
+ }
+ std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
+ std::shared_ptr<endpoint> its_server_endpoint;
+ if (its_info) {
+ its_server_endpoint = its_info->get_endpoint(_reliable);
+ }
+ // don't deregister events if the service is still offered remotely
+ if (!service_still_offered_remote) {
+ const client_t its_offering_client = find_local_client(_service, _instance);
+ major_version_t its_major(0);
+ minor_version_t its_minor(0);
+ if (its_info) {
+ its_major = its_info->get_major();
+ its_minor = its_info->get_minor();
+ }
+ // unset payload and clear subcribers
+ routing_manager_base::stop_offer_service(its_offering_client,
+ _service, _instance, its_major, its_minor);
+ // unregister events
+ for (const event_t its_event_id : find_events(_service, _instance)) {
+ unregister_shadow_event(its_offering_client, _service, _instance,
+ its_event_id, true);
+ }
+ clear_targets_and_pending_sub_from_eventgroups(_service, _instance);
+ clear_remote_subscriber(_service, _instance);
+
+ if (discovery_ && its_info) {
+ discovery_->stop_offer_service(_service, _instance, its_info);
+ its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
+ }
+ } else {
+ // service is still partly offered
+ if (discovery_ && its_info) {
+ std::shared_ptr<serviceinfo> its_copied_info =
+ std::make_shared<serviceinfo>(*its_info);
+ its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
+ // ensure to not send StopOffer for endpoint on which the service is
+ // still offered
+ its_copied_info->set_endpoint(std::shared_ptr<endpoint>(), !_reliable);
+ discovery_->stop_offer_service(_service, _instance, its_copied_info);
+ }
+ }
+
+ cleanup_server_endpoint(_service, its_server_endpoint);
+ return ret;
+}
+
void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
endpoint *_receiver, const boost::asio::ip::address &_destination,
client_t _bound_client,
@@ -1119,6 +1261,9 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
client_t requester = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_CLIENT_POS_MIN],
_data[VSOMEIP_CLIENT_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
if (!configuration_->is_offered_remote(its_service, its_instance)) {
VSOMEIP_WARNING << std::hex << "Security: Received a remote request "
<< "for service/instance " << its_service << "/" << its_instance
@@ -1131,11 +1276,13 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
<< " which is already used locally ~> Skip message!";
return;
}
- if (!configuration_->is_client_allowed(requester, its_service, its_instance)) {
- VSOMEIP_WARNING << std::hex << "Security: Received a remote request "
- << "from client 0x" << requester << " for service/instance "
+ if (!configuration_->is_remote_client_allowed()) {
+ // check if policy allows remote requests.
+ VSOMEIP_WARNING << "routing_manager_impl::on_message: "
+ << std::hex << "Security: Remote client with client ID 0x" << requester
+ << " is not allowed to communicate with service/instance/method "
<< its_service << "/" << its_instance
- << " which violates the security policy ~> Skip message!";
+ << "/" << its_method;
return;
}
}
@@ -1167,7 +1314,8 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
#ifdef USE_DLT
is_forwarded =
#endif
- on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(), its_is_crc_valid);
+ on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(),
+ _bound_client, its_is_crc_valid, true);
}
}
@@ -1177,14 +1325,14 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
- tc::trace_header its_header;
+ trace::header its_header;
const boost::asio::ip::address_v4 its_remote_address =
_remote_address.is_v4() ? _remote_address.to_v4() :
boost::asio::ip::address_v4::from_string("6.6.6.6");
- tc::protocol_e its_protocol =
- _receiver->is_local() ? tc::protocol_e::local :
- _receiver->is_reliable() ? tc::protocol_e::tcp :
- tc::protocol_e::udp;
+ trace::protocol_e its_protocol =
+ _receiver->is_local() ? trace::protocol_e::local :
+ _receiver->is_reliable() ? trace::protocol_e::tcp :
+ trace::protocol_e::udp;
its_header.prepare(its_remote_address, _remote_port, its_protocol, false,
its_instance);
tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data,
@@ -1196,7 +1344,9 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
bool routing_manager_impl::on_message(
service_t _service, instance_t _instance,
const byte_t *_data, length_t _size,
- bool _reliable, bool _is_valid_crc) {
+ bool _reliable, client_t _bound_client,
+ bool _is_valid_crc,
+ bool _is_from_remote) {
#if 0
std::stringstream msg;
msg << "rmi::on_message("
@@ -1219,11 +1369,13 @@ bool routing_manager_impl::on_message(
if (its_client == VSOMEIP_ROUTING_CLIENT
&& utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
- is_forwarded = deliver_notification(_service, _instance, _data, _size, _reliable, _is_valid_crc);
+ is_forwarded = deliver_notification(_service, _instance, _data, _size,
+ _reliable, _bound_client, _is_valid_crc, _is_from_remote);
} else if (its_client == host_->get_client()) {
- deliver_message(_data, _size, _instance, _reliable, _is_valid_crc);
+ deliver_message(_data, _size, _instance,
+ _reliable, _bound_client, _is_valid_crc, _is_from_remote);
} else {
- send(its_client, _data, _size, _instance, true, _reliable, _is_valid_crc); //send to proxy
+ send(its_client, _data, _size, _instance, true, _reliable, _bound_client, _is_valid_crc, _is_from_remote); //send to proxy
}
return is_forwarded;
}
@@ -1244,7 +1396,7 @@ void routing_manager_impl::on_notification(client_t _client,
its_length);
if (_notify_one) {
- notify_one(_service, _instance, its_event->get_event(), its_payload, _client, true, true);
+ notify_one(_service, _instance, its_event->get_event(), its_payload, _client, true, true, false);
} else {
if (its_event->is_field()) {
if (its_event->is_set()) {
@@ -1311,6 +1463,13 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
bool reliable_;
std::shared_ptr<endpoint> endpoint_;
};
+
+ // Set to state CONNECTED as connection is not yet fully established in remote side POV
+ // but endpoint is ready to send / receive. Set to ESTABLISHED after timer expires
+ // to prevent inserting subscriptions twice or send out subscription before remote side
+ // is finished with TCP 3 way handshake
+ _endpoint->set_connected(true);
+
std::forward_list<struct service_info> services_to_report_;
{
std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
@@ -1324,7 +1483,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
if (found_endpoint->second.get() == _endpoint.get()) {
std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first));
if (!its_info) {
- _endpoint->set_connected(true);
+ _endpoint->set_established(true);
return;
}
services_to_report_.push_front(
@@ -1340,7 +1499,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
if (found_endpoint->second.get() == _endpoint.get()) {
std::shared_ptr<serviceinfo> its_info(find_service(its_service.first, its_instance.first));
if (!its_info) {
- _endpoint->set_connected(true);
+ _endpoint->set_established(true);
return;
}
services_to_report_.push_front(
@@ -1356,6 +1515,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
}
}
}
+
for (const auto &s : services_to_report_) {
on_availability(s.service_id_, s.instance_id_, true, s.major_, s.minor_);
if (s.reliable_) {
@@ -1379,7 +1539,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
}
}
if (services_to_report_.empty()) {
- _endpoint->set_connected(true);
+ _endpoint->set_established(true);
}
}
@@ -1488,7 +1648,6 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
if (discovery_) {
if (its_info) {
if (its_info->get_major() == _major && its_info->get_minor() == _minor) {
- its_info->set_ttl(0);
discovery_->stop_offer_service(_service, _instance, its_info);
}
}
@@ -1498,59 +1657,21 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
// Cleanup reliable & unreliable server endpoints hold before
if (its_info) {
- std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
- std::shared_ptr<endpoint> its_empty_endpoint;
- bool reliable = true;
-
- // Loop over reliable/unreliable and cleanup if needed
- for (uint8_t i = 0; i < 2; ++i) {
- std::shared_ptr<endpoint> its_endpoint;
- if (reliable) {
- its_endpoint = its_reliable_endpoint;
- } else {
- its_endpoint = its_unreliable_endpoint;
- }
- if (!its_endpoint) {
- reliable = !reliable;
- continue;
- }
-
- // Check whether any service still uses this endpoint
- its_endpoint->decrement_use_count();
- bool isLastService = (its_endpoint->get_use_count() == 0);
-
- // Clear service_instances_
- if (1 >= service_instances_[_service].size()) {
- service_instances_.erase(_service);
- } else {
- service_instances_[_service].erase(its_endpoint.get());
- }
-
- // Clear server endpoint if no service remains using it
- if (isLastService) {
- uint16_t port = its_endpoint->get_local_port();
- if (server_endpoints_.find(port) != server_endpoints_.end()) {
- server_endpoints_[port].erase(reliable);
- if (server_endpoints_[port].find(!reliable) == server_endpoints_[port].end()) {
- server_endpoints_.erase(port);
- }
- }
-
- // Stop endpoint (close socket) to release its async_handlers!
- its_endpoint->stop();
- }
-
+ if (its_unreliable_endpoint) {
+ cleanup_server_endpoint(_service, its_unreliable_endpoint);
// Clear service info and service group
- clear_service_info(_service, _instance, reliable);
-
- // Invert reliable flag and loop again
- reliable = !reliable;
+ clear_service_info(_service, _instance, false);
+ }
+ if (its_reliable_endpoint) {
+ cleanup_server_endpoint(_service, its_reliable_endpoint);
+ // Clear service info and service group
+ clear_service_info(_service, _instance, true);
}
}
}
bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
- instance_t _instance, bool _reliable, bool _is_valid_crc) {
+ instance_t _instance, bool _reliable, client_t _bound_client, bool _is_valid_crc, bool _is_from_remote) {
bool is_delivered(false);
auto a_deserializer = get_deserializer();
@@ -1563,6 +1684,109 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
its_message->set_instance(_instance);
its_message->set_reliable(_reliable);
its_message->set_is_valid_crc(_is_valid_crc);
+
+ if (!_is_from_remote) {
+ if (utility::is_notification(its_message->get_message_type())) {
+ if (!is_response_allowed(_bound_client, its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << std::hex << " received a notification from client 0x" << _bound_client
+ << " which does not offer service/instance/event "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " ~> Skip message!";
+ return false;
+ } else {
+ if (!configuration_->is_client_allowed(get_client(), its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << " isn't allowed to receive a notification from service/instance/event "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " respectively from client 0x" << _bound_client
+ << " ~> Skip message!";
+ return false;
+ }
+ }
+ } else if (utility::is_request(its_message->get_message_type())) {
+ if (configuration_->is_security_enabled()
+ && its_message->get_client() != _bound_client) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message:"
+ << " received a request from client 0x" << std::setw(4) << std::setfill('0')
+ << its_message->get_client() << " to service/instance/method "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method() << " which doesn't match the bound client 0x"
+ << std::setw(4) << std::setfill('0') << _bound_client
+ << " ~> skip message!";
+ return false;
+ }
+
+ if (!configuration_->is_client_allowed(its_message->get_client(),
+ its_message->get_service(), its_message->get_instance(), its_message->get_method())) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << " isn't allowed to send a request to service/instance/method "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " ~> Skip message!";
+ return false;
+ }
+ } else { // response
+ if (!is_response_allowed(_bound_client, its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << " received a response from client 0x" << _bound_client
+ << " which does not offer service/instance/method "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " ~> Skip message!";
+ return false;
+ } else {
+ if (!configuration_->is_client_allowed(get_client(), its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << " isn't allowed to receive a response from service/instance/method "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " respectively from client 0x" << _bound_client
+ << " ~> Skip message!";
+ return false;
+ }
+ }
+ }
+ } else {
+ if (!configuration_->is_remote_client_allowed()) {
+ // if the message is from remote, check if
+ // policy allows remote requests.
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << std::hex << "Remote clients are not allowed"
+ << " to communicate with service/instance/method "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " respectively with client 0x" << get_client()
+ << " ~> Skip message!";
+ return false;
+ } else if (utility::is_notification(its_message->get_message_type())) {
+ if (!configuration_->is_client_allowed(get_client(), its_message->get_service(),
+ its_message->get_instance(), its_message->get_method())) {
+ VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
+ << " : routing_manager_impl::deliver_message: "
+ << " isn't allowed to receive a notification from service/instance/event "
+ << its_message->get_service() << "/" << its_message->get_instance()
+ << "/" << its_message->get_method()
+ << " respectively from remote client"
+ << " ~> Skip message!";
+ return false;
+ }
+ }
+ }
+
host_->on_message(std::move(its_message));
is_delivered = true;
} else {
@@ -1575,7 +1799,8 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
bool routing_manager_impl::deliver_notification(
service_t _service, instance_t _instance,
const byte_t *_data, length_t _length,
- bool _reliable, bool _is_valid_crc) {
+ bool _reliable, client_t _bound_client,
+ bool _is_valid_crc, bool _is_from_remote) {
method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
@@ -1605,6 +1830,10 @@ bool routing_manager_impl::deliver_notification(
}
}
const uint32_t its_length(utility::get_payload_size(_data, _length));
+ if (its_length != _length - VSOMEIP_FULL_HEADER_SIZE) {
+ VSOMEIP_ERROR << "Message length mismatch, dropping message!";
+ return false;
+ }
std::shared_ptr<payload> its_payload
= runtime::get()->create_payload(&_data[VSOMEIP_PAYLOAD_POS],
its_length);
@@ -1616,7 +1845,7 @@ bool routing_manager_impl::deliver_notification(
for (const auto its_local_client : its_event->get_subscribers()) {
if (its_local_client == host_->get_client()) {
- deliver_message(_data, _length, _instance, _reliable, _is_valid_crc);
+ deliver_message(_data, _length, _instance, _reliable, _bound_client, _is_valid_crc, _is_from_remote);
} else {
std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
if (its_local_target) {
@@ -1809,9 +2038,12 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint(
configuration_->get_max_message_size_reliable(
_address.to_string(), _remote_port),
configuration_->get_buffer_shrink_threshold(),
+ // send timeout after 2/3 of configured ttl, warning after 1/3
std::chrono::milliseconds(configuration_->get_sd_ttl() * 666),
configuration_->get_endpoint_queue_limit(
- _address.to_string(), _remote_port));
+ _address.to_string(), _remote_port),
+ configuration_->get_max_tcp_restart_aborts(),
+ configuration_->get_max_tcp_connect_time());
if (configuration_->has_enabled_magic_cookies(_address.to_string(),
_remote_port)) {
@@ -1827,7 +2059,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint(
_local_port),
boost::asio::ip::udp::endpoint(_address, _remote_port),
io_, configuration_->get_endpoint_queue_limit(
- _address.to_string(), _remote_port));
+ _address.to_string(), _remote_port),
+ configuration_->get_udp_receive_buffer_size());
}
} catch (...) {
host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED);
@@ -1850,6 +2083,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
configuration_->get_max_message_size_reliable(
its_unicast.to_string(), _port),
configuration_->get_buffer_shrink_threshold(),
+ // send timeout after 2/3 of configured ttl, warning after 1/3
std::chrono::milliseconds(configuration_->get_sd_ttl() * 666),
configuration_->get_endpoint_queue_limit(
its_unicast.to_string(), _port));
@@ -1872,7 +2106,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
#endif
boost::asio::ip::udp::endpoint ep(its_unicast, _port);
its_endpoint = std::make_shared<udp_server_endpoint_impl>(
- shared_from_this(), ep, io_, its_limit);
+ shared_from_this(), ep, io_, its_limit,
+ configuration_->get_udp_receive_buffer_size());
}
} else {
@@ -1917,7 +2152,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_server_endpoint(
return (its_endpoint);
}
-void routing_manager_impl::remove_local(client_t _client) {
+void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) {
auto clients_subscriptions = get_subscriptions(_client);
{
std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
@@ -1925,7 +2160,7 @@ void routing_manager_impl::remove_local(client_t _client) {
remote_subscription_state_.erase(std::tuple_cat(s, std::make_tuple(_client)));
}
}
- routing_manager_base::remove_local(_client, clients_subscriptions);
+ routing_manager_base::remove_local(_client, clients_subscriptions, _remove_uid);
std::forward_list<std::pair<service_t, instance_t>> services_to_release_;
{
@@ -2300,7 +2535,7 @@ void routing_manager_impl::add_routing_info(
its_info->get_minor());
if (discovery_) {
std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
- if (ep && ep->is_connected()) {
+ if (ep && ep->is_established()) {
discovery_->on_endpoint_connected(
_service, _instance,
ep);
@@ -2361,7 +2596,7 @@ void routing_manager_impl::add_routing_info(
}
if (discovery_) {
std::shared_ptr<endpoint> ep = its_info->get_endpoint(false);
- if (ep && ep->is_connected()) {
+ if (ep && ep->is_established()) {
discovery_->on_endpoint_connected(_service, _instance, ep);
}
}
@@ -2391,7 +2626,7 @@ void routing_manager_impl::add_routing_info(
its_info->get_minor());
if (discovery_) {
std::shared_ptr<endpoint> ep = its_info->get_endpoint(false);
- if (ep && ep->is_connected()) {
+ if (ep && ep->is_established()) {
discovery_->on_endpoint_connected(
_service, _instance,
ep);
@@ -2417,33 +2652,12 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor());
stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor());
// Implicit unsubscribe
- {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- auto found_service = eventgroups_.find(_service);
- if (found_service != eventgroups_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- for (auto &its_eventgroup : found_instance->second) {
- its_eventgroup.second->clear_targets();
- its_eventgroup.second->clear_pending_subscriptions();
- }
- }
- }
- }
+ clear_targets_and_pending_sub_from_eventgroups(_service, _instance);
clear_identified_clients( _service, _instance);
clear_identifying_clients( _service, _instance);
- {
- std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
- auto found_service = remote_subscribers_.find(_service);
- if (found_service != remote_subscribers_.end()) {
- if (found_service->second.erase(_instance) > 0 &&
- !found_service->second.size()) {
- remote_subscribers_.erase(found_service);
- }
- }
- }
+ clear_remote_subscriber(_service, _instance);
if (_has_reliable) {
clear_client_endpoints(_service, _instance, true);
@@ -2509,11 +2723,8 @@ void routing_manager_impl::update_routing_info(std::chrono::milliseconds _elapse
void routing_manager_impl::expire_services(const boost::asio::ip::address &_address) {
std::map<service_t, std::vector<instance_t> > its_expired_offers;
- for (auto &s : get_services()) {
+ for (auto &s : get_services_remote()) {
for (auto &i : s.second) {
- if (find_local_client(s.first, i.first) != VSOMEIP_ROUTING_CLIENT) {
- continue; //don't expire local services
- }
bool is_gone(false);
boost::asio::ip::address its_address;
std::shared_ptr<client_endpoint> its_client_endpoint =
@@ -2919,7 +3130,14 @@ void routing_manager_impl::on_subscribe_ack(client_t _client,
if (specific_endpoint_client) {
if (_client == get_client()) {
host_->on_subscription_error(_service, _instance, _eventgroup, 0x0 /*OK*/);
- host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x0 /*OK*/);
+ if (_event == ANY_EVENT) {
+ for (const auto &its_event : its_eventgroup->get_events())
+ host_->on_subscription_status(_service, _instance,
+ _eventgroup, its_event->get_event(), 0x0 /*OK*/);
+ } else {
+ host_->on_subscription_status(_service, _instance,
+ _eventgroup, _event, 0x0 /*OK*/);
+ }
} else {
stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup,
_event);
@@ -3068,7 +3286,7 @@ bool routing_manager_impl::deliver_specific_endpoint_message(service_t _service,
_receiver->is_reliable(), VSOMEIP_SEND);
}
} else {
- deliver_message(_data, _size, _instance, _receiver->is_reliable());
+ deliver_message(_data, _size, _instance, _receiver->is_reliable(), VSOMEIP_ROUTING_CLIENT, true, true);
}
return true;
}
@@ -3484,7 +3702,7 @@ void routing_manager_impl::clear_remote_subscriber(
}
std::chrono::steady_clock::time_point
-routing_manager_impl::expire_subscriptions() {
+routing_manager_impl::expire_subscriptions(bool _force) {
struct subscriptions_info {
service_t service_id_;
instance_t instance_id_;
@@ -3507,10 +3725,14 @@ routing_manager_impl::expire_subscriptions() {
for (auto &its_eventgroup : its_instance.second) {
std::set<std::shared_ptr<endpoint_definition>> its_expired_endpoints;
for (auto &its_target : its_eventgroup.second->get_targets()) {
- if (its_target.expiration_ < now) {
+ if (_force) {
its_expired_endpoints.insert(its_target.endpoint_);
- } else if (its_target.expiration_ < next_expiration) {
- next_expiration = its_target.expiration_;
+ } else {
+ if (its_target.expiration_ < now) {
+ its_expired_endpoints.insert(its_target.endpoint_);
+ } else if (its_target.expiration_ < next_expiration) {
+ next_expiration = its_target.expiration_;
+ }
}
}
@@ -3576,7 +3798,8 @@ routing_manager_impl::expire_subscriptions() {
<< std::hex << std::setfill('0') << std::setw(4) << s.eventgroup_id_ << "] from "
<< s.invalid_endpoint_->get_address() << ":"
<< std::dec << s.invalid_endpoint_->get_port()
- << "(" << std::hex << std::setfill('0') << std::setw(4) << s.client_ << ")";
+ << "(" << std::hex << std::setfill('0') << std::setw(4) << s.client_ << ") "
+ << _force;
}
}
return next_expiration;
@@ -3593,8 +3816,18 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const
if (discovery_) {
is_diag_mode = discovery_->get_diagnosis_mode();
}
+ std::stringstream its_last_resume;
+ {
+ std::lock_guard<std::mutex> its_lock(last_resume_mutex_);
+ if (last_resume_ != std::chrono::steady_clock::time_point::min()) {
+ its_last_resume << " | " << std::dec
+ << std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::steady_clock::now() - last_resume_).count() << "s";
+ }
+ }
VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION << " | ("
- << ((is_diag_mode == true) ? "diagnosis)" : "default)");
+ << ((is_diag_mode == true) ? "diagnosis)" : "default)")
+ << its_last_resume.str();
{
std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
version_log_timer_.expires_from_now(
@@ -3838,6 +4071,8 @@ void routing_manager_impl::register_client_error_handler(client_t _client,
}
void routing_manager_impl::handle_client_error(client_t _client) {
+ VSOMEIP_INFO << "Client 0x" << std::hex << get_client()
+ << " handles a client error(" << std::hex << _client << ")";
if (stub_)
stub_->update_registration(_client, registration_type_e::DEREGISTER_ON_ERROR);
@@ -4023,14 +4258,26 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
// stop processing of incoming SD messages
discovery_->stop();
+ // remove all remote subscriptions to remotely offered services on this node
+ expire_subscriptions(true);
+
// send StopOffer messages for remotely offered services on this node
for (const auto &its_service : get_offered_services()) {
for (const auto &its_instance : its_service.second) {
- its_instance.second->set_ttl(0);
+ if (its_instance.second->get_endpoint(true) || its_instance.second->get_endpoint(false)) {
+ const client_t its_client(find_local_client(its_service.first, its_instance.first));
+ VSOMEIP_WARNING << "service "
+ << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << " still offered by "
+ << std::hex << std::setw(4) << std::setfill('0') << its_client;
+ }
discovery_->stop_offer_service(its_service.first, its_instance.first, its_instance.second);
}
}
-
+ {
+ std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
+ remote_subscription_state_.clear();
+ }
// mark all external services as offline
services_t its_remote_services;
{
@@ -4065,6 +4312,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
{
VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was "
<< ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ {
+ std::lock_guard<std::mutex> its_lock(last_resume_mutex_);
+ last_resume_ = std::chrono::steady_clock::now();
+ }
// Reset relevant in service info
for (const auto &its_service : get_offered_services()) {
@@ -4076,6 +4327,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
// Switch SD back to normal operation
discovery_->set_diagnosis_mode(false);
+ if (routing_state_handler_) {
+ routing_state_handler_(_routing_state);
+ }
+
// start processing of SD messages (incoming remote offers should lead to new subscribe messages)
discovery_->start();
@@ -4098,7 +4353,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
for (const auto &its_instance : its_service.second) {
if (host_->get_configuration()->is_someip(
its_service.first, its_instance.first)) {
- its_instance.second->set_ttl(0);
discovery_->stop_offer_service(
its_service.first, its_instance.first, its_instance.second);
}
@@ -4192,6 +4446,9 @@ void routing_manager_impl::on_net_interface_or_route_state_changed(
}
void routing_manager_impl::start_ip_routing() {
+ if (routing_ready_handler_) {
+ routing_ready_handler_();
+ }
if (discovery_) {
discovery_->start();
} else {
@@ -4262,6 +4519,62 @@ routing_manager_impl::get_subscribed_eventgroups(
return its_eventgroups;
}
+void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups(
+ service_t _service, instance_t _instance) {
+ std::vector<std::shared_ptr<event>> its_events;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ for (const auto &its_eventgroup : found_instance->second) {
+ // As the service is gone, all subscriptions to its events
+ // do no longer exist and the last received payload is no
+ // longer valid.
+ for (auto &its_event : its_eventgroup.second->get_events()) {
+ const auto its_subscribers = its_event->get_subscribers();
+ for (const auto its_subscriber : its_subscribers) {
+ if (its_subscriber != get_client()) {
+ its_event->remove_subscriber(
+ its_eventgroup.first, its_subscriber);
+ }
+
+ client_t its_client = is_specific_endpoint_client(its_subscriber, _service, _instance);
+ {
+ std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
+ const auto its_tuple =
+ std::make_tuple(found_service->first, found_instance->first,
+ its_eventgroup.first, its_client);
+ remote_subscription_state_.erase(its_tuple);
+ }
+ }
+ its_events.push_back(its_event);
+ }
+ its_eventgroup.second->clear_targets();
+ its_eventgroup.second->clear_pending_subscriptions();
+ }
+ }
+ }
+ }
+ for (const auto& e : its_events) {
+ e->unset_payload(true);
+ }
+}
+
+void routing_manager_impl::clear_remote_subscriber(service_t _service,
+ instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
+ auto found_service = remote_subscribers_.find(_service);
+ if (found_service != remote_subscribers_.end()) {
+ if (found_service->second.erase(_instance) > 0 &&
+ !found_service->second.size()) {
+ remote_subscribers_.erase(found_service);
+ }
+ }
+}
+
+
void routing_manager_impl::call_sd_endpoint_connected(
const boost::system::error_code& _error,
service_t _service, instance_t _instance,
@@ -4271,7 +4584,7 @@ void routing_manager_impl::call_sd_endpoint_connected(
if (_error) {
return;
}
- _endpoint->set_connected(true);
+ _endpoint->set_established(true);
if (discovery_) {
discovery_->on_endpoint_connected(_service, _instance,
_endpoint);
@@ -4423,6 +4736,39 @@ void routing_manager_impl::send_initial_events(
}
}
+void routing_manager_impl::register_offer_acceptance_handler(
+ vsomeip::offer_acceptance_handler_t _handler) const {
+ if (discovery_) {
+ discovery_->register_offer_acceptance_handler(_handler);
+ }
+}
+
+void routing_manager_impl::register_reboot_notification_handler(
+ vsomeip::reboot_notification_handler_t _handler) const {
+ if (discovery_) {
+ discovery_->register_reboot_notification_handler(_handler);
+ }
+}
+
+void routing_manager_impl::register_routing_ready_handler(
+ routing_ready_handler_t _handler) {
+ routing_ready_handler_ = _handler;
+}
+
+void routing_manager_impl::register_routing_state_handler(
+ routing_state_handler_t _handler) {
+ routing_state_handler_ = _handler;
+}
+
+void routing_manager_impl::offer_acceptance_enabled(
+ boost::asio::ip::address _address) {
+ boost::system::error_code ec;
+ VSOMEIP_INFO << "ipsec-plugin-mgu: expire subscriptions and services: "
+ << _address.to_string(ec);
+ expire_subscriptions(_address);
+ expire_services(_address);
+}
+
void routing_manager_impl::memory_log_timer_cbk(
boost::system::error_code const & _error) {
if (_error) {
@@ -4707,4 +5053,374 @@ void routing_manager_impl::send_subscription(
}
}
+void routing_manager_impl::cleanup_server_endpoint(
+ service_t _service, const std::shared_ptr<endpoint>& _endpoint) {
+ if (_endpoint) {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ bool reliable = _endpoint->is_reliable();
+ // Check whether any service still uses this endpoint
+ _endpoint->decrement_use_count();
+ bool isLastService = (_endpoint->get_use_count() == 0);
+
+ // Clear service_instances_
+ if (1 >= service_instances_[_service].size()) {
+ service_instances_.erase(_service);
+ } else {
+ service_instances_[_service].erase(_endpoint.get());
+ }
+
+ // Clear server endpoint if no service remains using it
+ if (isLastService) {
+ const uint16_t port = _endpoint->get_local_port();
+ if (server_endpoints_.find(port) != server_endpoints_.end()) {
+ server_endpoints_[port].erase(reliable);
+ if (server_endpoints_[port].find(!reliable) == server_endpoints_[port].end()) {
+ server_endpoints_.erase(port);
+ }
+ }
+
+ // Stop endpoint (close socket) to release its async_handlers!
+ _endpoint->stop();
+ }
+ }
+}
+
+pending_remote_offer_id_t routing_manager_impl::pending_remote_offer_add(
+ service_t _service, instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_);
+ if (++pending_remote_offer_id_ == 0) {
+ pending_remote_offer_id_++;
+ }
+ pending_remote_offers_[pending_remote_offer_id_] = std::make_pair(_service,
+ _instance);
+ return pending_remote_offer_id_;
+}
+
+std::pair<service_t, instance_t> routing_manager_impl::pending_remote_offer_remove(
+ pending_remote_offer_id_t _id) {
+ std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_);
+ std::pair<service_t, instance_t> ret = std::make_pair(ANY_SERVICE,
+ ANY_INSTANCE);
+ auto found_si = pending_remote_offers_.find(_id);
+ if (found_si != pending_remote_offers_.end()) {
+ ret = found_si->second;
+ pending_remote_offers_.erase(found_si);
+ }
+ return ret;
+}
+
+void routing_manager_impl::on_resend_provided_events_response(
+ pending_remote_offer_id_t _id) {
+ const std::pair<service_t, instance_t> its_service =
+ pending_remote_offer_remove(_id);
+ if (its_service.first != ANY_SERVICE) {
+ // create server endpoint
+ std::shared_ptr<serviceinfo> its_info = find_service(its_service.first,
+ its_service.second);
+ if (its_info) {
+ its_info->set_ttl(DEFAULT_TTL);
+ init_service_info(its_service.first, its_service.second, true);
+ }
+ }
+}
+
+void routing_manager_impl::on_security_update_timeout(
+ const boost::system::error_code& _error,
+ pending_security_update_id_t _id,
+ std::shared_ptr<boost::asio::steady_timer> _timer) {
+ (void)_timer;
+ if (_error) {
+ // timer was cancelled
+ return;
+ }
+ security_update_state_e its_state = security_update_state_e::SU_UNKNOWN_USER_ID;
+ std::unordered_set<client_t> its_missing_clients = pending_security_update_get(_id);
+ {
+ // erase timer
+ std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_);
+ security_update_timers_.erase(_id);
+ }
+ {
+ // print missing responses and check if some clients did not respond because they already disconnected
+ if (!its_missing_clients.empty()) {
+ for (auto its_client : its_missing_clients) {
+ VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client
+ << " did not respond to the policy update / removal with ID: 0x" << std::hex << _id;
+ if (!find_local(its_client)) {
+ VSOMEIP_INFO << __func__ << ": Client 0x" << std::hex << its_client
+ << " is not connected anymore, do not expect answer for policy update / removal with ID: 0x"
+ << std::hex << _id;
+ pending_security_update_remove(_id, its_client);
+ }
+ }
+ }
+
+ its_missing_clients = pending_security_update_get(_id);
+ if (its_missing_clients.empty()) {
+ VSOMEIP_INFO << __func__ << ": Received all responses for "
+ "security update/removal ID: 0x" << std::hex << _id;
+ its_state = security_update_state_e::SU_SUCCESS;
+ }
+ {
+ // erase pending security update
+ std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_);
+ pending_security_updates_.erase(_id);
+ }
+
+ // call handler with error on timeout or with SUCCESS if missing clients are not connected
+ std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_);
+ const auto found_handler = security_update_handlers_.find(_id);
+ if (found_handler != security_update_handlers_.end()) {
+ found_handler->second(its_state);
+ security_update_handlers_.erase(found_handler);
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Callback not found for security update / removal with ID: 0x"
+ << std::hex << _id;
+ }
+ }
+}
+
+bool routing_manager_impl::update_security_policy_configuration(
+ uint32_t _uid, uint32_t _gid,
+ ::std::shared_ptr<policy> _policy, std::shared_ptr<payload> _payload, security_update_handler_t _handler) {
+ bool ret(true);
+ // cache security policy payload for later distribution to new registering clients
+ stub_->policy_cache_add(_uid, _payload);
+
+ // update security policy from configuration
+ configuration_->update_security_policy(_uid, _gid, _policy);
+
+ // determine currently connected clients
+ std::unordered_set<client_t> its_clients_to_inform = get_connected_clients();
+
+ // add handler
+ pending_security_update_id_t its_id;
+ if (!its_clients_to_inform.empty()) {
+ its_id = pending_security_update_add(its_clients_to_inform);
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_);
+ security_update_handlers_[its_id] = _handler;
+ }
+
+ {
+ std::shared_ptr<boost::asio::steady_timer> its_timer =
+ std::make_shared<boost::asio::steady_timer>(io_);
+ boost::system::error_code ec;
+ its_timer->expires_from_now(std::chrono::milliseconds(3000), ec);
+ if (!ec) {
+ its_timer->async_wait(
+ std::bind(
+ &routing_manager_impl::on_security_update_timeout,
+ std::static_pointer_cast<routing_manager_impl>(
+ shared_from_this()),
+ std::placeholders::_1, its_id, its_timer));
+ } else {
+ VSOMEIP_ERROR << __func__ << ": timer creation: " << ec.message();
+ }
+ std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_);
+ security_update_timers_[its_id] = its_timer;
+ }
+
+ // trigger all currently connected clients to update the security policy
+ uint32_t sent_counter(0);
+ uint32_t its_tranche =
+ uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1);
+ VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size()
+ << "] currently connected clients about policy update for UID: "
+ << std::dec << _uid << " with update ID: 0x" << std::hex << its_id;
+ for (auto its_client : its_clients_to_inform) {
+ if (!stub_->send_update_security_policy_request(its_client, its_id, _uid, _payload)) {
+ VSOMEIP_INFO << __func__ << ": Couldn't send update security policy "
+ << "request to client 0x" << std::hex << std::setw(4)
+ << std::setfill('0') << its_client << " policy UID: "
+ << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: "
+ << std::hex << std::setw(4) << std::setfill('0') << _gid
+ << " with update ID: 0x" << std::hex << its_id
+ << " as client already disconnected";
+ // remove client from expected answer list
+ pending_security_update_remove(its_id, its_client);
+ }
+ sent_counter++;
+ // Prevent burst
+ if (sent_counter % its_tranche == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ }
+ } else {
+ // if routing manager has no client call the handler directly
+ _handler(security_update_state_e::SU_SUCCESS);
+ }
+ return ret;
+}
+
+bool routing_manager_impl::remove_security_policy_configuration(
+ uint32_t _uid, uint32_t _gid, security_update_handler_t _handler) {
+ bool ret(true);
+
+ // remove security policy from configuration (only if there was a updateACL call before)
+ if (stub_->is_policy_cached(_uid)) {
+ if (!configuration_->remove_security_policy(_uid, _gid)) {
+ _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID);
+ ret = false;
+ } else {
+ // remove policy from cache to prevent sending it to registering clients
+ stub_->policy_cache_remove(_uid);
+
+ // add handler
+ pending_security_update_id_t its_id;
+
+ // determine currently connected clients
+ std::unordered_set<client_t> its_clients_to_inform = get_connected_clients();
+
+ if (!its_clients_to_inform.empty()) {
+ its_id = pending_security_update_add(its_clients_to_inform);
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_);
+ security_update_handlers_[its_id] = _handler;
+ }
+
+ {
+ std::shared_ptr<boost::asio::steady_timer> its_timer =
+ std::make_shared<boost::asio::steady_timer>(io_);
+ boost::system::error_code ec;
+ its_timer->expires_from_now(std::chrono::milliseconds(3000), ec);
+ if (!ec) {
+ its_timer->async_wait(
+ std::bind(
+ &routing_manager_impl::on_security_update_timeout,
+ std::static_pointer_cast<routing_manager_impl>(
+ shared_from_this()),
+ std::placeholders::_1, its_id, its_timer));
+ } else {
+ VSOMEIP_ERROR << __func__ << ": timer creation: " << ec.message();
+ }
+ std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_);
+ security_update_timers_[its_id] = its_timer;
+ }
+
+ // trigger all clients to remove the security policy
+ uint32_t sent_counter(0);
+ uint32_t its_tranche =
+ uint32_t(its_clients_to_inform.size() >= 10 ? (its_clients_to_inform.size() / 10) : 1);
+ VSOMEIP_INFO << __func__ << ": Informing [" << std::dec << its_clients_to_inform.size()
+ << "] currently connected clients about policy removal for UID: "
+ << std::dec << _uid << " with update ID: " << its_id;
+ for (auto its_client : its_clients_to_inform) {
+ if (!stub_->send_remove_security_policy_request(its_client, its_id, _uid, _gid)) {
+ VSOMEIP_INFO << __func__ << ": Couldn't send remove security policy "
+ << "request to client 0x" << std::hex << std::setw(4)
+ << std::setfill('0') << its_client << " policy UID: "
+ << std::hex << std::setw(4) << std::setfill('0') << _uid << " GID: "
+ << std::hex << std::setw(4) << std::setfill('0') << _gid
+ << " with update ID: 0x" << std::hex << its_id
+ << " as client already disconnected";
+ // remove client from expected answer list
+ pending_security_update_remove(its_id, its_client);
+ }
+ sent_counter++;
+ // Prevent burst
+ if (sent_counter % its_tranche == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ }
+ } else {
+ // if routing manager has no client call the handler directly
+ _handler(security_update_state_e::SU_SUCCESS);
+ }
+ }
+ }
+ else {
+ _handler(vsomeip::security_update_state_e::SU_UNKNOWN_USER_ID);
+ ret = false;
+ }
+ return ret;
+}
+
+pending_security_update_id_t routing_manager_impl::pending_security_update_add(
+ std::unordered_set<client_t> _clients) {
+ std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_);
+ if (++pending_security_update_id_ == 0) {
+ pending_security_update_id_++;
+ }
+ pending_security_updates_[pending_security_update_id_] = _clients;
+ return pending_security_update_id_;
+}
+
+std::unordered_set<client_t> routing_manager_impl::pending_security_update_get(
+ pending_security_update_id_t _id) {
+ std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_);
+ std::unordered_set<client_t> its_missing_clients;
+ auto found_si = pending_security_updates_.find(_id);
+ if (found_si != pending_security_updates_.end()) {
+ its_missing_clients = pending_security_updates_[_id];
+ }
+ return its_missing_clients;
+}
+
+bool routing_manager_impl::pending_security_update_remove(
+ pending_security_update_id_t _id, client_t _client) {
+ std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_);
+ auto found_si = pending_security_updates_.find(_id);
+ if (found_si != pending_security_updates_.end()) {
+ if (found_si->second.erase(_client)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool routing_manager_impl::is_pending_security_update_finished(
+ pending_security_update_id_t _id) {
+ std::lock_guard<std::mutex> its_lock(pending_security_updates_mutex_);
+ bool ret(false);
+ auto found_si = pending_security_updates_.find(_id);
+ if (found_si != pending_security_updates_.end()) {
+ if (!found_si->second.size()) {
+ ret = true;
+ }
+ }
+ if (ret) {
+ pending_security_updates_.erase(_id);
+ }
+ return ret;
+}
+
+void routing_manager_impl::on_security_update_response(
+ pending_security_update_id_t _id, client_t _client) {
+ if (pending_security_update_remove(_id, _client)) {
+ if (is_pending_security_update_finished(_id)) {
+ // cancel timeout timer
+ {
+ std::lock_guard<std::mutex> its_lock(security_update_timers_mutex_);
+ auto found_timer = security_update_timers_.find(_id);
+ if (found_timer != security_update_timers_.end()) {
+ boost::system::error_code ec;
+ found_timer->second->cancel(ec);
+ security_update_timers_.erase(found_timer);
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Received all responses "
+ "for security update/removal ID: 0x"
+ << std::hex << _id << " but timeout already happened";
+ }
+ }
+
+ // call handler
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(security_update_handlers_mutex_);
+ auto found_handler = security_update_handlers_.find(_id);
+ if (found_handler != security_update_handlers_.end()) {
+ found_handler->second(security_update_state_e::SU_SUCCESS);
+ security_update_handlers_.erase(found_handler);
+ VSOMEIP_INFO << __func__ << ": Received all responses for "
+ "security update/removal ID: 0x" << std::hex << _id;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Received all responses "
+ "for security update/removal ID: 0x"
+ << std::hex << _id << " but didn't find handler";
+ }
+ }
+ }
+ }
+}
+
} // namespace vsomeip