summaryrefslogtreecommitdiff
path: root/implementation/service_discovery/src/service_discovery_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp400
1 files changed, 297 insertions, 103 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index 0cc2d6f..9fc955d 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -59,6 +59,8 @@ service_discovery_impl::service_discovery_impl(service_discovery_host *_host)
repetitions_max_(VSOMEIP_SD_DEFAULT_REPETITIONS_MAX),
cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY),
offer_debounce_timer_(_host->get_io()),
+ find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME),
+ find_debounce_timer_(_host->get_io()),
main_phase_timer_(_host->get_io()),
is_suspended_(true) {
std::chrono::seconds smallest_ttl(DEFAULT_TTL);
@@ -140,22 +142,12 @@ void service_discovery_impl::start() {
VSOMEIP_ERROR << "Couldn't start service discovery";
return;
}
- // Send out pending find services messages if have any
- bool send_find(false);
- {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- if (requested_.size()) {
- send_find = true;
- }
- }
- if (send_find) {
- send(false, true);
- }
}
is_suspended_ = false;
start_main_phase_timer();
start_offer_debounce_timer(true);
+ start_find_debounce_timer(true);
}
void service_discovery_impl::stop() {
@@ -170,36 +162,38 @@ void service_discovery_impl::stop() {
offer_debounce_timer_.cancel(ec);
}
{
+ std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_);
+ find_debounce_timer_.cancel(ec);
+ }
+ {
std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_);
for(const auto &t : repetition_phase_timers_) {
t.first->cancel(ec);
}
}
+ {
+ std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
+ for(const auto &t : find_repetition_phase_timers_) {
+ t.first->cancel(ec);
+ }
+ }
+
}
void service_discovery_impl::request_service(service_t _service,
instance_t _instance, major_version_t _major, minor_version_t _minor,
ttl_t _ttl) {
- bool is_new_request(true);
- {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- auto find_service = requested_.find(_service);
- if (find_service != requested_.end()) {
- auto find_instance = find_service->second.find(_instance);
- if (find_instance != find_service->second.end()) {
- is_new_request = false;
- // TODO: check version and report errors
- } else {
- find_service->second[_instance] = std::make_shared < request
- > (_major, _minor, _ttl);
- }
- } else {
- requested_[_service][_instance] = std::make_shared < request
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ auto find_service = requested_.find(_service);
+ if (find_service != requested_.end()) {
+ auto find_instance = find_service->second.find(_instance);
+ if (find_instance == find_service->second.end()) {
+ find_service->second[_instance] = std::make_shared < request
> (_major, _minor, _ttl);
}
- }
- if (is_new_request && !is_suspended_) {
- send(false, true);
+ } else {
+ requested_[_service][_instance] = std::make_shared < request
+ > (_major, _minor, _ttl);
}
}
@@ -239,11 +233,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
if (found_eventgroup != found_instance->second.end()) {
auto found_client = found_eventgroup->second.find(_client);
if (found_client != found_eventgroup->second.end()) {
- if (found_client->second->get_major() == _major) {
- found_client->second->set_ttl(_ttl);
- found_client->second->set_expiration(std::chrono::steady_clock::now()
- + std::chrono::seconds(_ttl));
- } else {
+ if (found_client->second->get_major() != _major) {
VSOMEIP_ERROR
<< "Subscriptions to different versions of the same "
"service instance are not supported!";
@@ -281,8 +271,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
// New subscription
std::shared_ptr < subscription > its_subscription = std::make_shared
< subscription > (_major, _ttl, its_reliable, its_unreliable,
- _subscription_type, subscribe_count,
- std::chrono::steady_clock::time_point() + std::chrono::seconds(_ttl));
+ _subscription_type, subscribe_count);
subscribed_[_service][_instance][_eventgroup][_client] = its_subscription;
if (has_address) {
@@ -774,44 +763,53 @@ void service_discovery_impl::insert_option(
}
void service_discovery_impl::insert_find_entries(
- std::shared_ptr<message_impl> &_message,
+ std::shared_ptr<message_impl> &_message, const requests_t &_requests,
uint32_t _start, uint32_t &_size, bool &_done) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
uint32_t its_size(0);
uint32_t i = 0;
_done = true;
- for (auto its_service : requested_) {
+ for (auto its_service : _requests) {
for (auto its_instance : its_service.second) {
auto its_request = its_instance.second;
- uint8_t its_sent_counter = its_request->get_sent_counter();
- if (its_sent_counter != repetitions_max_ + 1) {
- if (i >= _start) {
- if (its_size + VSOMEIP_SOMEIP_SD_ENTRY_SIZE <= max_message_size_) {
- std::shared_ptr < serviceentry_impl > its_entry =
- _message->create_service_entry();
- if (its_entry) {
- its_entry->set_type(entry_type_e::FIND_SERVICE);
- its_entry->set_service(its_service.first);
- its_entry->set_instance(its_instance.first);
- its_entry->set_major_version(its_request->get_major());
- its_entry->set_minor_version(its_request->get_minor());
- its_entry->set_ttl(its_request->get_ttl());
- its_size += VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
- its_sent_counter++;
-
- its_request->set_sent_counter(its_sent_counter);
- } else {
- VSOMEIP_ERROR << "Failed to create service entry!";
+
+ // check if release_service was called
+ auto the_service = requested_.find(its_service.first);
+ if ( the_service != requested_.end() ) {
+ auto the_instance = the_service->second.find(its_instance.first);
+ if(the_instance != the_service->second.end() ) {
+ uint8_t its_sent_counter = its_request->get_sent_counter();
+ if (its_sent_counter != repetitions_max_ + 1) {
+ if (i >= _start) {
+ if (its_size + VSOMEIP_SOMEIP_SD_ENTRY_SIZE <= max_message_size_) {
+ std::shared_ptr < serviceentry_impl > its_entry =
+ _message->create_service_entry();
+ if (its_entry) {
+ its_entry->set_type(entry_type_e::FIND_SERVICE);
+ its_entry->set_service(its_service.first);
+ its_entry->set_instance(its_instance.first);
+ its_entry->set_major_version(its_request->get_major());
+ its_entry->set_minor_version(its_request->get_minor());
+ its_entry->set_ttl(its_request->get_ttl());
+ its_size += VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
+ its_sent_counter++;
+
+ its_request->set_sent_counter(its_sent_counter);
+ } else {
+ VSOMEIP_ERROR << "Failed to create service entry!";
+ }
+ } else {
+ _done = false;
+ _size = its_size;
+ return;
+ }
}
- } else {
- _done = false;
- _size = its_size;
- return;
}
+ i++;
}
}
- i++;
+
}
}
_size = its_size;
@@ -912,7 +910,7 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
std::shared_ptr < endpoint > its_endpoint;
its_endpoint = _subscription->get_endpoint(true);
- if (its_endpoint) {
+ if (its_endpoint && its_endpoint->is_connected()) {
insert_option(_message, its_stop_entry, unicast_,
its_endpoint->get_local_port(), true);
insert_option(_message, its_entry, unicast_,
@@ -931,6 +929,25 @@ void service_discovery_impl::insert_subscription_ack(
std::shared_ptr<message_impl> &_message, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved) {
+
+ for (auto its_entry : _message->get_entries()) {
+ if (its_entry->is_eventgroup_entry()) {
+ std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
+ std::dynamic_pointer_cast < eventgroupentry_impl
+ > (its_entry);
+ if(its_eventgroup_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
+ && its_eventgroup_entry->get_service() == _service
+ && its_eventgroup_entry->get_instance() == _instance
+ && its_eventgroup_entry->get_eventgroup() == _eventgroup
+ && its_eventgroup_entry->get_major_version() == _major
+ && its_eventgroup_entry->get_reserved() == _reserved
+ && its_eventgroup_entry->get_counter() == _counter
+ && its_eventgroup_entry->get_ttl() == _ttl) {
+ return;
+ }
+ }
+ }
+
std::shared_ptr < eventgroupentry_impl > its_entry =
_message->create_eventgroup_entry();
its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP_ACK);
@@ -968,36 +985,23 @@ void service_discovery_impl::insert_subscription_nack(
its_entry->set_ttl(0x0);
}
-bool service_discovery_impl::send(bool _is_announcing, bool _is_find) {
+bool service_discovery_impl::send(bool _is_announcing) {
std::shared_ptr < runtime > its_runtime = runtime_.lock();
if (its_runtime) {
std::vector< std::shared_ptr< message_impl > > its_messages;
std::shared_ptr < message_impl > its_message;
- if (_is_find || !_is_announcing) {
- uint32_t its_start(0);
- uint32_t its_size(0);
- bool is_done(false);
- while (!is_done) {
- its_message = its_runtime->create_message();
- its_messages.push_back(its_message);
-
- insert_find_entries(its_message, its_start, its_size, is_done);
- its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
- };
- } else {
+ if(_is_announcing) {
its_message = its_runtime->create_message();
its_messages.push_back(its_message);
- }
- if (!_is_find) {
services_t its_offers = host_->get_offered_services();
fill_message_with_offer_entries(its_runtime, its_message,
its_messages, its_offers, false);
- }
- // Serialize and send
- return serialize_and_send_messages(its_messages);
+ // Serialize and send
+ return serialize_and_send_messages(its_messages);
+ }
}
return false;
}
@@ -1063,7 +1067,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
std::dynamic_pointer_cast < eventgroupentry_impl
> (its_entry);
- process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers);
+ process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers, _destination);
}
}
@@ -1182,6 +1186,8 @@ void service_discovery_impl::process_serviceentry(
} else {
std::shared_ptr<request> its_request = find_request(its_service, its_instance);
if (its_request) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ // ID: SIP_SD_830
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
}
unsubscribe_all(its_service, its_instance);
@@ -1203,8 +1209,10 @@ void service_discovery_impl::process_offerservice_serviceentry(
return;
std::shared_ptr<request> its_request = find_request(_service, _instance);
- if (its_request)
+ if (its_request) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
+ }
smallest_ttl_ = host_->add_routing_info(_service, _instance,
_major, _minor, _ttl,
@@ -1547,7 +1555,8 @@ void service_discovery_impl::process_eventgroupentry(
std::shared_ptr<eventgroupentry_impl> &_entry,
const std::vector<std::shared_ptr<option_impl> > &_options,
std::shared_ptr < message_impl > &its_message_response,
- std::vector <accepted_subscriber_t> &accepted_subscribers) {
+ std::vector <accepted_subscriber_t> &accepted_subscribers,
+ const boost::asio::ip::address &_destination) {
service_t its_service = _entry->get_service();
instance_t its_instance = _entry->get_instance();
eventgroup_t its_eventgroup = _entry->get_eventgroup();
@@ -1567,6 +1576,14 @@ void service_discovery_impl::process_eventgroupentry(
}
if(its_type == entry_type_e::SUBSCRIBE_EVENTGROUP) {
+ if( _destination.is_multicast() ) {
+ VSOMEIP_ERROR << "Received a SubscribeEventGroup entry on multicast address";
+ if(its_ttl > 0) {
+ insert_subscription_nack(its_message_response, its_service, its_instance,
+ its_eventgroup, its_counter, its_major, its_reserved);
+ }
+ return;
+ }
if (_entry->get_num_options(1) == 0
&& _entry->get_num_options(2) == 0) {
VSOMEIP_ERROR << "Invalid number of options in SubscribeEventGroup entry";
@@ -1785,6 +1802,7 @@ void service_discovery_impl::process_eventgroupentry(
if(its_ttl > 0) {
insert_subscription_nack(its_message_response, its_service, its_instance,
its_eventgroup, its_counter, its_major, its_reserved);
+ return;
}
break;
}
@@ -1982,12 +2000,27 @@ void service_discovery_impl::handle_eventgroup_subscription_nack(service_t _serv
if (client.second->get_counter() == _counter) {
// Deliver nack
nackedClient = client.first;
- host_->on_subscribe_nack(client.first, _service, _instance, _eventgroup);
+ host_->on_subscribe_nack(client.first, _service,
+ _instance, _eventgroup, ANY_EVENT);
break;
}
}
- // Remove nacked subscription
- found_eventgroup->second.erase(nackedClient);
+
+ // Restart TCP connection only for non selective subscriptions
+ for (auto client : found_eventgroup->second) {
+ if( !client.second->is_acknowledged()
+ && client.first == VSOMEIP_ROUTING_CLIENT ) {
+ auto endpoint = client.second->get_endpoint(true);
+ if(endpoint) {
+ endpoint->restart();
+ }
+ }
+ }
+
+ // Remove nacked subscription only for selective events
+ if(nackedClient != VSOMEIP_ROUTING_CLIENT) {
+ found_eventgroup->second.erase(nackedClient);
+ }
}
}
}
@@ -2010,7 +2043,7 @@ void service_discovery_impl::handle_eventgroup_subscription_ack(
if (its_client.second->get_counter() == _counter) {
its_client.second->set_acknowledged(true);
host_->on_subscribe_ack(its_client.first, _service,
- _instance, _eventgroup);
+ _instance, _eventgroup, ANY_EVENT);
}
if (_address.is_multicast()) {
host_->on_subscribe_ack(_service, _instance, _address,
@@ -2294,6 +2327,92 @@ void service_discovery_impl::start_offer_debounce_timer(bool _first_start) {
this, std::placeholders::_1));
}
+
+
+void service_discovery_impl::start_find_debounce_timer(bool _first_start) {
+ std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_);
+ boost::system::error_code ec;
+ if (_first_start) {
+ find_debounce_timer_.expires_from_now(initial_delay_, ec);
+ } else {
+ find_debounce_timer_.expires_from_now(find_debounce_time_, ec);
+ }
+ if (ec) {
+ VSOMEIP_ERROR<< "service_discovery_impl::start_find_debounce_timer "
+ "setting expiry time of timer failed: " << ec.message();
+ }
+ find_debounce_timer_.async_wait(
+ std::bind(
+ &service_discovery_impl::on_find_debounce_timer_expired,
+ this, std::placeholders::_1));
+}
+
+//initial delay
+void service_discovery_impl::on_find_debounce_timer_expired(
+ const boost::system::error_code &_error) {
+ if(_error) { // timer was canceled
+ return;
+ }
+ // only copy the accumulated requests of the initial wait phase
+ // if the sent counter for the request is zero.
+ requests_t repetition_phase_finds;
+ bool new_finds(false);
+ {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
+ for (const auto its_service : requested_) {
+ for (const auto its_instance : its_service.second) {
+ if( its_instance.second->get_sent_counter() == 0) {
+ repetition_phase_finds[its_service.first][its_instance.first] = its_instance.second;
+ }
+ }
+ }
+ if (repetition_phase_finds.size()) {
+ new_finds = true;
+ }
+ }
+
+ if (!new_finds) {
+ start_find_debounce_timer(false);
+ return;
+ }
+
+ // Sent out finds for the first time as initial wait phase ended
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (its_runtime) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message =
+ its_runtime->create_message();
+ its_messages.push_back(its_message);
+ // Serialize and send FindService (increments sent counter in requested_ map)
+ fill_message_with_find_entries(its_runtime, its_message, its_messages,
+ repetition_phase_finds);
+ serialize_and_send_messages(its_messages);
+ }
+
+ std::chrono::milliseconds its_delay(repetitions_base_delay_);
+ std::uint8_t its_repetitions(1);
+
+ std::shared_ptr<boost::asio::steady_timer> its_timer = std::make_shared<
+ boost::asio::steady_timer>(host_->get_io());
+ {
+ std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
+ find_repetition_phase_timers_[its_timer] = repetition_phase_finds;
+ }
+
+ boost::system::error_code ec;
+ its_timer->expires_from_now(its_delay, ec);
+ if (ec) {
+ VSOMEIP_ERROR<< "service_discovery_impl::on_find_debounce_timer_expired "
+ "setting expiry time of timer failed: " << ec.message();
+ }
+ its_timer->async_wait(
+ std::bind(
+ &service_discovery_impl::on_find_repetition_phase_timer_expired,
+ this, std::placeholders::_1, its_timer, its_repetitions,
+ its_delay.count()));
+ start_find_debounce_timer(false);
+}
+
void service_discovery_impl::on_offer_debounce_timer_expired(
const boost::system::error_code &_error) {
if(_error) { // timer was canceled
@@ -2385,20 +2504,10 @@ void service_discovery_impl::on_repetition_phase_timer_expired(
if (its_timer_pair != repetition_phase_timers_.end()) {
std::chrono::milliseconds new_delay(0);
std::uint8_t repetition(0);
+ bool move_to_main(false);
if (_repetition <= repetitions_max_) {
// sent offers, double time to wait and start timer again.
- std::shared_ptr<runtime> its_runtime = runtime_.lock();
- if (its_runtime) {
- std::vector<std::shared_ptr<message_impl>> its_messages;
- std::shared_ptr<message_impl> its_message =
- its_runtime->create_message();
- its_messages.push_back(its_message);
- fill_message_with_offer_entries(its_runtime, its_message,
- its_messages, its_timer_pair->second, true);
-
- // Serialize and send
- serialize_and_send_messages(its_messages);
- }
+
new_delay = std::chrono::milliseconds(_last_delay * 2);
repetition = ++_repetition;
} else {
@@ -2410,13 +2519,28 @@ void service_discovery_impl::on_repetition_phase_timer_expired(
// the cyclic offer delay before moving the offers in to main
// phase
if (last_offer_shorter_half_offer_delay_ago()) {
- move_offers_into_main_phase(_timer);
- return;
+ move_to_main = true;
} else {
new_delay = cyclic_offer_delay_;
repetition = 0;
}
}
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (its_runtime) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message =
+ its_runtime->create_message();
+ its_messages.push_back(its_message);
+ fill_message_with_offer_entries(its_runtime, its_message,
+ its_messages, its_timer_pair->second, true);
+
+ // Serialize and send
+ serialize_and_send_messages(its_messages);
+ }
+ if (move_to_main) {
+ move_offers_into_main_phase(_timer);
+ return;
+ }
boost::system::error_code ec;
its_timer_pair->first->expires_from_now(new_delay, ec);
if (ec) {
@@ -2433,6 +2557,55 @@ void service_discovery_impl::on_repetition_phase_timer_expired(
}
}
+
+void service_discovery_impl::on_find_repetition_phase_timer_expired(
+ const boost::system::error_code &_error,
+ std::shared_ptr<boost::asio::steady_timer> _timer,
+ std::uint8_t _repetition, std::uint32_t _last_delay) {
+ if (_error) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
+ auto its_timer_pair = find_repetition_phase_timers_.find(_timer);
+ if (its_timer_pair != find_repetition_phase_timers_.end()) {
+ std::chrono::milliseconds new_delay(0);
+ std::uint8_t repetition(0);
+ if (_repetition <= repetitions_max_) {
+ // sent findService entries in one message, double time to wait and start timer again.
+ std::shared_ptr<runtime> its_runtime = runtime_.lock();
+ if (its_runtime) {
+ std::vector<std::shared_ptr<message_impl>> its_messages;
+ std::shared_ptr<message_impl> its_message =
+ its_runtime->create_message();
+ its_messages.push_back(its_message);
+ fill_message_with_find_entries(its_runtime, its_message,
+ its_messages, its_timer_pair->second);
+ serialize_and_send_messages(its_messages);
+ }
+ new_delay = std::chrono::milliseconds(_last_delay * 2);
+ repetition = ++_repetition;
+ } else {
+ // repetition phase is now over, erase the timer on next expiry time
+ find_repetition_phase_timers_.erase(its_timer_pair);
+ return;
+ }
+ boost::system::error_code ec;
+ its_timer_pair->first->expires_from_now(new_delay, ec);
+ if (ec) {
+ VSOMEIP_ERROR <<
+ "service_discovery_impl::on_find_repetition_phase_timer_expired "
+ "setting expiry time of timer failed: " << ec.message();
+ }
+ its_timer_pair->first->async_wait(
+ std::bind(
+ &service_discovery_impl::on_find_repetition_phase_timer_expired,
+ this, std::placeholders::_1, its_timer_pair->first,
+ repetition, new_delay.count()));
+ }
+}
+
+
void service_discovery_impl::move_offers_into_main_phase(
const std::shared_ptr<boost::asio::steady_timer> &_timer) {
// HINT: make sure to lock the repetition_phase_timers_mutex_ before calling
@@ -2469,6 +2642,27 @@ void service_discovery_impl::fill_message_with_offer_entries(
}
}
+void service_discovery_impl::fill_message_with_find_entries(
+ std::shared_ptr<runtime> _runtime,
+ std::shared_ptr<message_impl> _message,
+ std::vector<std::shared_ptr<message_impl>> &_messages,
+ const requests_t &_requests) {
+ uint32_t its_start(0);
+ uint32_t its_size(0);
+ bool is_done(false);
+ while (!is_done) {
+ insert_find_entries(_message, _requests, its_start, its_size,
+ is_done);
+ its_start += its_size / VSOMEIP_SOMEIP_SD_ENTRY_SIZE;
+ if (!is_done) {
+ its_start = 0;
+ _message = _runtime->create_message();
+ _messages.push_back(_message);
+ }
+ };
+}
+
+
bool service_discovery_impl::serialize_and_send_messages(
const std::vector<std::shared_ptr<message_impl>> &_messages) {
bool has_sent(false);
@@ -2585,7 +2779,7 @@ void service_discovery_impl::on_main_phase_timer_expired(
if (_error) {
return;
}
- send(true, false);
+ send(true);
start_main_phase_timer();
}