summaryrefslogtreecommitdiff
path: root/implementation/service_discovery/src/service_discovery_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp269
1 files changed, 200 insertions, 69 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index 94a03b4..e33027a 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -25,6 +25,7 @@
#include "../../configuration/include/configuration.hpp"
#include "../../configuration/include/internal.hpp"
#include "../../endpoints/include/endpoint.hpp"
+#include "../../endpoints/include/client_endpoint.hpp"
#include "../../endpoints/include/endpoint_definition.hpp"
#include "../../endpoints/include/tcp_server_endpoint_impl.hpp"
#include "../../endpoints/include/udp_server_endpoint_impl.hpp"
@@ -41,8 +42,12 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host)
host_(_host),
port_(VSOMEIP_SD_DEFAULT_PORT),
reliable_(false),
- serializer_(std::make_shared<serializer>()),
- deserializer_(std::make_shared<deserializer>()),
+ serializer_(
+ std::make_shared<serializer>(
+ host_->get_configuration()->get_buffer_shrink_threshold())),
+ deserializer_(
+ std::make_shared<deserializer>(
+ host_->get_configuration()->get_buffer_shrink_threshold())),
ttl_timer_(_host->get_io()),
smallest_ttl_(DEFAULT_TTL),
ttl_(VSOMEIP_SD_DEFAULT_TTL),
@@ -55,7 +60,7 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host)
cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY),
offer_debounce_timer_(_host->get_io()),
main_phase_timer_(_host->get_io()),
- is_suspended_(false) {
+ is_suspended_(true) {
std::chrono::seconds smallest_ttl(DEFAULT_TTL);
smallest_ttl_ = std::chrono::duration_cast<std::chrono::milliseconds>(smallest_ttl);
@@ -81,6 +86,7 @@ void service_discovery_impl::init() {
host_->get_configuration();
if (its_configuration) {
unicast_ = its_configuration->get_unicast_address();
+ sd_multicast_ = its_configuration->get_sd_multicast();
port_ = its_configuration->get_sd_port();
reliable_ = (its_configuration->get_sd_protocol()
@@ -88,14 +94,6 @@ void service_discovery_impl::init() {
max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD :
VSOMEIP_MAX_UDP_SD_PAYLOAD);
- serializer_->create_data(
- reliable_ ?
- VSOMEIP_MAX_TCP_MESSAGE_SIZE :
- VSOMEIP_MAX_UDP_MESSAGE_SIZE);
-
- endpoint_ = host_->create_service_discovery_endpoint(
- its_configuration->get_sd_multicast(), port_, reliable_);
-
ttl_ = its_configuration->get_sd_ttl();
// generate random initial delay based on initial delay min and max
@@ -135,6 +133,26 @@ void service_discovery_impl::init() {
}
void service_discovery_impl::start() {
+ if (!endpoint_) {
+ endpoint_ = host_->create_service_discovery_endpoint(
+ sd_multicast_, port_, reliable_);
+ if (!endpoint_) {
+ VSOMEIP_ERROR << "Couldn't start service discovery";
+ return;
+ }
+ // Send out pending find services messages if have any
+ bool send_find(false);
+ {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ if (requested_.size()) {
+ send_find = true;
+ }
+ }
+ if (send_find) {
+ send(false, true);
+ }
+ }
+
is_suspended_ = false;
start_main_phase_timer();
start_offer_debounce_timer(true);
@@ -174,13 +192,14 @@ void service_discovery_impl::request_service(service_t _service,
> (_major, _minor, _ttl);
}
}
- if (is_new_request) {
+ if (is_new_request && !is_suspended_) {
send(false, true);
}
}
void service_discovery_impl::release_service(service_t _service,
instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
auto find_service = requested_.find(_service);
if (find_service != requested_.end()) {
find_service->second.erase(_instance);
@@ -189,6 +208,7 @@ void service_discovery_impl::release_service(service_t _service,
std::shared_ptr<request>
service_discovery_impl::find_request(service_t _service, instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
auto find_service = requested_.find(_service);
if (find_service != requested_.end()) {
auto find_instance = find_service->second.find(_instance);
@@ -309,23 +329,44 @@ void service_discovery_impl::get_subscription_endpoints(
_unreliable = host_->find_or_create_remote_client(_service,
_instance, false, _client);
if (_unreliable) {
- *_has_address = _unreliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_unreliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
}
if (_reliable) {
- *_has_address = *_has_address
- || _reliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_reliable);
+ if (its_client_endpoint) {
+ *_has_address = *_has_address
+ || its_client_endpoint->get_remote_address(
+ *_address);
+ }
}
break;
case subscription_type_e::SU_PREFER_UNRELIABLE:
_unreliable = host_->find_or_create_remote_client(_service,
_instance, false, _client);
if (_unreliable) {
- *_has_address = _unreliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_unreliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
} else {
_reliable = host_->find_or_create_remote_client(_service,
_instance, true, _client);
if (_reliable) {
- *_has_address = _reliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(
+ _reliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
}
}
break;
@@ -333,12 +374,23 @@ void service_discovery_impl::get_subscription_endpoints(
_reliable = host_->find_or_create_remote_client(_service,
_instance, true, _client);
if (_reliable) {
- *_has_address = _reliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_reliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
} else {
- _unreliable = host_->find_or_create_remote_client(_service,
- _instance, false, _client);
+ _unreliable = host_->find_or_create_remote_client(_service,
+ _instance, false, _client);
if (_unreliable) {
- *_has_address = _unreliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(
+ _unreliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
}
}
break;
@@ -347,14 +399,24 @@ void service_discovery_impl::get_subscription_endpoints(
_instance,
false, _client);
if (_unreliable) {
- *_has_address = _unreliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_unreliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
}
break;
case subscription_type_e::SU_RELIABLE:
_reliable = host_->find_or_create_remote_client(_service, _instance,
- true, _client);
+ true, _client);
if (_reliable) {
- *_has_address = _reliable->get_remote_address(*_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_reliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
}
}
}
@@ -384,11 +446,25 @@ void service_discovery_impl::unsubscribe(service_t _service,
found_eventgroup->second.erase(_client);
auto endpoint = its_subscription->get_endpoint(false);
if (endpoint) {
- has_address = endpoint->get_remote_address(its_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(
+ endpoint);
+ if (its_client_endpoint) {
+ has_address =
+ its_client_endpoint->get_remote_address(
+ its_address);
+ }
} else {
endpoint = its_subscription->get_endpoint(true);
if (endpoint) {
- has_address = endpoint->get_remote_address(its_address);
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<
+ client_endpoint>(endpoint);
+ if (its_client_endpoint) {
+ has_address =
+ its_client_endpoint->get_remote_address(
+ its_address);
+ }
} else {
return;
}
@@ -422,6 +498,70 @@ void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _ins
}
}
+void service_discovery_impl::unsubscribe_client(service_t _service,
+ instance_t _instance,
+ client_t _client) {
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (!its_runtime) {
+ return;
+ }
+ std::shared_ptr < message_impl > its_message = its_runtime->create_message();
+ boost::asio::ip::address its_address;
+ bool has_address(false);
+ {
+ std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
+ std::shared_ptr < subscription > its_subscription;
+ auto found_service = subscribed_.find(_service);
+ if (found_service != subscribed_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ for (auto &found_eventgroup : found_instance->second) {
+ auto found_client = found_eventgroup.second.find(_client);
+ if (found_client != found_eventgroup.second.end()) {
+ its_subscription = found_client->second;
+ its_subscription->set_ttl(0);
+ found_eventgroup.second.erase(_client);
+ if (!has_address) {
+ auto endpoint = its_subscription->get_endpoint(
+ false);
+ if (endpoint) {
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<
+ client_endpoint>(endpoint);
+ if (its_client_endpoint) {
+ has_address =
+ its_client_endpoint->get_remote_address(
+ its_address);
+ }
+ } else {
+ endpoint = its_subscription->get_endpoint(true);
+ if (endpoint) {
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<
+ client_endpoint>(endpoint);
+ if (its_client_endpoint) {
+ has_address =
+ its_client_endpoint->get_remote_address(
+ its_address);
+ }
+ } else {
+ return;
+ }
+ }
+ }
+ insert_subscription(its_message, _service, _instance,
+ found_eventgroup.first, its_subscription, true,
+ true);
+ }
+ }
+ }
+ }
+ }
+ if (has_address && 0 < its_message->get_entries().size()) {
+ serialize_and_send(its_message, its_address);
+ }
+}
+
std::pair<session_t, bool> service_discovery_impl::get_session(
const boost::asio::ip::address &_address) {
std::pair<session_t, bool> its_session;
@@ -628,14 +768,14 @@ void service_discovery_impl::insert_option(
}
void service_discovery_impl::insert_find_entries(
- std::shared_ptr<message_impl> &_message, requests_t &_requests,
+ std::shared_ptr<message_impl> &_message,
uint32_t _start, uint32_t &_size, bool &_done) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
uint32_t its_size(0);
uint32_t i = 0;
_done = true;
- for (auto its_service : _requests) {
+ for (auto its_service : requested_) {
for (auto its_instance : its_service.second) {
auto its_request = its_instance.second;
uint8_t its_sent_counter = its_request->get_sent_counter();
@@ -838,7 +978,7 @@ bool service_discovery_impl::send(bool _is_announcing, bool _is_find) {
its_message = its_runtime->create_message();
its_messages.push_back(its_message);
- insert_find_entries(its_message, requested_, its_start, its_size, is_done);
+ insert_find_entries(its_message, its_start, its_size, is_done);
its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
};
its_remaining -= its_size;
@@ -868,14 +1008,16 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
msg << "sdi::on_message: ";
for (length_t i = 0; i < _length; ++i)
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
- VSOMEIP_DEBUG << msg.str();
+ VSOMEIP_INFO << msg.str();
#endif
if(is_suspended_) {
return;
}
+ current_remote_address_ = _sender;
deserializer_->set_data(_data, _length);
std::shared_ptr < message_impl
> its_message(deserializer_->deserialize_sd_message());
+ deserializer_->reset();
if (its_message) {
// ignore all SD messages with source address equal to node's unicast address
if (!check_source_address(_sender)) {
@@ -1195,13 +1337,12 @@ void service_discovery_impl::send_unicast_offer_service(
uint32_t its_size(max_message_size_);
insert_offer_service(its_message, _service, _instance, _info,
its_size);
- const boost::asio::ip::address its_address(get_current_remote_address());
- if (its_address.is_unspecified()) {
+ if (current_remote_address_.is_unspecified()) {
VSOMEIP_ERROR << "service_discovery_impl::"
"send_unicast_offer_service current remote address "
"is unspecified, won't send offer.";
} else {
- serialize_and_send(its_message, its_address);
+ serialize_and_send(its_message, current_remote_address_);
}
}
}
@@ -1891,14 +2032,13 @@ bool service_discovery_impl::is_tcp_connected(service_t _service,
std::shared_ptr<serviceinfo> its_info = found_instance->second;
if(its_info) {
//get reliable server endpoint
- auto its_reliable_endpoint = its_info->get_endpoint(true);
- if(its_reliable_endpoint) {
- std::shared_ptr<tcp_server_endpoint_impl> its_ptr(std::static_pointer_cast<tcp_server_endpoint_impl>(its_reliable_endpoint));
- if( !its_ptr->is_established(its_endpoint)) {
- }
- else {
- is_connected = true;
- }
+ auto its_reliable_server_endpoint =
+ std::dynamic_pointer_cast<tcp_server_endpoint_impl>(
+ its_info->get_endpoint(true));
+ if (its_reliable_server_endpoint
+ && its_reliable_server_endpoint->is_established(
+ its_endpoint)) {
+ is_connected = true;
}
}
}
@@ -1947,24 +2087,6 @@ void service_discovery_impl::check_ttl(const boost::system::error_code &_error)
}
}
-boost::asio::ip::address service_discovery_impl::get_current_remote_address() const {
- boost::asio::ip::address its_address;
- if (reliable_) {
- auto endpoint = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(endpoint_);
- if (endpoint && !endpoint->get_remote_address(its_address)) {
- VSOMEIP_ERROR << "service_discovery_impl::get_current_remote_address: "
- "couldn't determine remote address (reliable)";
- }
- } else {
- auto endpoint = std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
- if (endpoint && !endpoint->get_remote_address(its_address)) {
- VSOMEIP_ERROR << "service_discovery_impl::get_current_remote_address: "
- "couldn't determine remote address (unreliable)";
- }
- }
- return its_address;
-}
-
bool service_discovery_impl::check_static_header_fields(
const std::shared_ptr<const message> &_message) const {
if(_message->get_protocol_version() != protocol_version) {
@@ -2031,7 +2153,14 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
found_client->second->set_endpoint(its_unreliable, false);
}
if (endpoint) {
- endpoint->get_remote_address(its_address);
+ if (!has_address) {
+ VSOMEIP_WARNING << "service_discovery_impl::"
+ "send_subscriptions couldn't determine "
+ "address for service.instance: "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << _service << "." << _instance;
+ continue;
+ }
std::shared_ptr<message_impl> its_message
= its_runtime->create_message();
@@ -2119,7 +2248,7 @@ bool service_discovery_impl::check_ipv4_address(
} else {
#if 0
- VSOMEIP_DEBUG << "First 3 triples of subscribers endpoint IP address are valid!";
+ VSOMEIP_INFO << "First 3 triples of subscribers endpoint IP address are valid!";
#endif
}
}
@@ -2418,16 +2547,18 @@ bool service_discovery_impl::send_stop_offer(
std::shared_ptr<serviceinfo> _info) {
std::shared_ptr < runtime > its_runtime = runtime_.lock();
if (its_runtime) {
- std::vector<std::shared_ptr<message_impl>> its_messages;
- std::shared_ptr<message_impl> its_message;
- its_message = its_runtime->create_message();
- its_messages.push_back(its_message);
+ if (_info->get_endpoint(false) || _info->get_endpoint(true)) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message;
+ its_message = its_runtime->create_message();
+ its_messages.push_back(its_message);
- uint32_t its_size(max_message_size_);
- insert_offer_service(its_message, _service, _instance, _info, its_size);
+ uint32_t its_size(max_message_size_);
+ insert_offer_service(its_message, _service, _instance, _info, its_size);
- // Serialize and send
- return serialize_and_send_messages(its_messages);
+ // Serialize and send
+ return serialize_and_send_messages(its_messages);
+ }
}
return false;
}