summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_base.cpp')
-rw-r--r--implementation/routing/src/routing_manager_base.cpp372
1 files changed, 218 insertions, 154 deletions
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index ac5da9c..8f10057 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -95,16 +95,7 @@ void routing_manager_base::stop_offer_service(client_t _client, service_t _servi
}
for (auto &e : events) {
e.second->unset_payload();
- }
- {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- its_instance->second.clear();
- }
- }
+ e.second->clear_subscribers();
}
}
@@ -142,6 +133,7 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
epsilon_change_func_t _epsilon_change_func,
bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ bool transfer_subscriptions_from_any_event(false);
if (its_event) {
if(!its_event->is_cache_placeholder()) {
if (its_event->is_field() == _is_field) {
@@ -151,9 +143,13 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
if (_is_shadow && _is_provided) {
its_event->set_shadow(_is_shadow);
}
+ if (_client == host_->get_client() && _is_provided) {
+ its_event->set_shadow(false);
+ }
for (auto eg : _eventgroups) {
its_event->add_eventgroup(eg);
}
+ transfer_subscriptions_from_any_event = true;
} else {
VSOMEIP_ERROR << "Event registration update failed. "
"Specified arguments do not match existing registration.";
@@ -168,6 +164,9 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
if (_is_shadow && _is_provided) {
its_event->set_shadow(_is_shadow);
}
+ if (_client == host_->get_client() && _is_provided) {
+ its_event->set_shadow(false);
+ }
its_event->set_field(_is_field);
its_event->set_provided(_is_provided);
its_event->set_cache_placeholder(false);
@@ -180,7 +179,9 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
its_eventgroups.insert(_event);
its_event->set_eventgroups(its_eventgroups);
} else {
- its_event->set_eventgroups(_eventgroups);
+ for (auto eg : _eventgroups) {
+ its_event->add_eventgroup(eg);
+ }
}
its_event->set_epsilon_change_function(_epsilon_change_func);
@@ -211,8 +212,33 @@ void routing_manager_base::register_event(client_t _client, service_t _service,
its_event->set_epsilon_change_function(_epsilon_change_func);
its_event->set_change_resets_cycle(_change_resets_cycle);
its_event->set_update_cycle(_cycle);
+
+ if (_is_provided) {
+ transfer_subscriptions_from_any_event = true;
+ }
}
+ if (transfer_subscriptions_from_any_event) {
+ // check if someone subscribed to ANY_EVENT and the subscription
+ // was stored in the cache placeholder. Move the subscribers
+ // into new event
+ std::shared_ptr<event> its_any_event =
+ find_event(_service, _instance, ANY_EVENT);
+ if (its_any_event) {
+ std::set<eventgroup_t> any_events_eventgroups =
+ its_any_event->get_eventgroups();
+ for (eventgroup_t eventgroup : _eventgroups) {
+ auto found_eg = any_events_eventgroups.find(eventgroup);
+ if (found_eg != any_events_eventgroups.end()) {
+ std::set<client_t> its_any_event_subscribers =
+ its_any_event->get_subscribers(eventgroup);
+ for (const client_t subscriber : its_any_event_subscribers) {
+ its_event->add_subscriber(eventgroup, subscriber);
+ }
+ }
+ }
+ }
+ }
if(!_is_cache_placeholder) {
its_event->add_ref(_client, _is_provided);
}
@@ -265,24 +291,6 @@ void routing_manager_base::unregister_event(client_t _client, service_t _service
}
}
-std::shared_ptr<event> routing_manager_base::get_event(
- service_t _service, instance_t _instance, event_t _event) const {
- std::shared_ptr<event> its_event;
- std::lock_guard<std::mutex> its_lock(events_mutex_);
- auto found_service = events_.find(_service);
- if (found_service != events_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_event = found_instance->second.find(_event);
- if (found_event != found_instance->second.end()) {
- return found_event->second;
- }
- }
- }
- return its_event;
-}
-
-
std::set<std::shared_ptr<event>> routing_manager_base::find_events(
service_t _service, instance_t _instance,
eventgroup_t _eventgroup) const {
@@ -303,45 +311,30 @@ std::set<std::shared_ptr<event>> routing_manager_base::find_events(
void routing_manager_base::subscribe(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- major_version_t _major,
+ major_version_t _major, event_t _event,
subscription_type_e _subscription_type) {
(void) _major;
(void) _subscription_type;
- bool inserted = insert_subscription(_service, _instance, _eventgroup, _client);
+ bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client);
if (inserted) {
- auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
- if (its_eventgroup) {
- std::set<std::shared_ptr<event> > its_events
- = its_eventgroup->get_events();
- for (auto e : its_events) {
- if (e->is_field())
- e->notify_one(_client, true); // TODO: use _flush to send all events together!
- }
- }
+ notify_one_current_value(_client, _service, _instance, _eventgroup, _event);
}
}
void routing_manager_base::unsubscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto its_group = its_instance->second.find(_eventgroup);
- if (its_group != its_instance->second.end()) {
- its_group->second.erase(_client);
- if (!its_group->second.size()) {
- its_instance->second.erase(_eventgroup);
- if (!its_instance->second.size()) {
- its_service->second.erase(_instance);
- if (!its_service->second.size()) {
- eventgroup_clients_.erase(_service);
- }
- }
- }
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
+ if (_event != ANY_EVENT) {
+ auto its_event = find_event(_service, _instance, _event);
+ if (its_event) {
+ its_event->remove_subscriber(_eventgroup, _client);
+ }
+ } else {
+ auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_eventgroup) {
+ for (auto e : its_eventgroup->get_events()) {
+ e->remove_subscriber(_eventgroup, _client);
}
}
}
@@ -378,20 +371,7 @@ void routing_manager_base::notify_one(service_t _service, instance_t _instance,
found_eventgroup = true;
valid_group = its_group;
if (find_local(_client)) {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto its_service = eventgroup_clients_.find(_service);
- if (its_service != eventgroup_clients_.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- auto found_eventgroup = its_instance->second.find(its_group);
- if (found_eventgroup != its_instance->second.end()) {
- auto its_client = found_eventgroup->second.find(_client);
- if (its_client != found_eventgroup->second.end()) {
- already_subscribed = true;
- }
- }
- }
- }
+ already_subscribed = its_event->has_subscriber(its_group, _client);
} else {
// Remotes always needs to be marked as subscribed here
already_subscribed = true;
@@ -445,36 +425,69 @@ void routing_manager_base::send_pending_notify_ones(service_t _service, instance
void routing_manager_base::unset_all_eventpayloads(service_t _service,
instance_t _instance) {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- const auto found_service = eventgroups_.find(_service);
- if (found_service != eventgroups_.end()) {
- const auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- for (const auto &eventgroupinfo : found_instance->second) {
- for (const auto &event : eventgroupinfo.second->get_events()) {
- event->unset_payload(true);
+ std::set<std::shared_ptr<event>> its_events;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ const auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ const auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ for (const auto &eventgroupinfo : found_instance->second) {
+ for (const auto &event : eventgroupinfo.second->get_events()) {
+ its_events.insert(event);
+ }
}
}
}
}
+ for (const auto &e : its_events) {
+ e->unset_payload(true);
+ }
}
void routing_manager_base::unset_all_eventpayloads(service_t _service,
instance_t _instance,
eventgroup_t _eventgroup) {
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- const auto found_service = eventgroups_.find(_service);
- if (found_service != eventgroups_.end()) {
- const auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- const auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- for (const auto &event : found_eventgroup->second->get_events()) {
- event->unset_payload(true);
+ std::set<std::shared_ptr<event>> its_events;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ const auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ const auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ const auto found_eventgroup = found_instance->second.find(_eventgroup);
+ if (found_eventgroup != found_instance->second.end()) {
+ for (const auto &event : found_eventgroup->second->get_events()) {
+ its_events.insert(event);
+ }
}
}
}
}
+ for (const auto &e : its_events) {
+ e->unset_payload(true);
+ }
+}
+
+void routing_manager_base::notify_one_current_value(client_t _client,
+ service_t _service,
+ instance_t _instance,
+ eventgroup_t _eventgroup,
+ event_t _event) {
+ if (_event != ANY_EVENT) {
+ std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ if (its_event && its_event->is_field())
+ its_event->notify_one(_client, true);
+ } else {
+ auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
+ if (its_eventgroup) {
+ std::set<std::shared_ptr<event> > its_events = its_eventgroup->get_events();
+ for (auto e : its_events) {
+ if (e->is_field())
+ e->notify_one(_client, true); // TODO: use _flush to send all events together!
+ }
+ }
+ }
}
bool routing_manager_base::send(client_t its_client,
@@ -560,8 +573,14 @@ bool routing_manager_base::is_available(service_t _service, instance_t _instance
std::lock_guard<std::mutex> its_lock(local_services_mutex_);
auto its_service = local_services_.find(_service);
if (its_service != local_services_.end()) {
+ if (_instance == ANY_INSTANCE) {
+ return true;
+ }
auto its_instance = its_service->second.find(_instance);
if (its_instance != its_service->second.end()) {
+ if (_major == ANY_MAJOR) {
+ return true;
+ }
if (std::get<0>(its_instance->second) == _major) {
available = true;
}
@@ -658,7 +677,7 @@ void routing_manager_base::remove_local(client_t _client) {
host_->on_subscription(std::get<0>(its_subscription), std::get<1>(its_subscription),
std::get<2>(its_subscription), _client, false);
routing_manager_base::unsubscribe(_client, std::get<0>(its_subscription),
- std::get<1>(its_subscription), std::get<2>(its_subscription));
+ std::get<1>(its_subscription), std::get<2>(its_subscription), ANY_EVENT);
}
std::shared_ptr<endpoint> its_endpoint(find_local(_client));
@@ -675,8 +694,11 @@ void routing_manager_base::remove_local(client_t _client) {
std::set<std::pair<service_t, instance_t>> its_services;
for (auto& s : local_services_) {
for (auto& i : s.second) {
- if (std::get<2>(i.second) == _client)
+ if (std::get<2>(i.second) == _client) {
its_services.insert({ s.first, i.first });
+ host_->on_availability(s.first, i.first, false,
+ std::get<0>(i.second), std::get<1>(i.second));
+ }
}
}
@@ -777,23 +799,6 @@ void routing_manager_base::remove_eventgroup_info(service_t _service,
}
}
-std::set<client_t> routing_manager_base::find_local_clients(service_t _service,
- instance_t _instance, eventgroup_t _eventgroup) {
- std::set<client_t> its_clients;
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto found_service = eventgroup_clients_.find(_service);
- if (found_service != eventgroup_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- its_clients = found_eventgroup->second;
- }
- }
- }
- return (its_clients);
-}
-
bool routing_manager_base::send_local_notification(client_t _client,
const byte_t *_data, uint32_t _size, instance_t _instance,
bool _flush, bool _reliable) {
@@ -809,24 +814,21 @@ bool routing_manager_base::send_local_notification(client_t _client,
if (its_event && !its_event->is_shadow()) {
std::vector< byte_t > its_data;
- for (auto its_group : its_event->get_eventgroups()) {
+ for (auto its_client : its_event->get_subscribers()) {
// local
- auto its_local_clients = find_local_clients(its_service, _instance, its_group);
- for (auto its_local_client : its_local_clients) {
- if (its_local_client == VSOMEIP_ROUTING_CLIENT) {
- has_remote = true;
- continue;
- }
+ if (its_client == VSOMEIP_ROUTING_CLIENT) {
+ has_remote = true;
+ continue;
+ }
#ifdef USE_DLT
- else {
- has_local = true;
- }
+ else {
+ has_local = true;
+ }
#endif
- std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
- if (its_local_target) {
- send_local(its_local_target, _client, _data, _size,
- _instance, _flush, _reliable, VSOMEIP_SEND);
- }
+ std::shared_ptr<endpoint> its_local_target = find_local(its_client);
+ if (its_local_target) {
+ send_local(its_local_target, _client, _data, _size,
+ _instance, _flush, _reliable, VSOMEIP_SEND);
}
}
}
@@ -881,23 +883,56 @@ bool routing_manager_base::send_local(
bool routing_manager_base::insert_subscription(
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- client_t _client) {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- auto found_service = eventgroup_clients_.find(_service);
- if (found_service != eventgroup_clients_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- if (found_eventgroup->second.find(_client)
- != found_eventgroup->second.end())
- return false;
+ event_t _event, client_t _client) {
+ bool is_inserted(false);
+ if (_event != ANY_EVENT) { // subscribe to specific event
+ std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
+ if (its_event) {
+ is_inserted = its_event->add_subscriber(_eventgroup, _client);
+ } else {
+ VSOMEIP_WARNING << "routing_manager_base::insert_subscription("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
+ << " received subscription for unknown (unrequested / "
+ << "unoffered) event. Creating placeholder event holding "
+ << "subscription until event is requested/offered.";
+ is_inserted = create_placeholder_event_and_subscribe(_service,
+ _instance, _eventgroup, _event, _client);
+ }
+ } else { // subscribe to all events of the eventgroup
+ std::shared_ptr<eventgroupinfo> its_eventgroup
+ = find_eventgroup(_service, _instance, _eventgroup);
+ bool create_place_holder(false);
+ if (its_eventgroup) {
+ std::set<std::shared_ptr<event>> its_events = its_eventgroup->get_events();
+ if (!its_events.size()) {
+ create_place_holder = true;
+ } else {
+ for (auto e : its_events) {
+ is_inserted = e->add_subscriber(_eventgroup, _client) || is_inserted;
+ }
}
+ } else {
+ create_place_holder = true;
+ }
+ if (create_place_holder) {
+ VSOMEIP_WARNING << "routing_manager_base::insert_subscription("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
+ << " received subscription for unknown (unrequested / "
+ << "unoffered) eventgroup. Creating placeholder event holding "
+ << "subscription until event is requested/offered.";
+ is_inserted = create_placeholder_event_and_subscribe(_service,
+ _instance, _eventgroup, _event, _client);
}
}
-
- eventgroup_clients_[_service][_instance][_eventgroup].insert(_client);
- return true;
+ return is_inserted;
}
std::shared_ptr<deserializer> routing_manager_base::get_deserializer() {
@@ -934,33 +969,63 @@ void routing_manager_base::send_pending_subscriptions(service_t _service,
if (ps.service_ == _service &&
ps.instance_ == _instance && ps.major_ == _major) {
send_subscribe(client_, ps.service_, ps.instance_,
- ps.eventgroup_, ps.major_, ps.subscription_type_);
+ ps.eventgroup_, ps.major_, ps.event_, ps.subscription_type_);
}
}
}
void routing_manager_base::remove_pending_subscription(service_t _service,
- instance_t _instance) {
- auto it = pending_subscriptions_.begin();
- while (it != pending_subscriptions_.end()) {
- if (it->service_ == _service
- && it->instance_ == _instance) {
- break;
+ instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
+ if (_eventgroup == 0xFFFF) {
+ for (auto it = pending_subscriptions_.begin();
+ it != pending_subscriptions_.end();) {
+ if (it->service_ == _service
+ && it->instance_ == _instance) {
+ it = pending_subscriptions_.erase(it);
+ } else {
+ it++;
+ }
+ }
+ } else if (_event == ANY_EVENT) {
+ for (auto it = pending_subscriptions_.begin();
+ it != pending_subscriptions_.end();) {
+ if (it->service_ == _service
+ && it->instance_ == _instance
+ && it->eventgroup_ == _eventgroup) {
+ it = pending_subscriptions_.erase(it);
+ } else {
+ it++;
+ }
+ }
+ } else {
+ for (auto it = pending_subscriptions_.begin();
+ it != pending_subscriptions_.end();) {
+ if (it->service_ == _service
+ && it->instance_ == _instance
+ && it->eventgroup_ == _eventgroup
+ && it->event_ == _event) {
+ it = pending_subscriptions_.erase(it);
+ break;
+ } else {
+ it++;
+ }
}
- it++;
}
- if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it);
}
-std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::get_subscriptions(const client_t _client) {
+std::set<std::tuple<service_t, instance_t, eventgroup_t>>
+routing_manager_base::get_subscriptions(const client_t _client) {
std::set<std::tuple<service_t, instance_t, eventgroup_t>> result;
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- for (auto its_service : eventgroup_clients_) {
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
+ for (auto its_service : events_) {
for (auto its_instance : its_service.second) {
- for (auto its_eventgroup : its_instance.second) {
- auto its_client = its_eventgroup.second.find(_client);
- if (its_client != its_eventgroup.second.end()) {
- result.insert(std::make_tuple(its_service.first, its_instance.first, its_eventgroup.first));
+ for (auto its_event : its_instance.second) {
+ auto its_eventgroups = its_event.second->get_eventgroups(_client);
+ for (auto e : its_eventgroups) {
+ result.insert(std::make_tuple(
+ its_service.first,
+ its_instance.first,
+ e));
}
}
}
@@ -968,5 +1033,4 @@ std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::
return result;
}
-
} // namespace vsomeip