diff options
Diffstat (limited to 'implementation/runtime/src/application_impl.cpp')
-rw-r--r-- | implementation/runtime/src/application_impl.cpp | 311 |
1 files changed, 181 insertions, 130 deletions
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index 777c99a..d4d4ed7 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -242,17 +242,16 @@ void application_impl::start() { stopped_called_ = false; VSOMEIP_INFO << "Starting vsomeip application \"" << name_ << "\" using " << std::dec << io_thread_count << " threads"; + + start_caller_id_ = std::this_thread::get_id(); { - std::lock_guard<std::mutex> its_lock(handlers_mutex_); + std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); is_dispatching_ = true; + auto its_main_dispatcher = std::make_shared<std::thread>( + std::bind(&application_impl::main_dispatch, shared_from_this())); + dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher; } - start_caller_id_ = std::this_thread::get_id(); - - auto its_main_dispatcher = std::make_shared<std::thread>( - std::bind(&application_impl::main_dispatch, this)); - dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher; - if (stop_thread_.joinable()) { stop_thread_.join(); } @@ -264,7 +263,13 @@ void application_impl::start() { for (size_t i = 0; i < io_thread_count - 1; i++) { std::shared_ptr<std::thread> its_thread = std::make_shared<std::thread>([this, i] { + try { io_.run(); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "application_impl::start() " + "catched exception:" << e.what(); + throw; + } }); io_threads_.insert(its_thread); } @@ -273,8 +278,12 @@ void application_impl::start() { app_counter_mutex__.lock(); app_counter__++; app_counter_mutex__.unlock(); - - io_.run(); + try { + io_.run(); + } catch (const std::exception &e) { + VSOMEIP_ERROR << "application_impl::start() catched exception:" << e.what(); + throw; + } if (stop_thread_.joinable()) { stop_thread_.join(); @@ -384,22 +393,30 @@ void application_impl::subscribe(service_t _service, instance_t _instance, bool send_back_cached_group(false); check_send_back_cached_event(_service, _instance, _event, _eventgroup, &send_back_cached, &send_back_cached_group); + if (send_back_cached) { send_back_cached_event(_service, _instance, _event); } else if(send_back_cached_group) { - send_back_cached_eventgroup(_service, _instance, _event); + send_back_cached_eventgroup(_service, _instance, _eventgroup); } routing_->subscribe(client_, _service, _instance, _eventgroup, _major, - _subscription_type); + _event, _subscription_type); } } void application_impl::unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup) { - remove_subscription(_service, _instance, _eventgroup); + remove_subscription(_service, _instance, _eventgroup, ANY_EVENT); + if (routing_) + routing_->unsubscribe(client_, _service, _instance, _eventgroup, ANY_EVENT); +} + +void application_impl::unsubscribe(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, event_t _event) { + remove_subscription(_service, _instance, _eventgroup, _event); if (routing_) - routing_->unsubscribe(client_, _service, _instance, _eventgroup); + routing_->unsubscribe(client_, _service, _instance, _eventgroup, _event); } bool application_impl::is_available( @@ -1047,13 +1064,15 @@ void application_impl::on_availability(service_t _service, instance_t _instance, } } if (!_is_available) { - std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_); - auto found_service = event_subscriptions_.find(_service); - if (found_service != event_subscriptions_.end()) { + std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + auto found_service = subscriptions_.find(_service); + if (found_service != subscriptions_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { - for (auto &e : found_instance->second) { - e.second = false; + for (auto &event : found_instance->second) { + for (auto &eventgroup : event.second) { + eventgroup.second = false; + } } } } @@ -1065,34 +1084,13 @@ void application_impl::on_availability(service_t _service, instance_t _instance, } void application_impl::on_message(const std::shared_ptr<message> &&_message) { - service_t its_service = _message->get_service(); - instance_t its_instance = _message->get_instance(); - method_t its_method = _message->get_method(); + const service_t its_service = _message->get_service(); + const instance_t its_instance = _message->get_instance(); + const method_t its_method = _message->get_method(); if (_message->get_message_type() == message_type_e::MT_NOTIFICATION) { - std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_); - auto found_service = event_subscriptions_.find(its_service); - if(found_service != event_subscriptions_.end()) { - auto found_instance = found_service->second.find(its_instance); - if (found_instance != found_service->second.end()) { - auto its_event = found_instance->second.find(its_method); - if (its_event != found_instance->second.end()) { - its_event->second = true; - } else { - // received a event which nobody yet subscribed to - event_subscriptions_[its_service][its_instance][its_method] = true; - // check if someone subscribed to ANY_EVENT - auto its_any_event = found_instance->second.find(ANY_EVENT); - if(its_any_event == found_instance->second.end()) { - return; - } - } - } else { - // received a event from a service instance which nobody yet subscribed to - return; - } - } else { - // received a event from a service which nobody yet subscribed to + if (!check_for_active_subscription(its_service, its_instance, + static_cast<event_t>(its_method))) { return; } } @@ -1152,17 +1150,19 @@ void application_impl::on_message(const std::shared_ptr<message> &&_message) { } if (its_handlers.size()) { - std::lock_guard<std::mutex> its_lock(handlers_mutex_); - for (const auto &its_handler : its_handlers) { - auto handler = its_handler.handler_; - std::shared_ptr<sync_handler> its_sync_handler = - std::make_shared<sync_handler>([handler, _message]() { - handler(std::move(_message)); - }); - handlers_.push_back(its_sync_handler); + { + std::lock_guard<std::mutex> its_lock(handlers_mutex_); + for (const auto &its_handler : its_handlers) { + auto handler = its_handler.handler_; + std::shared_ptr<sync_handler> its_sync_handler = + std::make_shared<sync_handler>([handler, _message]() { + handler(std::move(_message)); + }); + handlers_.push_back(its_sync_handler); + } } + dispatcher_condition_.notify_one(); } - dispatcher_condition_.notify_one(); } } @@ -1196,19 +1196,20 @@ void application_impl::main_dispatch() { remove_elapsed_dispatchers(); +#ifdef _WIN32 if(!is_dispatching_) { its_lock.unlock(); -#ifdef _WIN32 return; -#endif } +#endif } } } + its_lock.unlock(); } void application_impl::dispatch() { - std::thread::id its_id = std::this_thread::get_id(); + const std::thread::id its_id = std::this_thread::get_id(); while (is_active_dispatcher(its_id)) { std::unique_lock<std::mutex> its_lock(handlers_mutex_); if (is_dispatching_ && handlers_.empty()) { @@ -1231,35 +1232,44 @@ void application_impl::dispatch() { } } } - - std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); - elapsed_dispatchers_.insert(std::this_thread::get_id()); + { + std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); + elapsed_dispatchers_.insert(its_id); + } } void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { - std::thread::id its_id = std::this_thread::get_id(); + const std::thread::id its_id = std::this_thread::get_id(); boost::asio::steady_timer its_dispatcher_timer(io_); its_dispatcher_timer.expires_from_now(std::chrono::milliseconds(max_dispatch_time_)); its_dispatcher_timer.async_wait([this, its_id](const boost::system::error_code &_error) { if (!_error) { VSOMEIP_INFO << "Blocking call detected. Client=" << std::hex << get_client(); - std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); - blocked_dispatchers_.insert(its_id); - - if (has_active_dispatcher()) { + bool active_dispatcher_available(false); + { + std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); + blocked_dispatchers_.insert(its_id); + active_dispatcher_available = has_active_dispatcher(); + } + if (active_dispatcher_available) { + std::lock_guard<std::mutex> its_lock(handlers_mutex_); dispatcher_condition_.notify_all(); - } else { + } else if (is_dispatching_) { // If possible, create a new dispatcher thread to unblock. - // If this is _not_ possible, dispatching is blocked until at least - // one of the active handler calls returns. + // If this is _not_ possible, dispatching is blocked until + // at least one of the active handler calls returns. + std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); if (dispatchers_.size() < max_dispatchers_) { auto its_dispatcher = std::make_shared<std::thread>( - std::bind(&application_impl::dispatch, this)); + std::bind(&application_impl::dispatch, shared_from_this())); dispatchers_[its_dispatcher->get_id()] = its_dispatcher; } else { VSOMEIP_ERROR << "Maximum number of dispatchers exceeded."; } + } else { + VSOMEIP_INFO << "Won't start new dispatcher thread as Client=" + << std::hex << get_client() << " is shutting down"; } } }); @@ -1273,7 +1283,7 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { } bool application_impl::has_active_dispatcher() { - for (auto d : dispatchers_) { + for (const auto &d : dispatchers_) { if (blocked_dispatchers_.find(d.first) == blocked_dispatchers_.end() && elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) { return true; @@ -1282,9 +1292,9 @@ bool application_impl::has_active_dispatcher() { return false; } -bool application_impl::is_active_dispatcher(std::thread::id &_id) { +bool application_impl::is_active_dispatcher(const std::thread::id &_id) { std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); - for (auto d : dispatchers_) { + for (const auto &d : dispatchers_) { if (d.first != _id && blocked_dispatchers_.find(d.first) == blocked_dispatchers_.end() && elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) { @@ -1328,7 +1338,7 @@ void application_impl::clear_all_handler() { members_.clear(); } { - std::unique_lock<std::mutex> its_lock(handlers_mutex_); + std::lock_guard<std::mutex> its_lock(handlers_mutex_); handlers_.clear(); } } @@ -1344,12 +1354,17 @@ void application_impl::shutdown() { stop_cv_.wait(its_lock); } } + std::map<std::thread::id, std::shared_ptr<std::thread>> its_dispatchers; { - std::lock_guard<std::mutex> its_lock(handlers_mutex_); + std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); + its_dispatchers = dispatchers_; + } + { + std::lock_guard<std::mutex> its_handler_lock(handlers_mutex_); is_dispatching_ = false; + dispatcher_condition_.notify_all(); } - dispatcher_condition_.notify_all(); - for (auto its_dispatcher : dispatchers_) { + for (auto its_dispatcher : its_dispatchers) { if (its_dispatcher.second->get_id() != stop_caller_id_) { if (its_dispatcher.second->joinable()) { its_dispatcher.second->join(); @@ -1392,7 +1407,7 @@ bool application_impl::is_routing() const { void application_impl::send_back_cached_event(service_t _service, instance_t _instance, event_t _event) { - std::shared_ptr<event> its_event = routing_->get_event(_service, + std::shared_ptr<event> its_event = routing_->find_event(_service, _instance, _event); if (its_event && its_event->is_field() && its_event->is_set()) { std::shared_ptr<message> its_message = runtime_->create_notification(); @@ -1402,6 +1417,11 @@ void application_impl::send_back_cached_event(service_t _service, its_message->set_payload(its_event->get_payload()); its_message->set_initial(true); on_message(std::move(its_message)); + VSOMEIP_INFO << "Sending back cached event (" + << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _event << "]"; } } @@ -1413,12 +1433,20 @@ void application_impl::send_back_cached_eventgroup(service_t _service, for(const auto &its_event : its_events) { if (its_event && its_event->is_field() && its_event->is_set()) { std::shared_ptr<message> its_message = runtime_->create_notification(); + const event_t its_event_id(its_event->get_event()); its_message->set_service(_service); - its_message->set_method(its_event->get_event()); + its_message->set_method(its_event_id); its_message->set_instance(_instance); its_message->set_payload(its_event->get_payload()); its_message->set_initial(true); on_message(std::move(its_message)); + VSOMEIP_INFO << "Sending back cached event (" + << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event_id + << "] from eventgroup " + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup; } } } @@ -1432,91 +1460,114 @@ void application_impl::check_send_back_cached_event( service_t _service, instance_t _instance, event_t _event, eventgroup_t _eventgroup, bool *_send_back_cached_event, bool *_send_back_cached_eventgroup) { - std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_); - - bool already_subscibed(false); - auto found_service = eventgroup_subscriptions_.find(_service); - if(found_service != eventgroup_subscriptions_.end()) { + std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + *_send_back_cached_event = false; + *_send_back_cached_eventgroup = false; + bool already_subscribed(false); + auto found_service = subscriptions_.find(_service); + if(found_service != subscriptions_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { - auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - already_subscibed = true; - } - } - } - - if (already_subscibed) { - auto found_service = event_subscriptions_.find(_service); - if(found_service != event_subscriptions_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - auto found_event = found_instance->second.find(_event); - if (found_event != found_instance->second.end()) { - if(found_event->second) { - // initial values for this event have already been sent, - // send back cached value + auto found_event = found_instance->second.find(_event); + if (found_event != found_instance->second.end()) { + auto found_eventgroup = found_event->second.find(_eventgroup); + if (found_eventgroup != found_event->second.end()) { + already_subscribed = true; + if (found_eventgroup->second) { + // initial values for this event have already been + // received, send back cached value if(_event == ANY_EVENT) { *_send_back_cached_eventgroup = true; } else { *_send_back_cached_event = true; } } - return; } } } } - event_subscriptions_[_service][_instance][_event] = false; - eventgroup_subscriptions_[_service][_instance].insert(_eventgroup); + + if (!already_subscribed) { + subscriptions_[_service][_instance][_event][_eventgroup] = false; + } } void application_impl::remove_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup) { - std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_); + eventgroup_t _eventgroup, + event_t _event) { + std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); - auto found_service1 = eventgroup_subscriptions_.find(_service); - if(found_service1 != eventgroup_subscriptions_.end()) { - auto found_instance = found_service1->second.find(_instance); - if (found_instance != found_service1->second.end()) { - auto found_eventgroup = found_instance->second.find(_eventgroup); - if (found_eventgroup != found_instance->second.end()) { - found_instance->second.erase(_eventgroup); - if (!found_instance->second.size()) { - found_service1->second.erase(_instance); - if (!found_service1->second.size()) { - eventgroup_subscriptions_.erase(_service); + auto found_service = subscriptions_.find(_service); + if(found_service != subscriptions_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_event = found_instance->second.find(_event); + if (found_event != found_instance->second.end()) { + if (found_event->second.erase(_eventgroup)) { + if (!found_event->second.size()) { + found_instance->second.erase(_event); + if (!found_instance->second.size()) { + found_service->second.erase(_instance); + if (!found_service->second.size()) { + subscriptions_.erase(_service); + } + } } } } } } +} - auto found_service = event_subscriptions_.find(_service); - if (found_service != event_subscriptions_.end()) { +bool application_impl::check_for_active_subscription(service_t _service, + instance_t _instance, + event_t _event) { + std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + auto found_service = subscriptions_.find(_service); + if(found_service != subscriptions_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { - if (routing_) { - std::set<std::shared_ptr<event>> its_events = - routing_->find_events(_service, _instance, - _eventgroup); - for (const auto &e : its_events) { - const event_t its_event(e->get_event()); - auto found_event = found_instance->second.find(its_event); - if (found_event != found_instance->second.end()) { - found_instance->second.erase(its_event); + auto found_event = found_instance->second.find(_event); + if (found_event != found_instance->second.end()) { + if (found_event->second.size()) { + for (auto &eventgroup : found_event->second) { + eventgroup.second = true; } + return true; } - if (!found_instance->second.size()) { - found_service->second.erase(_instance); - if (!found_service->second.size()) { - event_subscriptions_.erase(_service); + } else { + // Received a event which nobody yet explicitly subscribed to. + // Check if someone subscribed to ANY_EVENT for one of + // the received event's eventgroups + auto found_any_event = found_instance->second.find(ANY_EVENT); + if (found_any_event != found_instance->second.end()) { + if (routing_) { + std::shared_ptr<event> its_event = routing_->find_event( + _service, _instance, _event); + if (its_event) { + for (const auto eg : its_event->get_eventgroups()) { + auto found_eventgroup = found_any_event->second.find(eg); + if (found_eventgroup != found_any_event->second.end()) { + // set the flag for initial event received to true + // even if we might not already received all of the + // eventgroups events. + found_eventgroup->second = true; + return true; + } + } + } } } } } } + // Return false if an event was received from: + // - a service which nobody yet subscribed to + // - a service instance which nobody yet subscribed to + // - a service instance and nobody yet subscribed to one of the event's + // eventgroups + return false; } } // namespace vsomeip |