summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp465
1 files changed, 289 insertions, 176 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 7a27e4e..c1daec7 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -47,7 +47,8 @@ namespace vsomeip {
routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
routing_manager_base(_host),
- version_log_timer_(_host->get_io())
+ version_log_timer_(_host->get_io()),
+ if_state_running_(false)
#ifndef WITHOUT_SYSTEMD
, watchdog_timer_(_host->get_io())
#endif
@@ -87,16 +88,22 @@ void routing_manager_impl::init() {
discovery_ = (*its_runtime)->create_service_discovery(this);
discovery_->init();
}
- } else {
- init_routing_info(); // Static routing
}
}
void routing_manager_impl::start() {
- stub_->start();
- if (discovery_)
- discovery_->start();
+#ifndef WIN32
+ netlink_connector_ = std::make_shared<netlink_connector>(host_->get_io(),
+ configuration_->get_unicast_address());
+ netlink_connector_->register_net_if_changes_handler(
+ std::bind(&routing_manager_impl::on_net_if_state_changed,
+ this, std::placeholders::_1, std::placeholders::_2));
+ netlink_connector_->start();
+#else
+ start_ip_routing();
+#endif
+ stub_->start();
host_->on_state(state_type_e::ST_REGISTERED);
if (configuration_->log_version()) {
@@ -115,6 +122,11 @@ void routing_manager_impl::start() {
void routing_manager_impl::stop() {
version_log_timer_.cancel();
+#ifndef WIN32
+ if (netlink_connector_) {
+ netlink_connector_->stop();
+ }
+#endif
#ifndef WITHOUT_SYSTEMD
watchdog_timer_.cancel();
@@ -136,7 +148,7 @@ void routing_manager_impl::stop() {
bool routing_manager_impl::offer_service(client_t _client, service_t _service,
instance_t _instance, major_version_t _major, minor_version_t _minor) {
- VSOMEIP_DEBUG << "OFFER("
+ VSOMEIP_INFO << "OFFER("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance
@@ -146,7 +158,14 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service,
return false;
}
- init_service_info(_service, _instance, true);
+ {
+ std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
+ if (if_state_running_) {
+ init_service_info(_service, _instance, true);
+ } else {
+ pending_sd_offers_.push_back(std::make_pair(_service, _instance));
+ }
+ }
{
std::lock_guard<std::mutex> its_lock(events_mutex_);
@@ -181,7 +200,7 @@ bool routing_manager_impl::offer_service(client_t _client, service_t _service,
send_pending_subscriptions(_service, _instance, _major);
}
stub_->on_offer_service(_client, _service, _instance, _major, _minor);
- host_->on_availability(_service, _instance, true, _major, _minor);
+ on_availability(_service, _instance, true, _major, _minor);
return true;
}
@@ -189,23 +208,35 @@ void routing_manager_impl::stop_offer_service(client_t _client,
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor) {
- VSOMEIP_DEBUG << "STOP OFFER("
+ VSOMEIP_INFO << "STOP OFFER("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance
<< ":" << std::dec << int(_major) << "." << _minor << "]";
+ {
+ std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
+ for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
+ if (it->first == _service && it->second == _instance) {
+ it = pending_sd_offers_.erase(it);
+ break;
+ } else {
+ ++it;
+ }
+ }
+ }
+
routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor);
on_stop_offer_service(_client, _service, _instance, _major, _minor);
stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor);
- host_->on_availability(_service, _instance, false, _major, _minor);
+ on_availability(_service, _instance, false, _major, _minor);
}
void routing_manager_impl::request_service(client_t _client, service_t _service,
instance_t _instance, major_version_t _major, minor_version_t _minor,
bool _use_exclusive_proxy) {
- VSOMEIP_DEBUG << "REQUEST("
+ VSOMEIP_INFO << "REQUEST("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << ":"
@@ -226,7 +257,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
discovery_->request_service(_service, _instance, _major, _minor,
DEFAULT_TTL);
} else {
- VSOMEIP_DEBUG << std::hex
+ VSOMEIP_INFO << std::hex
<< "Avoid trigger SD find-service message"
<< " for local service/instance/major/minor: "
<< _service << "/" << _instance << std::dec
@@ -269,7 +300,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
void routing_manager_impl::release_service(client_t _client, service_t _service,
instance_t _instance) {
- VSOMEIP_DEBUG << "RELEASE("
+ VSOMEIP_INFO << "RELEASE("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
@@ -292,9 +323,11 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
if(its_info && !its_info->is_local()) {
+ unsubscribe_specific_client_at_sd(_service, _instance, _client);
if(!its_info->get_requesters_size()) {
if(discovery_) {
discovery_->release_service(_service, _instance);
+ discovery_->unsubscribe_client(_service, _instance, VSOMEIP_ROUTING_CLIENT);
}
clear_client_endpoints(_service, _instance, true);
clear_client_endpoints(_service, _instance, false);
@@ -319,7 +352,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
subscription_type_e _subscription_type) {
- VSOMEIP_DEBUG << "SUBSCRIBE("
+ VSOMEIP_INFO << "SUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
@@ -339,6 +372,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup);
}
routing_manager_base::subscribe(_client, _service, _instance, _eventgroup, _major, _subscription_type);
+ send_pending_notify_ones(_service, _instance, _eventgroup, _client);
} else {
if (discovery_) {
client_t subscriber = VSOMEIP_ROUTING_CLIENT;
@@ -365,7 +399,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
}
}
if(identify) {
- identify_for_subscribe(_client, _service, _instance, _major);
+ identify_for_subscribe(_client, _service, _instance, _major, _subscription_type);
}
}
bool inserted = insert_subscription(_service, _instance, _eventgroup, _client);
@@ -404,7 +438,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup) {
- VSOMEIP_DEBUG << "UNSUBSCRIBE("
+ VSOMEIP_INFO << "UNSUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
@@ -447,8 +481,14 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
}
}
}
- if( last_subscriber_removed )
+ if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) {
+ // for normal subscribers only unsubscribe via SD if last
+ // subscriber was removed
+ discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber);
+ } else if (subscriber != VSOMEIP_ROUTING_CLIENT) {
+ // for selective subscribers always unsubscribe at the SD
discovery_->unsubscribe(_service, _instance, _eventgroup, subscriber);
+ }
} else {
stub_->send_unsubscribe(find_local(_service, _instance),
_client, _service, _instance, _eventgroup, false);
@@ -493,7 +533,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
} else if (is_notification && _client) { // Selective notifications!
if (_client == get_client()) {
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -509,7 +549,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
if (its_target) {
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -545,7 +585,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
its_target = find_or_create_remote_client(its_service, _instance, _reliable, client);
if (its_target) {
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -600,7 +640,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
}
#ifdef USE_DLT
if (has_sent) {
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -618,7 +658,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
(sd_info_ ? sd_info_->get_endpoint(false) : nullptr) : its_info->get_endpoint(_reliable);
if (its_target) {
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -671,7 +711,7 @@ bool routing_manager_impl::send_to(
if (its_endpoint) {
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -691,7 +731,7 @@ bool routing_manager_impl::send_to(const std::shared_ptr<endpoint_definition> &_
if (its_endpoint) {
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
@@ -782,15 +822,23 @@ void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
}
}
-void routing_manager_impl::on_error(const byte_t *_data, length_t _length, endpoint *_receiver) {
+void routing_manager_impl::on_availability(service_t _service, instance_t _instance,
+ bool _is_available, major_version_t _major, minor_version_t _minor) {
+ host_->on_availability(_service, _instance, _is_available, _major, _minor);
+}
+
+void routing_manager_impl::on_error(
+ const byte_t *_data, length_t _length, endpoint *_receiver,
+ const boost::asio::ip::address &_remote_address,
+ std::uint16_t _remote_port) {
instance_t its_instance = 0;
if (_length >= VSOMEIP_SERVICE_POS_MAX) {
service_t its_service = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
its_instance = find_instance(its_service, _receiver);
}
- send_error(return_code_e::E_MALFORMED_MESSAGE, _data, _length,
- its_instance, _receiver->is_reliable(), _receiver);
+ send_error(return_code_e::E_MALFORMED_MESSAGE, _data, _length, its_instance,
+ _receiver->is_reliable(), _receiver, _remote_address, _remote_port);
}
void routing_manager_impl::release_port(uint16_t _port, bool _reliable) {
@@ -800,13 +848,15 @@ void routing_manager_impl::release_port(uint16_t _port, bool _reliable) {
void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
endpoint *_receiver, const boost::asio::ip::address &_destination,
- client_t _bound_client) {
+ client_t _bound_client,
+ const boost::asio::ip::address &_remote_address,
+ std::uint16_t _remote_port) {
#if 0
std::stringstream msg;
msg << "rmi::on_message: ";
for (uint32_t i = 0; i < _size; ++i)
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
- VSOMEIP_DEBUG << msg.str();
+ VSOMEIP_INFO << msg.str();
#endif
(void)_bound_client;
service_t its_service;
@@ -818,16 +868,15 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
if (discovery_ && its_method == sd::method) {
- if (configuration_->get_sd_port() == _receiver->get_remote_port()) {
- boost::asio::ip::address its_address;
- if (_receiver->get_remote_address(its_address)) {
- discovery_->on_message(_data, _size, its_address, _destination);
+ if (configuration_->get_sd_port() == _remote_port) {
+ if (!_remote_address.is_unspecified()) {
+ discovery_->on_message(_data, _size, _remote_address, _destination);
} else {
VSOMEIP_ERROR << "Ignored SD message from unknown address.";
}
} else {
VSOMEIP_ERROR << "Ignored SD message from unknown port ("
- << _receiver->get_remote_port() << ")";
+ << _remote_port << ")";
}
}
} else {
@@ -843,7 +892,8 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
if(!(_size >= VSOMEIP_MESSAGE_TYPE_POS && utility::is_request_no_return(_data[VSOMEIP_MESSAGE_TYPE_POS]))) {
if (return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) {
send_error(return_code, _data, _size, its_instance,
- _receiver->is_reliable(), _receiver);
+ _receiver->is_reliable(), _receiver,
+ _remote_address, _remote_port);
return;
}
} else if(return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) {
@@ -893,13 +943,20 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
}
}
#ifdef USE_DLT
- uint16_t its_data_size
+ const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
tc::trace_header its_header;
- if (its_header.prepare(_receiver, false))
- tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
- _data, its_data_size);
+ const boost::asio::ip::address_v4 its_remote_address =
+ _remote_address.is_v4() ? _remote_address.to_v4() :
+ boost::asio::ip::address_v4::from_string("6.6.6.6");
+ tc::protocol_e its_protocol =
+ _receiver->is_local() ? tc::protocol_e::local :
+ _receiver->is_reliable() ? tc::protocol_e::tcp :
+ tc::protocol_e::udp;
+ its_header.prepare(its_remote_address, _remote_port, its_protocol, false);
+ tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data,
+ its_data_size);
#endif
}
@@ -914,7 +971,7 @@ void routing_manager_impl::on_message(
<< _service << ", " << _instance << "): ";
for (uint32_t i = 0; i < _size; ++i)
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
- VSOMEIP_DEBUG << msg.str();
+ VSOMEIP_INFO << msg.str();
#endif
client_t its_client;
@@ -1031,8 +1088,7 @@ void routing_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
}
}
for (const auto &s : services_to_report_) {
- host_->on_availability(s.service_id_, s.instance_id_, true, s.major_,
- s.minor_);
+ on_availability(s.service_id_, s.instance_id_, true, s.major_, s.minor_);
if (s.reliable_) {
stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, s.service_id_,
s.instance_id_, s.major_, s.minor_);
@@ -1060,7 +1116,7 @@ void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
if(!its_info){
return;
}
- host_->on_availability(its_service.first, its_instance.first,
+ on_availability(its_service.first, its_instance.first,
false, its_info->get_major(), its_info->get_minor());
}
}
@@ -1072,7 +1128,7 @@ void routing_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
if(!its_info){
return;
}
- host_->on_availability(its_service.first, its_instance.first,
+ on_availability(its_service.first, its_instance.first,
false, its_info->get_major(), its_info->get_minor());
}
}
@@ -1119,17 +1175,22 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
}
}
}
+ std::map<event_t, std::shared_ptr<event> > events;
{
- std::lock_guard<std::mutex> its_lock(events_mutex_);
+ std::unique_lock<std::mutex> its_lock(events_mutex_);
auto its_events_service = events_.find(_service);
if (its_events_service != events_.end()) {
auto its_events_instance = its_events_service->second.find(_instance);
if (its_events_instance != its_events_service->second.end()) {
- for (auto &e : its_events_instance->second)
- e.second->unset_payload();
+ for (auto &e : its_events_instance->second) {
+ events[e.first] = e.second;
+ }
}
}
}
+ for (auto &e : events) {
+ e.second->unset_payload();
+ }
{
std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
auto its_service = eventgroup_clients_.find(_service);
@@ -1236,7 +1297,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
if (its_message) {
its_message->set_instance(_instance);
its_message->set_reliable(_reliable);
- host_->on_message(its_message);
+ host_->on_message(std::move(its_message));
is_delivered = true;
} else {
VSOMEIP_ERROR << "Routing manager: deliver_message: "
@@ -1305,18 +1366,25 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin
std::shared_ptr<endpoint> its_service_endpoint = find_server_endpoint(_port,
_reliable);
if (!its_service_endpoint) {
- its_service_endpoint = create_server_endpoint(_port, _reliable, true);
-
- if (its_service_endpoint) {
- sd_info_ = std::make_shared<serviceinfo>(ANY_MAJOR, ANY_MINOR, DEFAULT_TTL,
- false); // false, because we do _not_ want to announce it...
- sd_info_->set_endpoint(its_service_endpoint, _reliable);
- its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
- _address, _port);
- its_service_endpoint->join(_address);
- } else {
- VSOMEIP_ERROR << "Service Discovery endpoint could not be created. "
- "Please check your network configuration.";
+ try {
+ its_service_endpoint = create_server_endpoint(_port, _reliable,
+ true);
+
+ if (its_service_endpoint) {
+ sd_info_ = std::make_shared<serviceinfo>(ANY_MAJOR, ANY_MINOR,
+ DEFAULT_TTL, false); // false, because we do _not_ want to announce it...
+ sd_info_->set_endpoint(its_service_endpoint, _reliable);
+ its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
+ _address, _port);
+ its_service_endpoint->join(_address);
+ } else {
+ VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. "
+ "Please check your network configuration.";
+ }
+ } catch (const std::exception &e) {
+ host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED);
+ VSOMEIP_ERROR << "Service Discovery endpoint could not be created: "
+ << e.what();
}
}
return its_service_endpoint;
@@ -1398,7 +1466,7 @@ void routing_manager_impl::init_service_info(
if (ILLEGAL_PORT == its_reliable_port
&& ILLEGAL_PORT == its_unreliable_port) {
- VSOMEIP_DEBUG << "Port configuration missing for ["
+ VSOMEIP_INFO << "Port configuration missing for ["
<< std::hex << _service << "." << _instance
<< "]. Service is internal.";
}
@@ -1427,7 +1495,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint(
boost::asio::ip::tcp::endpoint(_address, _remote_port),
io_,
configuration_->get_message_size_reliable(
- _address.to_string(), _remote_port));
+ _address.to_string(), _remote_port),
+ configuration_->get_buffer_shrink_threshold());
if (configuration_->has_enabled_magic_cookies(_address.to_string(),
_remote_port)) {
@@ -1465,7 +1534,8 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
shared_from_this(),
boost::asio::ip::tcp::endpoint(its_unicast, _port), io_,
configuration_->get_message_size_reliable(
- its_unicast.to_string(), _port));
+ its_unicast.to_string(), _port),
+ configuration_->get_buffer_shrink_threshold());
if (configuration_->has_enabled_magic_cookies(
its_unicast.to_string(), _port) ||
configuration_->has_enabled_magic_cookies(
@@ -1495,7 +1565,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
server_endpoints_[_port][_reliable] = its_endpoint;
its_endpoint->start();
}
- } catch (std::exception &e) {
+ } catch (const std::exception &e) {
host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED);
VSOMEIP_ERROR << e.what();
}
@@ -1887,8 +1957,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
- host_->on_availability(_service, _instance,
- true, its_info->get_major(), its_info->get_minor());
+ on_availability(_service, _instance,
+ true, its_info->get_major(), its_info->get_minor());
if (!stub_->contained_in_routing_info(
VSOMEIP_ROUTING_CLIENT, _service, _instance,
its_info->get_major(),
@@ -1916,7 +1986,7 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
= endpoint_definition::get(_unreliable_address, _unreliable_port, false);
remote_service_info_[_service][_instance][false] = endpoint_def;
if (!is_reliable_known) {
- host_->on_availability(_service, _instance, true, _major, _minor);
+ on_availability(_service, _instance, true, _major, _minor);
stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor);
}
}
@@ -1931,7 +2001,7 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
if(!its_info)
return;
- host_->on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor());
+ on_availability(_service, _instance, false, its_info->get_major(), its_info->get_minor());
stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, its_info->get_major(), its_info->get_minor());
// Implicit unsubscribe
{
@@ -2022,7 +2092,7 @@ std::chrono::milliseconds routing_manager_impl::update_routing_info(std::chrono:
for (auto &s : its_expired_offers) {
for (auto &i : s.second) {
- VSOMEIP_DEBUG << "update_routing_info: elapsed=" << _elapsed.count()
+ VSOMEIP_INFO << "update_routing_info: elapsed=" << _elapsed.count()
<< " : delete service/instance " << std::hex << s.first << "/" << i.first;
del_routing_info(s.first, i.first, i.second.first, i.second.second);
}
@@ -2043,15 +2113,19 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr
}
bool is_gone(false);
boost::asio::ip::address its_address;
- std::shared_ptr<endpoint> its_endpoint = i.second->get_endpoint(true);
- if (its_endpoint) {
- if (its_endpoint->get_remote_address(its_address)) {
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(
+ i.second->get_endpoint(true));
+ if (its_client_endpoint) {
+ if (its_client_endpoint->get_remote_address(its_address)) {
is_gone = (its_address == _address);
}
} else {
- its_endpoint = i.second->get_endpoint(false);
- if (its_endpoint) {
- if (its_endpoint->get_remote_address(its_address)) {
+ its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(
+ i.second->get_endpoint(false));
+ if (its_client_endpoint) {
+ if (its_client_endpoint->get_remote_address(its_address)) {
is_gone = (its_address == _address);
}
}
@@ -2070,7 +2144,7 @@ void routing_manager_impl::expire_services(const boost::asio::ip::address &_addr
for (auto &s : its_expired_offers) {
for (auto &i : s.second) {
- VSOMEIP_DEBUG << "expire_services for address: " << _address.to_string()
+ VSOMEIP_INFO << "expire_services for address: " << _address.to_string()
<< " : delete service/instance " << std::hex << s.first << "/" << i.first;
del_routing_info(s.first, i.first, i.second.first, i.second.second);
}
@@ -2175,13 +2249,13 @@ bool routing_manager_impl::on_subscribe_accepted(service_t _service, instance_t
}
if (client != VSOMEIP_ROUTING_CLIENT) {
- VSOMEIP_DEBUG << "Subscription accepted: eventgroup=" << _eventgroup
+ VSOMEIP_INFO << "Subscription accepted: eventgroup=" << _eventgroup
<< " : target: " << _target->get_address().to_string()
<< ":" << std::dec <<_target->get_port()
<< (_target->is_reliable() ? " reliable" : " unreliable")
<< " from client: 0x" << std::hex << client << ".";
} else {
- VSOMEIP_DEBUG << "Subscription accepted: eventgroup: " << _eventgroup
+ VSOMEIP_INFO << "Subscription accepted: eventgroup: " << _eventgroup
<< " : target: " << _target->get_address().to_string()
<< ":" << std::dec <<_target->get_port()
<< (_target->is_reliable() ? " reliable" : " unreliable")
@@ -2267,12 +2341,12 @@ void routing_manager_impl::on_unsubscribe(service_t _service,
client_t its_client = find_client(_service, _instance, its_eventgroup, _target);
if (its_client != VSOMEIP_ROUTING_CLIENT) {
- VSOMEIP_DEBUG << "on_unsubscribe: target: " << _target->get_address().to_string()
+ VSOMEIP_INFO << "on_unsubscribe: target: " << _target->get_address().to_string()
<< ":" << std::dec <<_target->get_port()
<< (_target->is_reliable() ? " reliable" : " unreliable")
<< " from client: 0x" << std::hex << its_client;
} else {
- VSOMEIP_DEBUG << "on_unsubscribe: target: " << _target->get_address().to_string()
+ VSOMEIP_INFO << "on_unsubscribe: target: " << _target->get_address().to_string()
<< ":" << std::dec <<_target->get_port()
<< (_target->is_reliable() ? " reliable" : " unreliable");
}
@@ -2609,7 +2683,9 @@ return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _s
void routing_manager_impl::send_error(return_code_e _return_code,
const byte_t *_data, length_t _size,
instance_t _instance, bool _reliable,
- endpoint *_receiver) {
+ endpoint *_receiver,
+ const boost::asio::ip::address &_remote_address,
+ std::uint16_t _remote_port) {
client_t its_client = 0;
service_t its_service = 0;
@@ -2646,53 +2722,21 @@ void routing_manager_impl::send_error(return_code_e _return_code,
error_message->set_return_code(_return_code);
error_message->set_service(its_service);
error_message->set_session(its_session);
-
- std::lock_guard<std::mutex> its_lock(serialize_mutex_);
- if (serializer_->serialize(error_message.get())) {
- if (_receiver) {
- boost::asio::ip::address adr;
- uint16_t port;
- if (_receiver->is_reliable()) {
- auto endpoint = dynamic_cast<tcp_server_endpoint_impl*>(_receiver);
- if(!endpoint) {
- return;
- }
- if (!endpoint->get_remote_address(adr)) {
- VSOMEIP_ERROR << "routing_manager_impl::send_error: "
- "couldn't determine remote address (reliable)";
- return;
- }
- port = endpoint->get_remote_port();
- if (!port) {
- VSOMEIP_ERROR << "routing_manager_impl::send_error: "
- "couldn't determine remote port (reliable)";
- return;
- }
- } else {
- auto endpoint = dynamic_cast<udp_server_endpoint_impl*>(_receiver);
- if (!endpoint) {
- return;
- }
- if (!endpoint->get_remote_address(adr)) {
- VSOMEIP_ERROR << "routing_manager_impl::send_error: "
- "couldn't determine remote address (unreliable)";
- return;
- }
- port = endpoint->get_remote_port();
- if (!port) {
- VSOMEIP_ERROR << "routing_manager_impl::send_error: "
- "couldn't determine remote port (unreliable)";
- return;
- }
- }
- auto its_endpoint_def =
- std::make_shared<endpoint_definition>(adr, port, _receiver->is_reliable());
- its_endpoint_def->set_remote_port(_receiver->get_local_port());
- send_to(its_endpoint_def, serializer_->get_data(), serializer_->get_size(), true);
+ {
+ std::lock_guard<std::mutex> its_lock(serialize_mutex_);
+ if (serializer_->serialize(error_message.get())) {
+ if (_receiver) {
+ auto its_endpoint_def = std::make_shared<endpoint_definition>(
+ _remote_address, _remote_port,
+ _receiver->is_reliable());
+ its_endpoint_def->set_remote_port(_receiver->get_local_port());
+ send_to(its_endpoint_def, serializer_->get_data(),
+ serializer_->get_size(), true);
+ }
+ serializer_->reset();
+ } else {
+ VSOMEIP_ERROR<< "Failed to serialize error message.";
}
- serializer_->reset();
- } else {
- VSOMEIP_ERROR << "Failed to serialize error message.";
}
}
@@ -2716,55 +2760,72 @@ void routing_manager_impl::on_identify_response(client_t _client, service_t _ser
}
void routing_manager_impl::identify_for_subscribe(client_t _client,
- service_t _service, instance_t _instance, major_version_t _major) {
- if (!has_identified(_client, _service, _instance, false) &&
- !is_identifying(_client, _service, _instance, false)) {
- auto unreliable_endpoint = find_or_create_remote_client(_service, _instance, false, _client);
- if (unreliable_endpoint) {
- {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- identifying_clients_[_service][_instance][false].insert(_client);
- }
- auto message = runtime::get()->create_message(false);
- message->set_service(_service);
- message->set_instance(_instance);
- message->set_client(_client);
- message->set_method(ANY_METHOD - 1);
- message->set_interface_version(_major);
- message->set_message_type(message_type_e::MT_REQUEST);
- std::lock_guard<std::mutex> its_lock(serialize_mutex_);
- if (serializer_->serialize(message.get())) {
- unreliable_endpoint->send(serializer_->get_data(),
- serializer_->get_size());
- serializer_->reset();
- }
- }
- }
- if (!has_identified(_client, _service, _instance, true) &&
- !is_identifying(_client, _service, _instance, true)) {
- auto reliable_endpoint = find_or_create_remote_client(_service, _instance, true, _client);
- if (reliable_endpoint) {
- {
- std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
- identifying_clients_[_service][_instance][true].insert(_client);
+ service_t _service, instance_t _instance, major_version_t _major,
+ subscription_type_e _subscription_type) {
+
+ if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE
+ || _subscription_type == subscription_type_e::SU_PREFER_UNRELIABLE
+ || _subscription_type == subscription_type_e::SU_UNRELIABLE) {
+ if (!has_identified(_client, _service, _instance, false)
+ && !is_identifying(_client, _service, _instance, false)) {
+ if (!send_identify_message(_client, _service, _instance, _major,
+ false) && _subscription_type
+ == subscription_type_e::SU_PREFER_UNRELIABLE) {
+ send_identify_message(_client, _service, _instance, _major,
+ true);
}
- auto message = runtime::get()->create_message(true);
- message->set_service(_service);
- message->set_instance(_instance);
- message->set_client(_client);
- message->set_method(ANY_METHOD - 1);
- message->set_interface_version(_major);
- message->set_message_type(message_type_e::MT_REQUEST);
- std::lock_guard<std::mutex> its_lock(serialize_mutex_);
- if (serializer_->serialize(message.get())) {
- reliable_endpoint->send(serializer_->get_data(),
- serializer_->get_size());
- serializer_->reset();
+ }
+ }
+
+ if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE
+ || _subscription_type == subscription_type_e::SU_PREFER_RELIABLE
+ || _subscription_type == subscription_type_e::SU_RELIABLE) {
+ if (!has_identified(_client, _service, _instance, true)
+ && !is_identifying(_client, _service, _instance, true)) {
+ if (!send_identify_message(_client, _service, _instance, _major,
+ true) && _subscription_type
+ == subscription_type_e::SU_PREFER_RELIABLE) {
+ send_identify_message(_client, _service, _instance, _major,
+ false);
}
}
}
}
+bool routing_manager_impl::send_identify_message(client_t _client,
+ service_t _service,
+ instance_t _instance,
+ major_version_t _major,
+ bool _reliable) {
+ auto its_endpoint = find_or_create_remote_client(_service, _instance,
+ _reliable, _client);
+ if (!its_endpoint) {
+ return false;
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(identified_clients_mutex_);
+ identifying_clients_[_service][_instance][_reliable].insert(_client);
+ }
+ auto message = runtime::get()->create_message(_reliable);
+ message->set_service(_service);
+ message->set_instance(_instance);
+ message->set_client(_client);
+ message->set_method(ANY_METHOD - 1);
+ message->set_interface_version(_major);
+ message->set_message_type(message_type_e::MT_REQUEST);
+ {
+ std::lock_guard<std::mutex> its_lock(serialize_mutex_);
+ if (serializer_->serialize(message.get())) {
+ its_endpoint->send(serializer_->get_data(), serializer_->get_size());
+ serializer_->reset();
+ } else {
+ return false;
+ }
+ }
+ return true;
+}
+
+
bool routing_manager_impl::supports_selective(service_t _service, instance_t _instance) {
bool supports_selective(false);
auto its_service = remote_service_info_.find(_service);
@@ -2884,7 +2945,7 @@ routing_manager_impl::expire_subscriptions() {
its_instance.first, its_eventgroup.first, true);
}
- VSOMEIP_DEBUG << "Expired subscription ("
+ VSOMEIP_INFO << "Expired subscription ("
<< std::hex << its_service.first << "."
<< its_instance .first << "."
<< its_eventgroup.first << " from "
@@ -3141,10 +3202,12 @@ void routing_manager_impl::on_pong(client_t _client) {
}
void routing_manager_impl::on_clientendpoint_error(client_t _client) {
- VSOMEIP_WARNING << "Application/Client "
- << std::hex << std::setw(4) << std::setfill('0')
- << _client << " will be deregistered because of an client endpoint error.";
- stub_->deregister_erroneous_client(_client);
+ if (stub_->is_registered(_client)) {
+ VSOMEIP_WARNING << "Application/Client "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << _client << " will be deregistered because of an client endpoint error.";
+ stub_->deregister_erroneous_client(_client);
+ }
}
void routing_manager_impl::confirm_pending_offers(client_t _client) {
@@ -3208,6 +3271,7 @@ void routing_manager_impl::remove_specific_client_endpoint(client_t _client, ser
if(found_instance != found_service->second.end()) {
auto its_client = found_instance->second.find(_client);
if (its_client != found_instance->second.end()) {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
if (remote_services_.find(_service) != remote_services_.end()) {
if (remote_services_[_service].find(_instance) != remote_services_[_service].end()) {
auto endpoint = remote_services_[_service][_instance][_client][_reliable];
@@ -3307,6 +3371,26 @@ void routing_manager_impl::remove_identifying_client(service_t _service, instanc
}
}
+void routing_manager_impl::unsubscribe_specific_client_at_sd(
+ service_t _service, instance_t _instance, client_t _client) {
+ bool found(false);
+ {
+ std::lock_guard<std::mutex> its_lock(specific_endpoint_clients_mutex_);
+ auto its_service = specific_endpoint_clients_.find(_service);
+ if (its_service != specific_endpoint_clients_.end()) {
+ auto its_instance = its_service->second.find(_instance);
+ if (its_instance != its_service->second.end()) {
+ if (its_instance->second.find(_client) != its_instance->second.end()) {
+ found = true;
+ }
+ }
+ }
+ }
+ if (found && discovery_) {
+ discovery_->unsubscribe_client(_service, _instance, _client);
+ }
+}
+
void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
subscription_type_e _subscription_type) {
@@ -3377,5 +3461,34 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
}
+void routing_manager_impl::on_net_if_state_changed(std::string _if, bool _available) {
+ if (_available != if_state_running_) {
+ if (_available) {
+ VSOMEIP_INFO << "Network interface \"" << _if << "\" is up and running.";
+ start_ip_routing();
+#ifndef WIN32
+ if (netlink_connector_) {
+ netlink_connector_->unregister_net_if_changes_handler();
+ }
+#endif
+ }
+ }
+}
+
+void routing_manager_impl::start_ip_routing() {
+ std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
+ if_state_running_ = true;
+
+ if (discovery_) {
+ discovery_->start();
+ } else {
+ init_routing_info();
+ }
+
+ for (auto its_service : pending_sd_offers_) {
+ init_service_info(its_service.first, its_service.second, true);
+ }
+ pending_sd_offers_.clear();
+}
} // namespace vsomeip