diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_base.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_base.cpp | 372 |
1 files changed, 218 insertions, 154 deletions
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index ac5da9c..8f10057 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -95,16 +95,7 @@ void routing_manager_base::stop_offer_service(client_t _client, service_t _servi } for (auto &e : events) { e.second->unset_payload(); - } - { - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - auto its_service = eventgroup_clients_.find(_service); - if (its_service != eventgroup_clients_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - its_instance->second.clear(); - } - } + e.second->clear_subscribers(); } } @@ -142,6 +133,7 @@ void routing_manager_base::register_event(client_t _client, service_t _service, epsilon_change_func_t _epsilon_change_func, bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { std::shared_ptr<event> its_event = find_event(_service, _instance, _event); + bool transfer_subscriptions_from_any_event(false); if (its_event) { if(!its_event->is_cache_placeholder()) { if (its_event->is_field() == _is_field) { @@ -151,9 +143,13 @@ void routing_manager_base::register_event(client_t _client, service_t _service, if (_is_shadow && _is_provided) { its_event->set_shadow(_is_shadow); } + if (_client == host_->get_client() && _is_provided) { + its_event->set_shadow(false); + } for (auto eg : _eventgroups) { its_event->add_eventgroup(eg); } + transfer_subscriptions_from_any_event = true; } else { VSOMEIP_ERROR << "Event registration update failed. " "Specified arguments do not match existing registration."; @@ -168,6 +164,9 @@ void routing_manager_base::register_event(client_t _client, service_t _service, if (_is_shadow && _is_provided) { its_event->set_shadow(_is_shadow); } + if (_client == host_->get_client() && _is_provided) { + its_event->set_shadow(false); + } its_event->set_field(_is_field); its_event->set_provided(_is_provided); its_event->set_cache_placeholder(false); @@ -180,7 +179,9 @@ void routing_manager_base::register_event(client_t _client, service_t _service, its_eventgroups.insert(_event); its_event->set_eventgroups(its_eventgroups); } else { - its_event->set_eventgroups(_eventgroups); + for (auto eg : _eventgroups) { + its_event->add_eventgroup(eg); + } } its_event->set_epsilon_change_function(_epsilon_change_func); @@ -211,8 +212,33 @@ void routing_manager_base::register_event(client_t _client, service_t _service, its_event->set_epsilon_change_function(_epsilon_change_func); its_event->set_change_resets_cycle(_change_resets_cycle); its_event->set_update_cycle(_cycle); + + if (_is_provided) { + transfer_subscriptions_from_any_event = true; + } } + if (transfer_subscriptions_from_any_event) { + // check if someone subscribed to ANY_EVENT and the subscription + // was stored in the cache placeholder. Move the subscribers + // into new event + std::shared_ptr<event> its_any_event = + find_event(_service, _instance, ANY_EVENT); + if (its_any_event) { + std::set<eventgroup_t> any_events_eventgroups = + its_any_event->get_eventgroups(); + for (eventgroup_t eventgroup : _eventgroups) { + auto found_eg = any_events_eventgroups.find(eventgroup); + if (found_eg != any_events_eventgroups.end()) { + std::set<client_t> its_any_event_subscribers = + its_any_event->get_subscribers(eventgroup); + for (const client_t subscriber : its_any_event_subscribers) { + its_event->add_subscriber(eventgroup, subscriber); + } + } + } + } + } if(!_is_cache_placeholder) { its_event->add_ref(_client, _is_provided); } @@ -265,24 +291,6 @@ void routing_manager_base::unregister_event(client_t _client, service_t _service } } -std::shared_ptr<event> routing_manager_base::get_event( - service_t _service, instance_t _instance, event_t _event) const { - std::shared_ptr<event> its_event; - std::lock_guard<std::mutex> its_lock(events_mutex_); - auto found_service = events_.find(_service); - if (found_service != events_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_event = found_instance->second.find(_event); - if (found_event != found_instance->second.end()) { - return found_event->second; - } - } - } - return its_event; -} - - std::set<std::shared_ptr<event>> routing_manager_base::find_events( service_t _service, instance_t _instance, eventgroup_t _eventgroup) const { @@ -303,45 +311,30 @@ std::set<std::shared_ptr<event>> routing_manager_base::find_events( void routing_manager_base::subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, + major_version_t _major, event_t _event, subscription_type_e _subscription_type) { (void) _major; (void) _subscription_type; - bool inserted = insert_subscription(_service, _instance, _eventgroup, _client); + bool inserted = insert_subscription(_service, _instance, _eventgroup, _event, _client); if (inserted) { - auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); - if (its_eventgroup) { - std::set<std::shared_ptr<event> > its_events - = its_eventgroup->get_events(); - for (auto e : its_events) { - if (e->is_field()) - e->notify_one(_client, true); // TODO: use _flush to send all events together! - } - } + notify_one_current_value(_client, _service, _instance, _eventgroup, _event); } } void routing_manager_base::unsubscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) { - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - auto its_service = eventgroup_clients_.find(_service); - if (its_service != eventgroup_clients_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - auto its_group = its_instance->second.find(_eventgroup); - if (its_group != its_instance->second.end()) { - its_group->second.erase(_client); - if (!its_group->second.size()) { - its_instance->second.erase(_eventgroup); - if (!its_instance->second.size()) { - its_service->second.erase(_instance); - if (!its_service->second.size()) { - eventgroup_clients_.erase(_service); - } - } - } + instance_t _instance, eventgroup_t _eventgroup, event_t _event) { + if (_event != ANY_EVENT) { + auto its_event = find_event(_service, _instance, _event); + if (its_event) { + its_event->remove_subscriber(_eventgroup, _client); + } + } else { + auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); + if (its_eventgroup) { + for (auto e : its_eventgroup->get_events()) { + e->remove_subscriber(_eventgroup, _client); } } } @@ -378,20 +371,7 @@ void routing_manager_base::notify_one(service_t _service, instance_t _instance, found_eventgroup = true; valid_group = its_group; if (find_local(_client)) { - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - auto its_service = eventgroup_clients_.find(_service); - if (its_service != eventgroup_clients_.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - auto found_eventgroup = its_instance->second.find(its_group); - if (found_eventgroup != its_instance->second.end()) { - auto its_client = found_eventgroup->second.find(_client); - if (its_client != found_eventgroup->second.end()) { - already_subscribed = true; - } - } - } - } + already_subscribed = its_event->has_subscriber(its_group, _client); } else { // Remotes always needs to be marked as subscribed here already_subscribed = true; @@ -445,36 +425,69 @@ void routing_manager_base::send_pending_notify_ones(service_t _service, instance void routing_manager_base::unset_all_eventpayloads(service_t _service, instance_t _instance) { - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - const auto found_service = eventgroups_.find(_service); - if (found_service != eventgroups_.end()) { - const auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - for (const auto &eventgroupinfo : found_instance->second) { - for (const auto &event : eventgroupinfo.second->get_events()) { - event->unset_payload(true); + std::set<std::shared_ptr<event>> its_events; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + const auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + const auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + for (const auto &eventgroupinfo : found_instance->second) { + for (const auto &event : eventgroupinfo.second->get_events()) { + its_events.insert(event); + } } } } } + for (const auto &e : its_events) { + e->unset_payload(true); + } } void routing_manager_base::unset_all_eventpayloads(service_t _service, instance_t _instance, eventgroup_t _eventgroup) { - std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); - const auto found_service = eventgroups_.find(_service); - if (found_service != eventgroups_.end()) { - const auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - const auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - for (const auto &event : found_eventgroup->second->get_events()) { - event->unset_payload(true); + std::set<std::shared_ptr<event>> its_events; + { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + const auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + const auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + const auto found_eventgroup = found_instance->second.find(_eventgroup); + if (found_eventgroup != found_instance->second.end()) { + for (const auto &event : found_eventgroup->second->get_events()) { + its_events.insert(event); + } } } } } + for (const auto &e : its_events) { + e->unset_payload(true); + } +} + +void routing_manager_base::notify_one_current_value(client_t _client, + service_t _service, + instance_t _instance, + eventgroup_t _eventgroup, + event_t _event) { + if (_event != ANY_EVENT) { + std::shared_ptr<event> its_event = find_event(_service, _instance, _event); + if (its_event && its_event->is_field()) + its_event->notify_one(_client, true); + } else { + auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup); + if (its_eventgroup) { + std::set<std::shared_ptr<event> > its_events = its_eventgroup->get_events(); + for (auto e : its_events) { + if (e->is_field()) + e->notify_one(_client, true); // TODO: use _flush to send all events together! + } + } + } } bool routing_manager_base::send(client_t its_client, @@ -560,8 +573,14 @@ bool routing_manager_base::is_available(service_t _service, instance_t _instance std::lock_guard<std::mutex> its_lock(local_services_mutex_); auto its_service = local_services_.find(_service); if (its_service != local_services_.end()) { + if (_instance == ANY_INSTANCE) { + return true; + } auto its_instance = its_service->second.find(_instance); if (its_instance != its_service->second.end()) { + if (_major == ANY_MAJOR) { + return true; + } if (std::get<0>(its_instance->second) == _major) { available = true; } @@ -658,7 +677,7 @@ void routing_manager_base::remove_local(client_t _client) { host_->on_subscription(std::get<0>(its_subscription), std::get<1>(its_subscription), std::get<2>(its_subscription), _client, false); routing_manager_base::unsubscribe(_client, std::get<0>(its_subscription), - std::get<1>(its_subscription), std::get<2>(its_subscription)); + std::get<1>(its_subscription), std::get<2>(its_subscription), ANY_EVENT); } std::shared_ptr<endpoint> its_endpoint(find_local(_client)); @@ -675,8 +694,11 @@ void routing_manager_base::remove_local(client_t _client) { std::set<std::pair<service_t, instance_t>> its_services; for (auto& s : local_services_) { for (auto& i : s.second) { - if (std::get<2>(i.second) == _client) + if (std::get<2>(i.second) == _client) { its_services.insert({ s.first, i.first }); + host_->on_availability(s.first, i.first, false, + std::get<0>(i.second), std::get<1>(i.second)); + } } } @@ -777,23 +799,6 @@ void routing_manager_base::remove_eventgroup_info(service_t _service, } } -std::set<client_t> routing_manager_base::find_local_clients(service_t _service, - instance_t _instance, eventgroup_t _eventgroup) { - std::set<client_t> its_clients; - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - auto found_service = eventgroup_clients_.find(_service); - if (found_service != eventgroup_clients_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - its_clients = found_eventgroup->second; - } - } - } - return (its_clients); -} - bool routing_manager_base::send_local_notification(client_t _client, const byte_t *_data, uint32_t _size, instance_t _instance, bool _flush, bool _reliable) { @@ -809,24 +814,21 @@ bool routing_manager_base::send_local_notification(client_t _client, if (its_event && !its_event->is_shadow()) { std::vector< byte_t > its_data; - for (auto its_group : its_event->get_eventgroups()) { + for (auto its_client : its_event->get_subscribers()) { // local - auto its_local_clients = find_local_clients(its_service, _instance, its_group); - for (auto its_local_client : its_local_clients) { - if (its_local_client == VSOMEIP_ROUTING_CLIENT) { - has_remote = true; - continue; - } + if (its_client == VSOMEIP_ROUTING_CLIENT) { + has_remote = true; + continue; + } #ifdef USE_DLT - else { - has_local = true; - } + else { + has_local = true; + } #endif - std::shared_ptr<endpoint> its_local_target = find_local(its_local_client); - if (its_local_target) { - send_local(its_local_target, _client, _data, _size, - _instance, _flush, _reliable, VSOMEIP_SEND); - } + std::shared_ptr<endpoint> its_local_target = find_local(its_client); + if (its_local_target) { + send_local(its_local_target, _client, _data, _size, + _instance, _flush, _reliable, VSOMEIP_SEND); } } } @@ -881,23 +883,56 @@ bool routing_manager_base::send_local( bool routing_manager_base::insert_subscription( service_t _service, instance_t _instance, eventgroup_t _eventgroup, - client_t _client) { - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - auto found_service = eventgroup_clients_.find(_service); - if (found_service != eventgroup_clients_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - if (found_eventgroup->second.find(_client) - != found_eventgroup->second.end()) - return false; + event_t _event, client_t _client) { + bool is_inserted(false); + if (_event != ANY_EVENT) { // subscribe to specific event + std::shared_ptr<event> its_event = find_event(_service, _instance, _event); + if (its_event) { + is_inserted = its_event->add_subscriber(_eventgroup, _client); + } else { + VSOMEIP_WARNING << "routing_manager_base::insert_subscription(" + << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "." + << std::hex << std::setw(4) << std::setfill('0') << _event << "]" + << " received subscription for unknown (unrequested / " + << "unoffered) event. Creating placeholder event holding " + << "subscription until event is requested/offered."; + is_inserted = create_placeholder_event_and_subscribe(_service, + _instance, _eventgroup, _event, _client); + } + } else { // subscribe to all events of the eventgroup + std::shared_ptr<eventgroupinfo> its_eventgroup + = find_eventgroup(_service, _instance, _eventgroup); + bool create_place_holder(false); + if (its_eventgroup) { + std::set<std::shared_ptr<event>> its_events = its_eventgroup->get_events(); + if (!its_events.size()) { + create_place_holder = true; + } else { + for (auto e : its_events) { + is_inserted = e->add_subscriber(_eventgroup, _client) || is_inserted; + } } + } else { + create_place_holder = true; + } + if (create_place_holder) { + VSOMEIP_WARNING << "routing_manager_base::insert_subscription(" + << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "." + << std::hex << std::setw(4) << std::setfill('0') << _event << "]" + << " received subscription for unknown (unrequested / " + << "unoffered) eventgroup. Creating placeholder event holding " + << "subscription until event is requested/offered."; + is_inserted = create_placeholder_event_and_subscribe(_service, + _instance, _eventgroup, _event, _client); } } - - eventgroup_clients_[_service][_instance][_eventgroup].insert(_client); - return true; + return is_inserted; } std::shared_ptr<deserializer> routing_manager_base::get_deserializer() { @@ -934,33 +969,63 @@ void routing_manager_base::send_pending_subscriptions(service_t _service, if (ps.service_ == _service && ps.instance_ == _instance && ps.major_ == _major) { send_subscribe(client_, ps.service_, ps.instance_, - ps.eventgroup_, ps.major_, ps.subscription_type_); + ps.eventgroup_, ps.major_, ps.event_, ps.subscription_type_); } } } void routing_manager_base::remove_pending_subscription(service_t _service, - instance_t _instance) { - auto it = pending_subscriptions_.begin(); - while (it != pending_subscriptions_.end()) { - if (it->service_ == _service - && it->instance_ == _instance) { - break; + instance_t _instance, eventgroup_t _eventgroup, event_t _event) { + if (_eventgroup == 0xFFFF) { + for (auto it = pending_subscriptions_.begin(); + it != pending_subscriptions_.end();) { + if (it->service_ == _service + && it->instance_ == _instance) { + it = pending_subscriptions_.erase(it); + } else { + it++; + } + } + } else if (_event == ANY_EVENT) { + for (auto it = pending_subscriptions_.begin(); + it != pending_subscriptions_.end();) { + if (it->service_ == _service + && it->instance_ == _instance + && it->eventgroup_ == _eventgroup) { + it = pending_subscriptions_.erase(it); + } else { + it++; + } + } + } else { + for (auto it = pending_subscriptions_.begin(); + it != pending_subscriptions_.end();) { + if (it->service_ == _service + && it->instance_ == _instance + && it->eventgroup_ == _eventgroup + && it->event_ == _event) { + it = pending_subscriptions_.erase(it); + break; + } else { + it++; + } } - it++; } - if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it); } -std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::get_subscriptions(const client_t _client) { +std::set<std::tuple<service_t, instance_t, eventgroup_t>> +routing_manager_base::get_subscriptions(const client_t _client) { std::set<std::tuple<service_t, instance_t, eventgroup_t>> result; - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - for (auto its_service : eventgroup_clients_) { + std::lock_guard<std::mutex> its_lock(events_mutex_); + for (auto its_service : events_) { for (auto its_instance : its_service.second) { - for (auto its_eventgroup : its_instance.second) { - auto its_client = its_eventgroup.second.find(_client); - if (its_client != its_eventgroup.second.end()) { - result.insert(std::make_tuple(its_service.first, its_instance.first, its_eventgroup.first)); + for (auto its_event : its_instance.second) { + auto its_eventgroups = its_event.second->get_eventgroups(_client); + for (auto e : its_eventgroups) { + result.insert(std::make_tuple( + its_service.first, + its_instance.first, + e)); } } } @@ -968,5 +1033,4 @@ std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base:: return result; } - } // namespace vsomeip |