summaryrefslogtreecommitdiff
path: root/implementation/runtime/src/application_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/runtime/src/application_impl.cpp')
-rw-r--r--implementation/runtime/src/application_impl.cpp311
1 files changed, 181 insertions, 130 deletions
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index 777c99a..d4d4ed7 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -242,17 +242,16 @@ void application_impl::start() {
stopped_called_ = false;
VSOMEIP_INFO << "Starting vsomeip application \"" << name_ << "\" using "
<< std::dec << io_thread_count << " threads";
+
+ start_caller_id_ = std::this_thread::get_id();
{
- std::lock_guard<std::mutex> its_lock(handlers_mutex_);
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
is_dispatching_ = true;
+ auto its_main_dispatcher = std::make_shared<std::thread>(
+ std::bind(&application_impl::main_dispatch, shared_from_this()));
+ dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher;
}
- start_caller_id_ = std::this_thread::get_id();
-
- auto its_main_dispatcher = std::make_shared<std::thread>(
- std::bind(&application_impl::main_dispatch, this));
- dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher;
-
if (stop_thread_.joinable()) {
stop_thread_.join();
}
@@ -264,7 +263,13 @@ void application_impl::start() {
for (size_t i = 0; i < io_thread_count - 1; i++) {
std::shared_ptr<std::thread> its_thread
= std::make_shared<std::thread>([this, i] {
+ try {
io_.run();
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "application_impl::start() "
+ "catched exception:" << e.what();
+ throw;
+ }
});
io_threads_.insert(its_thread);
}
@@ -273,8 +278,12 @@ void application_impl::start() {
app_counter_mutex__.lock();
app_counter__++;
app_counter_mutex__.unlock();
-
- io_.run();
+ try {
+ io_.run();
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "application_impl::start() catched exception:" << e.what();
+ throw;
+ }
if (stop_thread_.joinable()) {
stop_thread_.join();
@@ -384,22 +393,30 @@ void application_impl::subscribe(service_t _service, instance_t _instance,
bool send_back_cached_group(false);
check_send_back_cached_event(_service, _instance, _event, _eventgroup,
&send_back_cached, &send_back_cached_group);
+
if (send_back_cached) {
send_back_cached_event(_service, _instance, _event);
} else if(send_back_cached_group) {
- send_back_cached_eventgroup(_service, _instance, _event);
+ send_back_cached_eventgroup(_service, _instance, _eventgroup);
}
routing_->subscribe(client_, _service, _instance, _eventgroup, _major,
- _subscription_type);
+ _event, _subscription_type);
}
}
void application_impl::unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup) {
- remove_subscription(_service, _instance, _eventgroup);
+ remove_subscription(_service, _instance, _eventgroup, ANY_EVENT);
+ if (routing_)
+ routing_->unsubscribe(client_, _service, _instance, _eventgroup, ANY_EVENT);
+}
+
+void application_impl::unsubscribe(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event) {
+ remove_subscription(_service, _instance, _eventgroup, _event);
if (routing_)
- routing_->unsubscribe(client_, _service, _instance, _eventgroup);
+ routing_->unsubscribe(client_, _service, _instance, _eventgroup, _event);
}
bool application_impl::is_available(
@@ -1047,13 +1064,15 @@ void application_impl::on_availability(service_t _service, instance_t _instance,
}
}
if (!_is_available) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
- auto found_service = event_subscriptions_.find(_service);
- if (found_service != event_subscriptions_.end()) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+ auto found_service = subscriptions_.find(_service);
+ if (found_service != subscriptions_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- for (auto &e : found_instance->second) {
- e.second = false;
+ for (auto &event : found_instance->second) {
+ for (auto &eventgroup : event.second) {
+ eventgroup.second = false;
+ }
}
}
}
@@ -1065,34 +1084,13 @@ void application_impl::on_availability(service_t _service, instance_t _instance,
}
void application_impl::on_message(const std::shared_ptr<message> &&_message) {
- service_t its_service = _message->get_service();
- instance_t its_instance = _message->get_instance();
- method_t its_method = _message->get_method();
+ const service_t its_service = _message->get_service();
+ const instance_t its_instance = _message->get_instance();
+ const method_t its_method = _message->get_method();
if (_message->get_message_type() == message_type_e::MT_NOTIFICATION) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
- auto found_service = event_subscriptions_.find(its_service);
- if(found_service != event_subscriptions_.end()) {
- auto found_instance = found_service->second.find(its_instance);
- if (found_instance != found_service->second.end()) {
- auto its_event = found_instance->second.find(its_method);
- if (its_event != found_instance->second.end()) {
- its_event->second = true;
- } else {
- // received a event which nobody yet subscribed to
- event_subscriptions_[its_service][its_instance][its_method] = true;
- // check if someone subscribed to ANY_EVENT
- auto its_any_event = found_instance->second.find(ANY_EVENT);
- if(its_any_event == found_instance->second.end()) {
- return;
- }
- }
- } else {
- // received a event from a service instance which nobody yet subscribed to
- return;
- }
- } else {
- // received a event from a service which nobody yet subscribed to
+ if (!check_for_active_subscription(its_service, its_instance,
+ static_cast<event_t>(its_method))) {
return;
}
}
@@ -1152,17 +1150,19 @@ void application_impl::on_message(const std::shared_ptr<message> &&_message) {
}
if (its_handlers.size()) {
- std::lock_guard<std::mutex> its_lock(handlers_mutex_);
- for (const auto &its_handler : its_handlers) {
- auto handler = its_handler.handler_;
- std::shared_ptr<sync_handler> its_sync_handler =
- std::make_shared<sync_handler>([handler, _message]() {
- handler(std::move(_message));
- });
- handlers_.push_back(its_sync_handler);
+ {
+ std::lock_guard<std::mutex> its_lock(handlers_mutex_);
+ for (const auto &its_handler : its_handlers) {
+ auto handler = its_handler.handler_;
+ std::shared_ptr<sync_handler> its_sync_handler =
+ std::make_shared<sync_handler>([handler, _message]() {
+ handler(std::move(_message));
+ });
+ handlers_.push_back(its_sync_handler);
+ }
}
+ dispatcher_condition_.notify_one();
}
- dispatcher_condition_.notify_one();
}
}
@@ -1196,19 +1196,20 @@ void application_impl::main_dispatch() {
remove_elapsed_dispatchers();
+#ifdef _WIN32
if(!is_dispatching_) {
its_lock.unlock();
-#ifdef _WIN32
return;
-#endif
}
+#endif
}
}
}
+ its_lock.unlock();
}
void application_impl::dispatch() {
- std::thread::id its_id = std::this_thread::get_id();
+ const std::thread::id its_id = std::this_thread::get_id();
while (is_active_dispatcher(its_id)) {
std::unique_lock<std::mutex> its_lock(handlers_mutex_);
if (is_dispatching_ && handlers_.empty()) {
@@ -1231,35 +1232,44 @@ void application_impl::dispatch() {
}
}
}
-
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- elapsed_dispatchers_.insert(std::this_thread::get_id());
+ {
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ elapsed_dispatchers_.insert(its_id);
+ }
}
void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
- std::thread::id its_id = std::this_thread::get_id();
+ const std::thread::id its_id = std::this_thread::get_id();
boost::asio::steady_timer its_dispatcher_timer(io_);
its_dispatcher_timer.expires_from_now(std::chrono::milliseconds(max_dispatch_time_));
its_dispatcher_timer.async_wait([this, its_id](const boost::system::error_code &_error) {
if (!_error) {
VSOMEIP_INFO << "Blocking call detected. Client=" << std::hex << get_client();
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- blocked_dispatchers_.insert(its_id);
-
- if (has_active_dispatcher()) {
+ bool active_dispatcher_available(false);
+ {
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ blocked_dispatchers_.insert(its_id);
+ active_dispatcher_available = has_active_dispatcher();
+ }
+ if (active_dispatcher_available) {
+ std::lock_guard<std::mutex> its_lock(handlers_mutex_);
dispatcher_condition_.notify_all();
- } else {
+ } else if (is_dispatching_) {
// If possible, create a new dispatcher thread to unblock.
- // If this is _not_ possible, dispatching is blocked until at least
- // one of the active handler calls returns.
+ // If this is _not_ possible, dispatching is blocked until
+ // at least one of the active handler calls returns.
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
if (dispatchers_.size() < max_dispatchers_) {
auto its_dispatcher = std::make_shared<std::thread>(
- std::bind(&application_impl::dispatch, this));
+ std::bind(&application_impl::dispatch, shared_from_this()));
dispatchers_[its_dispatcher->get_id()] = its_dispatcher;
} else {
VSOMEIP_ERROR << "Maximum number of dispatchers exceeded.";
}
+ } else {
+ VSOMEIP_INFO << "Won't start new dispatcher thread as Client="
+ << std::hex << get_client() << " is shutting down";
}
}
});
@@ -1273,7 +1283,7 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
}
bool application_impl::has_active_dispatcher() {
- for (auto d : dispatchers_) {
+ for (const auto &d : dispatchers_) {
if (blocked_dispatchers_.find(d.first) == blocked_dispatchers_.end() &&
elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
return true;
@@ -1282,9 +1292,9 @@ bool application_impl::has_active_dispatcher() {
return false;
}
-bool application_impl::is_active_dispatcher(std::thread::id &_id) {
+bool application_impl::is_active_dispatcher(const std::thread::id &_id) {
std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- for (auto d : dispatchers_) {
+ for (const auto &d : dispatchers_) {
if (d.first != _id &&
blocked_dispatchers_.find(d.first) == blocked_dispatchers_.end() &&
elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
@@ -1328,7 +1338,7 @@ void application_impl::clear_all_handler() {
members_.clear();
}
{
- std::unique_lock<std::mutex> its_lock(handlers_mutex_);
+ std::lock_guard<std::mutex> its_lock(handlers_mutex_);
handlers_.clear();
}
}
@@ -1344,12 +1354,17 @@ void application_impl::shutdown() {
stop_cv_.wait(its_lock);
}
}
+ std::map<std::thread::id, std::shared_ptr<std::thread>> its_dispatchers;
{
- std::lock_guard<std::mutex> its_lock(handlers_mutex_);
+ std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+ its_dispatchers = dispatchers_;
+ }
+ {
+ std::lock_guard<std::mutex> its_handler_lock(handlers_mutex_);
is_dispatching_ = false;
+ dispatcher_condition_.notify_all();
}
- dispatcher_condition_.notify_all();
- for (auto its_dispatcher : dispatchers_) {
+ for (auto its_dispatcher : its_dispatchers) {
if (its_dispatcher.second->get_id() != stop_caller_id_) {
if (its_dispatcher.second->joinable()) {
its_dispatcher.second->join();
@@ -1392,7 +1407,7 @@ bool application_impl::is_routing() const {
void application_impl::send_back_cached_event(service_t _service,
instance_t _instance,
event_t _event) {
- std::shared_ptr<event> its_event = routing_->get_event(_service,
+ std::shared_ptr<event> its_event = routing_->find_event(_service,
_instance, _event);
if (its_event && its_event->is_field() && its_event->is_set()) {
std::shared_ptr<message> its_message = runtime_->create_notification();
@@ -1402,6 +1417,11 @@ void application_impl::send_back_cached_event(service_t _service,
its_message->set_payload(its_event->get_payload());
its_message->set_initial(true);
on_message(std::move(its_message));
+ VSOMEIP_INFO << "Sending back cached event ("
+ << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
}
}
@@ -1413,12 +1433,20 @@ void application_impl::send_back_cached_eventgroup(service_t _service,
for(const auto &its_event : its_events) {
if (its_event && its_event->is_field() && its_event->is_set()) {
std::shared_ptr<message> its_message = runtime_->create_notification();
+ const event_t its_event_id(its_event->get_event());
its_message->set_service(_service);
- its_message->set_method(its_event->get_event());
+ its_message->set_method(its_event_id);
its_message->set_instance(_instance);
its_message->set_payload(its_event->get_payload());
its_message->set_initial(true);
on_message(std::move(its_message));
+ VSOMEIP_INFO << "Sending back cached event ("
+ << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event_id
+ << "] from eventgroup "
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup;
}
}
}
@@ -1432,91 +1460,114 @@ void application_impl::check_send_back_cached_event(
service_t _service, instance_t _instance, event_t _event,
eventgroup_t _eventgroup, bool *_send_back_cached_event,
bool *_send_back_cached_eventgroup) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
-
- bool already_subscibed(false);
- auto found_service = eventgroup_subscriptions_.find(_service);
- if(found_service != eventgroup_subscriptions_.end()) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+ *_send_back_cached_event = false;
+ *_send_back_cached_eventgroup = false;
+ bool already_subscribed(false);
+ auto found_service = subscriptions_.find(_service);
+ if(found_service != subscriptions_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- already_subscibed = true;
- }
- }
- }
-
- if (already_subscibed) {
- auto found_service = event_subscriptions_.find(_service);
- if(found_service != event_subscriptions_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_event = found_instance->second.find(_event);
- if (found_event != found_instance->second.end()) {
- if(found_event->second) {
- // initial values for this event have already been sent,
- // send back cached value
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ auto found_eventgroup = found_event->second.find(_eventgroup);
+ if (found_eventgroup != found_event->second.end()) {
+ already_subscribed = true;
+ if (found_eventgroup->second) {
+ // initial values for this event have already been
+ // received, send back cached value
if(_event == ANY_EVENT) {
*_send_back_cached_eventgroup = true;
} else {
*_send_back_cached_event = true;
}
}
- return;
}
}
}
}
- event_subscriptions_[_service][_instance][_event] = false;
- eventgroup_subscriptions_[_service][_instance].insert(_eventgroup);
+
+ if (!already_subscribed) {
+ subscriptions_[_service][_instance][_event][_eventgroup] = false;
+ }
}
void application_impl::remove_subscription(service_t _service,
instance_t _instance,
- eventgroup_t _eventgroup) {
- std::lock_guard<std::mutex> its_lock(event_subscriptions_mutex_);
+ eventgroup_t _eventgroup,
+ event_t _event) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
- auto found_service1 = eventgroup_subscriptions_.find(_service);
- if(found_service1 != eventgroup_subscriptions_.end()) {
- auto found_instance = found_service1->second.find(_instance);
- if (found_instance != found_service1->second.end()) {
- auto found_eventgroup = found_instance->second.find(_eventgroup);
- if (found_eventgroup != found_instance->second.end()) {
- found_instance->second.erase(_eventgroup);
- if (!found_instance->second.size()) {
- found_service1->second.erase(_instance);
- if (!found_service1->second.size()) {
- eventgroup_subscriptions_.erase(_service);
+ auto found_service = subscriptions_.find(_service);
+ if(found_service != subscriptions_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ if (found_event->second.erase(_eventgroup)) {
+ if (!found_event->second.size()) {
+ found_instance->second.erase(_event);
+ if (!found_instance->second.size()) {
+ found_service->second.erase(_instance);
+ if (!found_service->second.size()) {
+ subscriptions_.erase(_service);
+ }
+ }
}
}
}
}
}
+}
- auto found_service = event_subscriptions_.find(_service);
- if (found_service != event_subscriptions_.end()) {
+bool application_impl::check_for_active_subscription(service_t _service,
+ instance_t _instance,
+ event_t _event) {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+ auto found_service = subscriptions_.find(_service);
+ if(found_service != subscriptions_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
- if (routing_) {
- std::set<std::shared_ptr<event>> its_events =
- routing_->find_events(_service, _instance,
- _eventgroup);
- for (const auto &e : its_events) {
- const event_t its_event(e->get_event());
- auto found_event = found_instance->second.find(its_event);
- if (found_event != found_instance->second.end()) {
- found_instance->second.erase(its_event);
+ auto found_event = found_instance->second.find(_event);
+ if (found_event != found_instance->second.end()) {
+ if (found_event->second.size()) {
+ for (auto &eventgroup : found_event->second) {
+ eventgroup.second = true;
}
+ return true;
}
- if (!found_instance->second.size()) {
- found_service->second.erase(_instance);
- if (!found_service->second.size()) {
- event_subscriptions_.erase(_service);
+ } else {
+ // Received a event which nobody yet explicitly subscribed to.
+ // Check if someone subscribed to ANY_EVENT for one of
+ // the received event's eventgroups
+ auto found_any_event = found_instance->second.find(ANY_EVENT);
+ if (found_any_event != found_instance->second.end()) {
+ if (routing_) {
+ std::shared_ptr<event> its_event = routing_->find_event(
+ _service, _instance, _event);
+ if (its_event) {
+ for (const auto eg : its_event->get_eventgroups()) {
+ auto found_eventgroup = found_any_event->second.find(eg);
+ if (found_eventgroup != found_any_event->second.end()) {
+ // set the flag for initial event received to true
+ // even if we might not already received all of the
+ // eventgroups events.
+ found_eventgroup->second = true;
+ return true;
+ }
+ }
+ }
}
}
}
}
}
+ // Return false if an event was received from:
+ // - a service which nobody yet subscribed to
+ // - a service instance which nobody yet subscribed to
+ // - a service instance and nobody yet subscribed to one of the event's
+ // eventgroups
+ return false;
}
} // namespace vsomeip