diff options
Diffstat (limited to 'implementation/routing/src/event.cpp')
-rw-r--r-- | implementation/routing/src/event.cpp | 615 |
1 files changed, 461 insertions, 154 deletions
diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index b63022c..e4eed17 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -1,10 +1,12 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +#include <chrono> #include <iomanip> #include <sstream> +#include <thread> #include <vsomeip/constants.hpp> #include <vsomeip/defines.hpp> @@ -21,72 +23,104 @@ namespace vsomeip_v3 { -event::event(routing_manager *_routing, bool _is_shadow) : - routing_(_routing), - message_(runtime::get()->create_notification()), - type_(event_type_e::ET_EVENT), - cycle_timer_(_routing->get_io()), - cycle_(std::chrono::milliseconds::zero()), - change_resets_cycle_(false), - is_updating_on_change_(true), - is_set_(false), - is_provided_(false), - is_shadow_(_is_shadow), - is_cache_placeholder_(false), - epsilon_change_func_(std::bind(&event::compare, this, +event::event(routing_manager *_routing, bool _is_shadow) + : routing_(_routing), + current_(runtime::get()->create_notification()), + update_(runtime::get()->create_notification()), + type_(event_type_e::ET_EVENT), + cycle_timer_(_routing->get_io()), + cycle_(std::chrono::milliseconds::zero()), + change_resets_cycle_(false), + is_updating_on_change_(true), + is_set_(false), + is_provided_(false), + is_shadow_(_is_shadow), + is_cache_placeholder_(false), + epsilon_change_func_(std::bind(&event::has_changed, this, std::placeholders::_1, std::placeholders::_2)), - reliability_(reliability_type_e::RT_UNKNOWN) { + reliability_(reliability_type_e::RT_UNKNOWN) { + } -service_t event::get_service() const { - return (message_->get_service()); +service_t +event::get_service() const { + + return (current_->get_service()); } -void event::set_service(service_t _service) { - message_->set_service(_service); +void +event::set_service(service_t _service) { + + current_->set_service(_service); + update_->set_service(_service); } -instance_t event::get_instance() const { - return (message_->get_instance()); +instance_t +event::get_instance() const { + + return (current_->get_instance()); } -void event::set_instance(instance_t _instance) { - message_->set_instance(_instance); +void +event::set_instance(instance_t _instance) { + + current_->set_instance(_instance); + update_->set_instance(_instance); } -major_version_t event::get_version() const { - return message_->get_interface_version(); +major_version_t +event::get_version() const { + + return current_->get_interface_version(); } -void event::set_version(major_version_t _major) { - message_->set_interface_version(_major); +void +event::set_version(major_version_t _major) { + + current_->set_interface_version(_major); + update_->set_interface_version(_major); } -event_t event::get_event() const { - return (message_->get_method()); +event_t +event::get_event() const { + + return (current_->get_method()); } -void event::set_event(event_t _event) { - message_->set_method(_event); +void +event::set_event(event_t _event) { + + current_->set_method(_event); + update_->set_method(_event); } -event_type_e event::get_type() const { +event_type_e +event::get_type() const { + return (type_); } -void event::set_type(const event_type_e _type) { +void +event::set_type(const event_type_e _type) { + type_ = _type; } -bool event::is_field() const { +bool +event::is_field() const { + return (type_ == event_type_e::ET_FIELD); } -bool event::is_provided() const { +bool +event::is_provided() const { + return (is_provided_); } -void event::set_provided(bool _is_provided) { +void +event::set_provided(bool _is_provided) { + is_provided_ = _is_provided; } @@ -94,122 +128,138 @@ bool event::is_set() const { return is_set_; } -const std::shared_ptr<payload> event::get_payload() const { +std::shared_ptr<payload> +event::get_payload() const { + std::lock_guard<std::mutex> its_lock(mutex_); - return (message_->get_payload()); + return (current_->get_payload()); } -bool event::set_payload_dont_notify(const std::shared_ptr<payload> &_payload) { +void +event::update_payload() { + std::lock_guard<std::mutex> its_lock(mutex_); - if (is_cache_placeholder_) { - reset_payload(_payload); - is_set_ = true; - } else { - if (set_payload_helper(_payload, false)) { - reset_payload(_payload); - } else { - return false; - } - } - return true; + update_payload_unlocked(); +} + +void +event::update_payload_unlocked() { + + current_->set_payload(update_->get_payload()); } -void event::set_payload(const std::shared_ptr<payload> &_payload, bool _force) { +void +event::set_payload(const std::shared_ptr<payload> &_payload, bool _force) { + std::lock_guard<std::mutex> its_lock(mutex_); - if (is_provided_) { - if (set_payload_helper(_payload, _force)) { - reset_payload(_payload); - if (is_updating_on_change_) { - if (change_resets_cycle_) - stop_cycle(); + if (is_provided_ && prepare_update_payload_unlocked(_payload, _force)) { + if (is_updating_on_change_) { + if (change_resets_cycle_) + stop_cycle(); - notify(); + notify(_force); - if (change_resets_cycle_) - start_cycle(); - } + if (change_resets_cycle_) + start_cycle(); + + update_payload_unlocked(); } } else { - VSOMEIP_INFO << "Can't set payload for event " + VSOMEIP_INFO << "Cannot set payload for event [" << std::hex << std::setw(4) << std::setfill('0') - << get_service() << "." << get_instance() << "." << get_event() - << " as it isn't provided"; + << current_->get_service() << "." + << current_->get_instance() << "." + << current_->get_method() + << "]. It isn't provided"; } } -void event::set_payload(const std::shared_ptr<payload> &_payload, client_t _client, +void +event::set_payload(const std::shared_ptr<payload> &_payload, client_t _client, bool _force) { + std::lock_guard<std::mutex> its_lock(mutex_); - if (is_provided_) { - if (set_payload_helper(_payload, _force)) { - reset_payload(_payload); - if (is_updating_on_change_) { - notify_one_unlocked(_client); - } + if (is_provided_ && prepare_update_payload_unlocked(_payload, _force)) { + if (is_updating_on_change_) { + notify_one_unlocked(_client, _force); + update_payload_unlocked(); } } else { - VSOMEIP_INFO << "Can't set payload for event " + VSOMEIP_INFO << "Cannot set payload for event [" << std::hex << std::setw(4) << std::setfill('0') - << get_service() << "." << get_instance() << "." << get_event() - << ". It isn't provided"; + << current_->get_service() << "." + << current_->get_instance() << "." + << current_->get_method() + << "]. It isn't provided"; } } -void event::set_payload(const std::shared_ptr<payload> &_payload, +void +event::set_payload(const std::shared_ptr<payload> &_payload, const client_t _client, - const std::shared_ptr<endpoint_definition>& _target, - bool _force) { + const std::shared_ptr<endpoint_definition> &_target) { + std::lock_guard<std::mutex> its_lock(mutex_); - if (is_provided_) { - if (set_payload_helper(_payload, _force)) { - reset_payload(_payload); - if (is_updating_on_change_) { - notify_one_unlocked(_client, _target); - } + if (is_provided_ && prepare_update_payload_unlocked(_payload, false)) { + if (is_updating_on_change_) { + notify_one_unlocked(_client, _target); + update_payload_unlocked(); } } else { - VSOMEIP_INFO << "Can't set payload for event " + VSOMEIP_INFO << "Cannot set payload for event [" << std::hex << std::setw(4) << std::setfill('0') - << get_service() << "." << get_instance() << "." << get_event() - << ". It isn't provided"; + << current_->get_service() << "." + << current_->get_instance() << "." + << current_->get_method() + << "]. It isn't provided"; } } -bool event::set_payload_notify_pending(const std::shared_ptr<payload> &_payload) { +bool +event::set_payload_notify_pending(const std::shared_ptr<payload> &_payload) { + std::lock_guard<std::mutex> its_lock(mutex_); - if (!is_set_ && is_provided_) { - reset_payload(_payload); + if (is_provided_ && !is_set_) { + + update_->set_payload(_payload); + is_set_ = true; // Send pending initial events. for (const auto &its_target : pending_) { - message_->set_session(routing_->get_session()); + set_session(); routing_->send_to(VSOMEIP_ROUTING_CLIENT, - its_target, message_); + its_target, update_); } pending_.clear(); - return true; + update_payload_unlocked(); + + return (true); } - return false; + return (false); } -void event::unset_payload(bool _force) { +void +event::unset_payload(bool _force) { + std::lock_guard<std::mutex> its_lock(mutex_); if (_force) { is_set_ = false; stop_cycle(); - message_->set_payload(std::make_shared<payload_impl>()); + current_->set_payload(std::make_shared<payload_impl>()); } else { if (is_provided_) { is_set_ = false; stop_cycle(); - message_->set_payload(std::make_shared<payload_impl>()); + current_->set_payload(std::make_shared<payload_impl>()); } } } -void event::set_update_cycle(std::chrono::milliseconds &_cycle) { +void +event::set_update_cycle(std::chrono::milliseconds &_cycle) { + if (is_provided_) { std::lock_guard<std::mutex> its_lock(mutex_); stop_cycle(); @@ -218,24 +268,33 @@ void event::set_update_cycle(std::chrono::milliseconds &_cycle) { } } -void event::set_change_resets_cycle(bool _change_resets_cycle) { +void +event::set_change_resets_cycle(bool _change_resets_cycle) { + change_resets_cycle_ = _change_resets_cycle; } -void event::set_update_on_change(bool _is_active) { +void +event::set_update_on_change(bool _is_active) { + if (is_provided_) { is_updating_on_change_ = _is_active; } } -void event::set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func) { +void +event::set_epsilon_change_function( + const epsilon_change_func_t &_epsilon_change_func) { + + std::lock_guard<std::mutex> its_lock(mutex_); if (_epsilon_change_func) { - std::lock_guard<std::mutex> its_lock(mutex_); epsilon_change_func_ = _epsilon_change_func; } } -const std::set<eventgroup_t> event::get_eventgroups() const { +std::set<eventgroup_t> +event::get_eventgroups() const { + std::set<eventgroup_t> its_eventgroups; { std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); @@ -246,7 +305,9 @@ const std::set<eventgroup_t> event::get_eventgroups() const { return its_eventgroups; } -std::set<eventgroup_t> event::get_eventgroups(client_t _client) const { +std::set<eventgroup_t> +event::get_eventgroups(client_t _client) const { + std::set<eventgroup_t> its_eventgroups; std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); @@ -257,23 +318,29 @@ std::set<eventgroup_t> event::get_eventgroups(client_t _client) const { return its_eventgroups; } -void event::add_eventgroup(eventgroup_t _eventgroup) { +void +event::add_eventgroup(eventgroup_t _eventgroup) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); if (eventgroups_.find(_eventgroup) == eventgroups_.end()) eventgroups_[_eventgroup] = std::set<client_t>(); } -void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) { +void +event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); for (auto e : _eventgroups) eventgroups_[e] = std::set<client_t>(); } -void event::update_cbk(boost::system::error_code const &_error) { +void +event::update_cbk(boost::system::error_code const &_error) { + if (!_error) { std::lock_guard<std::mutex> its_lock(mutex_); cycle_timer_.expires_from_now(cycle_); - notify(); + notify(true); auto its_handler = std::bind(&event::update_cbk, shared_from_this(), std::placeholders::_1); @@ -281,10 +348,12 @@ void event::update_cbk(boost::system::error_code const &_error) { } } -void event::notify() { +void +event::notify(bool _force) { + if (is_set_) { - message_->set_session(routing_->get_session()); - routing_->send(VSOMEIP_ROUTING_CLIENT, message_); + set_session(); + routing_->send(VSOMEIP_ROUTING_CLIENT, update_, _force); } else { VSOMEIP_INFO << __func__ << ": Notifying " @@ -294,8 +363,10 @@ void event::notify() { } } -void event::notify_one(client_t _client, +void +event::notify_one(client_t _client, const std::shared_ptr<endpoint_definition> &_target) { + if (_target) { std::lock_guard<std::mutex> its_lock(mutex_); notify_one_unlocked(_client, _target); @@ -308,12 +379,14 @@ void event::notify_one(client_t _client, } } -void event::notify_one_unlocked(client_t _client, +void +event::notify_one_unlocked(client_t _client, const std::shared_ptr<endpoint_definition> &_target) { + if (_target) { if (is_set_) { - message_->set_session(routing_->get_session()); - routing_->send_to(_client, _target, message_); + set_session(); + routing_->send_to(_client, _target, update_); } else { VSOMEIP_INFO << __func__ << ": Notifying " @@ -331,15 +404,23 @@ void event::notify_one_unlocked(client_t _client, } } -void event::notify_one(client_t _client) { +void +event::notify_one(client_t _client, bool _force) { + std::lock_guard<std::mutex> its_lock(mutex_); - notify_one_unlocked(_client); + notify_one_unlocked(_client, _force); } -void event::notify_one_unlocked(client_t _client) { +void +event::notify_one_unlocked(client_t _client, bool _force) { + if (is_set_) { - message_->set_session(routing_->get_session()); - routing_->send(_client, message_); + set_session(); + { + std::lock_guard<std::mutex> its_last_forwarded_guard(last_forwarded_mutex_); + last_forwarded_[_client] = std::chrono::steady_clock::now(); + } + routing_->send(_client, update_, _force); } else { VSOMEIP_INFO << __func__ << ": Notifying " @@ -350,28 +431,46 @@ void event::notify_one_unlocked(client_t _client) { } } -bool event::set_payload_helper(const std::shared_ptr<payload> &_payload, bool _force) { - std::shared_ptr<payload> its_payload = message_->get_payload(); - bool is_change(type_ != event_type_e::ET_FIELD); - if (!is_change) { - is_change = _force || epsilon_change_func_(its_payload, _payload); - } - return is_change; +bool +event::prepare_update_payload(const std::shared_ptr<payload> &_payload, + bool _force) { + + std::lock_guard<std::mutex> its_lock(mutex_); + return (prepare_update_payload_unlocked(_payload, _force)); } -void event::reset_payload(const std::shared_ptr<payload> &_payload) { - std::shared_ptr<payload> its_new_payload +bool +event::prepare_update_payload_unlocked( + const std::shared_ptr<payload> &_payload, bool _force) { + + // Copy payload to avoid manipulation from the outside + std::shared_ptr<payload> its_payload = runtime::get()->create_payload( - _payload->get_data(), _payload->get_length()); - message_->set_payload(its_new_payload); + _payload->get_data(), _payload->get_length()); + + bool is_change = has_changed(current_->get_payload(), its_payload); + if (!_force + && type_ == event_type_e::ET_FIELD + && cycle_ == std::chrono::milliseconds::zero() + && !is_change) { + + return (false); + } + + if (is_change) + update_->set_payload(its_payload); if (!is_set_) start_cycle(); is_set_ = true; + + return (true); } -void event::add_ref(client_t _client, bool _is_provided) { +void +event::add_ref(client_t _client, bool _is_provided) { + std::lock_guard<std::mutex> its_lock(refs_mutex_); auto its_client = refs_.find(_client); if (its_client == refs_.end()) { @@ -386,7 +485,9 @@ void event::add_ref(client_t _client, bool _is_provided) { } } -void event::remove_ref(client_t _client, bool _is_provided) { +void +event::remove_ref(client_t _client, bool _is_provided) { + std::lock_guard<std::mutex> its_lock(refs_mutex_); auto its_client = refs_.find(_client); if (its_client != refs_.end()) { @@ -403,24 +504,126 @@ void event::remove_ref(client_t _client, bool _is_provided) { } } -bool event::has_ref() { +bool +event::has_ref() { + std::lock_guard<std::mutex> its_lock(refs_mutex_); return refs_.size() != 0; } -bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force) { +bool +event::add_subscriber(eventgroup_t _eventgroup, + const std::shared_ptr<debounce_filter_t> &_filter, + client_t _client, bool _force) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); bool ret = false; if (_force // remote events managed by rm_impl || is_provided_ // events provided by rm_proxies || is_shadow_ // local events managed by rm_impl || is_cache_placeholder_) { + + if (_filter) { + VSOMEIP_WARNING << "Using client [" + << std::hex << std::setw(4) << std::setfill('0') + << _client + << "] specific filter configuration for SOME/IP event " + << get_service() << "." << get_instance() << "." << get_event() << "."; + std::stringstream its_filter_parameters; + its_filter_parameters << "(on_change=" + << std::boolalpha << _filter->on_change_ + << ", interval=" << std::dec << _filter->interval_ + << ", on_change_resets_interval=" + << std::boolalpha << _filter->on_change_resets_interval_ + << ", ignore=[ "; + for (auto i : _filter->ignore_) + its_filter_parameters << "(" << std::dec << i.first << ", " + << std::hex << std::setw(2) << std::setfill('0') + << (int)i.second << ") "; + its_filter_parameters << "])"; + VSOMEIP_INFO << "Filter parameters: " + << its_filter_parameters.str(); + + filters_[_client] = [_filter, _client, this]( + const std::shared_ptr<payload> &_old, + const std::shared_ptr<payload> &_new) { + + bool is_changed(false), is_elapsed(false); + + // Check whether we should forward because of changed data + if (_filter->on_change_) { + length_t its_min_length, its_max_length; + + if (_old->get_length() < _new->get_length()) { + its_min_length = _old->get_length(); + its_max_length = _new->get_length(); + } else { + its_min_length = _new->get_length(); + its_max_length = _old->get_length(); + } + + // Check whether all additional bytes (if any) are excluded + for (length_t i = its_min_length; i < its_max_length; i++) { + auto j = _filter->ignore_.find(i); + // A change is detected when an additional byte is not + // excluded at all or if its exclusion does not cover all + // bits + if (j == _filter->ignore_.end() || j->second != 0xFF) { + is_changed = true; + break; + } + } + + if (!is_changed) { + const byte_t *its_old = _old->get_data(); + const byte_t *its_new = _new->get_data(); + for (length_t i = 0; i < its_min_length; i++) { + auto j = _filter->ignore_.find(i); + if (j == _filter->ignore_.end()) { + if (its_old[i] != its_new[i]) { + is_changed = true; + break; + } + } else if (j->second != 0xFF) { + if ((its_old[i] & ~(j->second)) != (its_new[i] & ~(j->second))) { + is_changed = true; + break; + } + } + } + } + } + + if (_filter->interval_ > -1) { + // Check whether we should forward because of the elapsed time since + // we did last time + std::chrono::steady_clock::time_point its_current + = std::chrono::steady_clock::now(); + + std::lock_guard<std::mutex> its_last_forwarded_guard(last_forwarded_mutex_); + is_elapsed = (last_forwarded_.find(_client) == last_forwarded_.end()); + if (!is_elapsed) { + std::int64_t elapsed = std::chrono::duration_cast<std::chrono::milliseconds>( + its_current - last_forwarded_[_client]).count(); + is_elapsed = (elapsed >= _filter->interval_); + } + + if (is_elapsed || (is_changed && _filter->on_change_resets_interval_)) + last_forwarded_[_client] = its_current; + } + + return (is_changed || is_elapsed); + }; + } else { + filters_.erase(_client); + } + ret = eventgroups_[_eventgroup].insert(_client).second; + } else { VSOMEIP_WARNING << __func__ << ": Didnt' insert client " - << std::hex << std::setw(4) << std::setfill('0') - << _client - << " to eventgroup " + << std::hex << std::setw(4) << std::setfill('0') << _client + << " to eventgroup 0x" << std::hex << std::setw(4) << std::setfill('0') << get_service() << "." << get_instance() << "." << _eventgroup; @@ -428,14 +631,21 @@ bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _for return ret; } -void event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) { +void +event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); auto find_eventgroup = eventgroups_.find(_eventgroup); if (find_eventgroup != eventgroups_.end()) find_eventgroup->second.erase(_client); + + std::lock_guard<std::mutex> its_last_forwarded_guard(last_forwarded_mutex_); + last_forwarded_.erase(_client); } -bool event::has_subscriber(eventgroup_t _eventgroup, client_t _client) { +bool +event::has_subscriber(eventgroup_t _eventgroup, client_t _client) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); auto find_eventgroup = eventgroups_.find(_eventgroup); if (find_eventgroup != eventgroups_.end()) { @@ -449,7 +659,9 @@ bool event::has_subscriber(eventgroup_t _eventgroup, client_t _client) { return false; } -std::set<client_t> event::get_subscribers() { +std::set<client_t> +event::get_subscribers() { + std::set<client_t> its_subscribers; std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); for (const auto &e : eventgroups_) @@ -457,13 +669,79 @@ std::set<client_t> event::get_subscribers() { return its_subscribers; } -void event::clear_subscribers() { +std::set<client_t> +event::get_filtered_subscribers(bool _force) { + + std::set<client_t> its_subscribers(get_subscribers()); + std::set<client_t> its_filtered_subscribers; + + std::shared_ptr<payload> its_payload, its_payload_update; + { + its_payload = current_->get_payload(); + its_payload_update = update_->get_payload(); + } + + if (filters_.empty()) { + + bool must_forward = (type_ != event_type_e::ET_FIELD + || _force + || epsilon_change_func_(its_payload, its_payload_update)); + + if (must_forward) + return (its_subscribers); + + } else { + byte_t is_allowed(0xff); + + std::lock_guard<std::mutex> its_lock(filters_mutex_); + for (const auto s : its_subscribers) { + + auto its_specific = filters_.find(s); + if (its_specific != filters_.end()) { + if (its_specific->second(its_payload, its_payload_update)) + its_filtered_subscribers.insert(s); + } else { + if (is_allowed == 0xff) { + is_allowed = (type_ != event_type_e::ET_FIELD + || _force + || epsilon_change_func_(its_payload, its_payload_update) + ? 0x01 : 0x00); + } + + if (is_allowed == 0x01) + its_filtered_subscribers.insert(s); + } + } + } + + return (its_filtered_subscribers); +} + +std::set<client_t> +event::update_and_get_filtered_subscribers( + const std::shared_ptr<payload> &_payload, bool _is_from_remote) { + + std::lock_guard<std::mutex> its_lock(mutex_); + + (void)prepare_update_payload_unlocked(_payload, true); + auto its_subscribers = get_filtered_subscribers(!_is_from_remote); + if (_is_from_remote) + update_payload_unlocked(); + + return (its_subscribers); +} + +void +event::clear_subscribers() { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); for (auto &e : eventgroups_) e.second.clear(); } -bool event::has_ref(client_t _client, bool _is_provided) { +bool +event::has_ref(client_t _client, bool _is_provided) { + std::lock_guard<std::mutex> its_lock(refs_mutex_); auto its_client = refs_.find(_client); if (its_client != refs_.end()) { @@ -477,24 +755,35 @@ bool event::has_ref(client_t _client, bool _is_provided) { return false; } -bool event::is_shadow() const { +bool +event::is_shadow() const { + return is_shadow_; } -void event::set_shadow(bool _shadow) { +void +event::set_shadow(bool _shadow) { + is_shadow_ = _shadow; } -bool event::is_cache_placeholder() const { +bool +event::is_cache_placeholder() const { + return is_cache_placeholder_; } -void event::set_cache_placeholder(bool _is_cache_place_holder) { +void +event::set_cache_placeholder(bool _is_cache_place_holder) { + is_cache_placeholder_ = _is_cache_place_holder; } -void event::start_cycle() { - if (std::chrono::milliseconds::zero() != cycle_) { +void +event::start_cycle() { + + if (!is_shadow_ + && std::chrono::milliseconds::zero() != cycle_) { cycle_timer_.expires_from_now(cycle_); auto its_handler = std::bind(&event::update_cbk, shared_from_this(), @@ -503,15 +792,20 @@ void event::start_cycle() { } } -void event::stop_cycle() { - if (std::chrono::milliseconds::zero() != cycle_) { +void +event::stop_cycle() { + + if (!is_shadow_ + && std::chrono::milliseconds::zero() != cycle_) { boost::system::error_code ec; cycle_timer_.cancel(ec); } } -bool event::compare(const std::shared_ptr<payload> &_lhs, +bool +event::has_changed(const std::shared_ptr<payload> &_lhs, const std::shared_ptr<payload> &_rhs) const { + bool is_change = (_lhs->get_length() != _rhs->get_length()); if (!is_change) { std::size_t its_pos = 0; @@ -522,10 +816,12 @@ bool event::compare(const std::shared_ptr<payload> &_lhs, its_pos++; } } - return is_change; + return (is_change); } -std::set<client_t> event::get_subscribers(eventgroup_t _eventgroup) { +std::set<client_t> +event::get_subscribers(eventgroup_t _eventgroup) { + std::set<client_t> its_subscribers; std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); auto found_eventgroup = eventgroups_.find(_eventgroup); @@ -535,7 +831,9 @@ std::set<client_t> event::get_subscribers(eventgroup_t _eventgroup) { return its_subscribers; } -bool event::is_subscribed(client_t _client) { +bool +event::is_subscribed(client_t _client) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); for (const auto &egp : eventgroups_) { if (egp.second.find(_client) != egp.second.end()) { @@ -547,18 +845,27 @@ bool event::is_subscribed(client_t _client) { reliability_type_e event::get_reliability() const { + return reliability_; } void event::set_reliability(const reliability_type_e _reliability) { + reliability_ = _reliability; } void event::remove_pending(const std::shared_ptr<endpoint_definition> &_target) { + std::lock_guard<std::mutex> its_lock(mutex_); pending_.erase(_target); } +void +event::set_session() { + + update_->set_session(routing_->get_session(false)); +} + } // namespace vsomeip_v3 |