diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2017-02-28 03:57:20 -0800 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2017-02-28 03:57:20 -0800 |
commit | 07d7573c007322be07689575ce5d73c45f030d6d (patch) | |
tree | 6a6acbf7536de451391d9d50b7f5cdde49d67fd8 /implementation/routing | |
parent | 1a230558936ec84b4fb44b2346dc5ae52d6f2805 (diff) | |
download | vSomeIP-07d7573c007322be07689575ce5d73c45f030d6d.tar.gz |
vSomeIP 2.5.32.5.3
Diffstat (limited to 'implementation/routing')
19 files changed, 578 insertions, 284 deletions
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp index 6e1de4a..e127807 100644 --- a/implementation/routing/include/event.hpp +++ b/implementation/routing/include/event.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -10,6 +10,7 @@ #include <memory> #include <mutex> #include <set> +#include <atomic> #include <boost/asio/io_service.hpp> #include <boost/asio/ip/address.hpp> @@ -93,6 +94,8 @@ public: bool is_cache_placeholder() const; void set_cache_placeholder(bool _is_cache_place_holder); + bool has_ref(client_t _client, bool _is_provided); + private: void update_cbk(boost::system::error_code const &_error); void notify(bool _flush); @@ -111,25 +114,27 @@ private: mutable std::mutex mutex_; std::shared_ptr<message> message_; - bool is_field_; + std::atomic<bool> is_field_; boost::asio::steady_timer cycle_timer_; std::chrono::milliseconds cycle_; - bool change_resets_cycle_; - bool is_updating_on_change_; + std::atomic<bool> change_resets_cycle_; + + std::atomic<bool> is_updating_on_change_; + std::mutex eventgroups_mutex_; std::set<eventgroup_t> eventgroups_; - bool is_set_; - bool is_provided_; + std::atomic<bool> is_set_; + std::atomic<bool> is_provided_; std::mutex refs_mutex_; std::map<client_t, std::map<bool, uint32_t>> refs_; - bool is_shadow_; + std::atomic<bool> is_shadow_; - bool is_cache_placeholder_; + std::atomic<bool> is_cache_placeholder_; epsilon_change_func_t epsilon_change_func_; }; diff --git a/implementation/routing/include/eventgroupinfo.hpp b/implementation/routing/include/eventgroupinfo.hpp index fc3d321..ae3ab87 100644 --- a/implementation/routing/include/eventgroupinfo.hpp +++ b/implementation/routing/include/eventgroupinfo.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -10,6 +10,8 @@ #include <list> #include <memory> #include <set> +#include <mutex> +#include <atomic> #include <boost/asio/ip/address.hpp> @@ -73,17 +75,21 @@ public: VSOMEIP_EXPORT void set_threshold(uint8_t _threshold); private: - major_version_t major_; - ttl_t ttl_; + std::atomic<major_version_t> major_; + std::atomic<ttl_t> ttl_; + mutable std::mutex address_mutex_; boost::asio::ip::address address_; uint16_t port_; + mutable std::mutex events_mutex_; std::set<std::shared_ptr<event> > events_; + mutable std::mutex targets_mutex_; std::list<target_t> targets_; + mutable std::mutex multicast_targets_mutex_; std::list<target_t> multicast_targets_; - uint8_t threshold_; + std::atomic<uint8_t> threshold_; }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager.hpp b/implementation/routing/include/routing_manager.hpp index 1fc63d4..4911701 100644 --- a/implementation/routing/include/routing_manager.hpp +++ b/implementation/routing/include/routing_manager.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. diff --git a/implementation/routing/include/routing_manager_adapter.hpp b/implementation/routing/include/routing_manager_adapter.hpp index 0dfe703..3d81120 100644 --- a/implementation/routing/include/routing_manager_adapter.hpp +++ b/implementation/routing/include/routing_manager_adapter.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp index 75d42c4..d3c0e04 100644 --- a/implementation/routing/include/routing_manager_base.hpp +++ b/implementation/routing/include/routing_manager_base.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -113,7 +113,7 @@ public: endpoint *_receiver, const boost::asio::ip::address &_remote_address, std::uint16_t _remote_port) = 0; -#ifndef WIN32 +#ifndef _WIN32 virtual bool check_credentials(client_t _client, uid_t _uid, gid_t _gid); #endif @@ -177,10 +177,17 @@ protected: void send_pending_notify_ones(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client); + void unset_all_eventpayloads(service_t _service, instance_t _instance); + void unset_all_eventpayloads(service_t _service, instance_t _instance, + eventgroup_t _eventgroup); + private: std::shared_ptr<endpoint> create_local_unlocked(client_t _client); std::shared_ptr<endpoint> find_local_unlocked(client_t _client); + std::set<std::tuple<service_t, instance_t, eventgroup_t>> + get_subscriptions(const client_t _client); + protected: routing_manager_host *host_; boost::asio::io_service &io_; diff --git a/implementation/routing/include/routing_manager_host.hpp b/implementation/routing/include/routing_manager_host.hpp index 27a1b1e..bbea181 100644 --- a/implementation/routing/include/routing_manager_host.hpp +++ b/implementation/routing/include/routing_manager_host.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index 6e6bd45..d29a3d7 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -94,6 +94,13 @@ public: bool send_to(const std::shared_ptr<endpoint_definition> &_target, const byte_t *_data, uint32_t _size, uint16_t _sd_port); + void register_event(client_t _client, service_t _service, + instance_t _instance, event_t _event, + const std::set<eventgroup_t> &_eventgroups, bool _is_field, + std::chrono::milliseconds _cycle, bool _change_resets_cycle, + epsilon_change_func_t _epsilon_change_func, + bool _is_provided, bool _is_shadow, bool _is_cache_placeholder); + void register_shadow_event(client_t _client, service_t _service, instance_t _instance, event_t _event, const std::set<eventgroup_t> &_eventgroups, @@ -304,6 +311,12 @@ private: void start_ip_routing(); + void requested_service_add(client_t _client, service_t _service, + instance_t _instance, major_version_t _major, + minor_version_t _minor); + void requested_service_remove(client_t _client, service_t _service, + instance_t _instance); + std::shared_ptr<routing_manager_stub> stub_; std::shared_ptr<sd::service_discovery> discovery_; @@ -354,11 +367,12 @@ private: bool if_state_running_; std::mutex pending_sd_offers_mutex_; std::vector<std::pair<service_t, instance_t>> pending_sd_offers_; -#ifndef WIN32 +#ifndef _WIN32 std::shared_ptr<netlink_connector> netlink_connector_; #endif #ifndef WITHOUT_SYSTEMD + std::mutex watchdog_timer_mutex_; boost::asio::steady_timer watchdog_timer_; void watchdog_cbk(boost::system::error_code const &_error); #endif diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp index 37ec8b5..27c071d 100644 --- a/implementation/routing/include/routing_manager_proxy.hpp +++ b/implementation/routing/include/routing_manager_proxy.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -197,12 +197,37 @@ private: std::set<eventgroup_t> eventgroups_; bool operator<(const event_data_t &_other) const { - return (service_ < _other.service_ - || (service_ == _other.service_ - && instance_ < _other.instance_) - || (service_ == _other.service_ - && instance_ == _other.instance_ - && event_ < _other.event_)); + if (service_ < _other.service_) { + return true; + } + if (service_ == _other.service_ && instance_ < _other.instance_) { + return true; + } + if (service_ == _other.service_ && instance_ == _other.instance_ + && event_ < _other.event_) { + return true; + } + if (service_ == _other.service_ && instance_ == _other.instance_ + && event_ == _other.event_ + && is_provided_ != _other.is_provided_) { + return true; + } + if (service_ == _other.service_ + && instance_ == _other.instance_ + && event_ == _other.event_ + && is_provided_ == _other.is_provided_ + && is_field_ != _other.is_field_) { + return true; + } + if (service_ == _other.service_ + && instance_ == _other.instance_ + && event_ == _other.event_ + && is_provided_ == _other.is_provided_ + && is_field_ == _other.is_field_ + && eventgroups_ < _other.eventgroups_) { + return true; + } + return false; } }; std::set<event_data_t> pending_event_registrations_; @@ -218,7 +243,7 @@ private: std::map<service_t, std::map<instance_t, std::map<eventgroup_t, uint32_t > > > remote_subscriber_count_; - mutable std::recursive_mutex sender_mutex_; + mutable std::mutex sender_mutex_; boost::asio::steady_timer register_application_timer_; diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp index 9cc0255..3cf89a7 100644 --- a/implementation/routing/include/routing_manager_stub.hpp +++ b/implementation/routing/include/routing_manager_stub.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -82,7 +82,7 @@ public: bool is_registered(client_t _client) const; void deregister_erroneous_client(client_t _client); client_t get_client() const; -#ifndef WIN32 +#ifndef _WIN32 virtual bool check_credentials(client_t _client, uid_t _uid, gid_t _gid); #endif private: @@ -112,6 +112,7 @@ private: private: routing_manager_stub_host *host_; boost::asio::io_service &io_; + std::mutex watchdog_timer_mutex_; boost::asio::steady_timer watchdog_timer_; std::string endpoint_path_; diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp index 65bb28b..cb8e83c 100644 --- a/implementation/routing/include/routing_manager_stub_host.hpp +++ b/implementation/routing/include/routing_manager_stub_host.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. diff --git a/implementation/routing/include/serviceinfo.hpp b/implementation/routing/include/serviceinfo.hpp index c890435..7eab1ec 100644 --- a/implementation/routing/include/serviceinfo.hpp +++ b/implementation/routing/include/serviceinfo.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -63,6 +63,7 @@ private: std::shared_ptr<endpoint> reliable_; std::shared_ptr<endpoint> unreliable_; + mutable std::mutex endpoint_mutex_; std::mutex requesters_mutex_; std::set<client_t> requesters_; diff --git a/implementation/routing/include/types.hpp b/implementation/routing/include/types.hpp index c483aee..9b803a5 100644 --- a/implementation/routing/include/types.hpp +++ b/implementation/routing/include/types.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index b6947d0..5ede55e 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -173,6 +173,7 @@ void event::unset_payload(bool _force) { void event::set_update_cycle(std::chrono::milliseconds &_cycle) { if (is_provided_) { + std::lock_guard<std::mutex> its_lock(mutex_); stop_cycle(); cycle_ = _cycle; start_cycle(); @@ -190,8 +191,10 @@ void event::set_update_on_change(bool _is_active) { } void event::set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func) { - if (_epsilon_change_func) + if (_epsilon_change_func) { + std::lock_guard<std::mutex> its_lock(mutex_); epsilon_change_func_ = _epsilon_change_func; + } } const std::set<eventgroup_t> & event::get_eventgroups() const { @@ -199,15 +202,18 @@ const std::set<eventgroup_t> & event::get_eventgroups() const { } void event::add_eventgroup(eventgroup_t _eventgroup) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); eventgroups_.insert(_eventgroup); } void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); eventgroups_ = _eventgroups; } void event::update_cbk(boost::system::error_code const &_error) { if (!_error) { + std::lock_guard<std::mutex> its_lock(mutex_); cycle_timer_.expires_from_now(cycle_); notify(true); std::function<void(boost::system::error_code const &)> its_handler = @@ -302,6 +308,20 @@ bool event::has_ref() { return refs_.size() != 0; } +bool event::has_ref(client_t _client, bool _is_provided) { + std::lock_guard<std::mutex> its_lock(refs_mutex_); + auto its_client = refs_.find(_client); + if (its_client != refs_.end()) { + auto its_provided = its_client->second.find(_is_provided); + if (its_provided != its_client->second.end()) { + if(its_provided->second > 0) { + return true; + } + } + } + return false; +} + bool event::is_shadow() const { return is_shadow_; } diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp index 3b3e657..0a32861 100644 --- a/implementation/routing/src/eventgroupinfo.cpp +++ b/implementation/routing/src/eventgroupinfo.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -41,6 +41,7 @@ void eventgroupinfo::set_ttl(ttl_t _ttl) { } bool eventgroupinfo::is_multicast() const { + std::lock_guard<std::mutex> its_lock(address_mutex_); return address_.is_multicast(); } @@ -52,7 +53,7 @@ bool eventgroupinfo::is_sending_multicast() const { bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address, uint16_t &_port) const { - + std::lock_guard<std::mutex> its_lock(address_mutex_); if (address_.is_multicast()) { _address = address_; _port = port_; @@ -63,28 +64,34 @@ bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address, void eventgroupinfo::set_multicast(const boost::asio::ip::address &_address, uint16_t _port) { + std::lock_guard<std::mutex> its_lock(address_mutex_); address_ = _address; port_ = _port; } const std::set<std::shared_ptr<event> > eventgroupinfo::get_events() const { + std::lock_guard<std::mutex> its_lock(events_mutex_); return events_; } void eventgroupinfo::add_event(std::shared_ptr<event> _event) { + std::lock_guard<std::mutex> its_lock(events_mutex_); events_.insert(_event); } void eventgroupinfo::remove_event(std::shared_ptr<event> _event) { + std::lock_guard<std::mutex> its_lock(events_mutex_); events_.erase(_event); } const std::list<eventgroupinfo::target_t> eventgroupinfo::get_targets() const { + std::lock_guard<std::mutex> its_lock(targets_mutex_); return targets_; } uint32_t eventgroupinfo::get_unreliable_target_count() const { uint32_t _count(0); + std::lock_guard<std::mutex> its_lock(targets_mutex_); for (auto i = targets_.begin(); i != targets_.end(); i++) { if (!i->endpoint_->is_reliable()) { _count++; @@ -94,6 +101,7 @@ uint32_t eventgroupinfo::get_unreliable_target_count() const { } void eventgroupinfo::add_multicast_target(const eventgroupinfo::target_t &_multicast_target) { + std::lock_guard<std::mutex> its_lock(multicast_targets_mutex_); if (std::find(multicast_targets_.begin(), multicast_targets_.end(), _multicast_target) == multicast_targets_.end()) { multicast_targets_.push_back(_multicast_target); @@ -101,14 +109,17 @@ void eventgroupinfo::add_multicast_target(const eventgroupinfo::target_t &_multi } void eventgroupinfo::clear_multicast_targets() { + std::lock_guard<std::mutex> its_lock(multicast_targets_mutex_); multicast_targets_.clear(); } const std::list<eventgroupinfo::target_t> eventgroupinfo::get_multicast_targets() const { + std::lock_guard<std::mutex> its_lock(multicast_targets_mutex_); return multicast_targets_; } bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target) { + std::lock_guard<std::mutex> its_lock(targets_mutex_); std::size_t its_size = targets_.size(); if (std::find(targets_.begin(), targets_.end(), _target) == targets_.end()) { targets_.push_back(_target); @@ -116,18 +127,30 @@ bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target) { return (its_size != targets_.size()); } -bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target, const eventgroupinfo::target_t &_subscriber) { - std::size_t its_size = targets_.size(); - if (std::find(targets_.begin(), targets_.end(), _subscriber) == targets_.end()) { - targets_.push_back(_subscriber); +bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target, + const eventgroupinfo::target_t &_subscriber) { + bool add(false); + bool ret(false); + { + std::lock_guard<std::mutex> its_lock(targets_mutex_); + std::size_t its_size = targets_.size(); + if (std::find(targets_.begin(), targets_.end(), _subscriber) + == targets_.end()) { + targets_.push_back(_subscriber); + add = true; + } + ret = (its_size != targets_.size()); + } + if (add) { add_multicast_target(_target); } - return (its_size != targets_.size()); + return ret; } bool eventgroupinfo::update_target( const std::shared_ptr<endpoint_definition> &_target, const std::chrono::steady_clock::time_point &_expiration) { + std::lock_guard<std::mutex> its_lock(targets_mutex_); for (auto i = targets_.begin(); i != targets_.end(); i++) { if (i->endpoint_->get_address() == _target->get_address() && i->endpoint_->get_port() == _target->get_port()) { @@ -140,6 +163,7 @@ bool eventgroupinfo::update_target( bool eventgroupinfo::remove_target( const std::shared_ptr<endpoint_definition> &_target) { + std::lock_guard<std::mutex> its_lock(targets_mutex_); std::size_t its_size = targets_.size(); for (auto i = targets_.begin(); i != targets_.end(); i++) { @@ -153,6 +177,7 @@ bool eventgroupinfo::remove_target( } void eventgroupinfo::clear_targets() { + std::lock_guard<std::mutex> its_lock(targets_mutex_); targets_.clear(); } diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 751a588..ac5da9c 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -116,7 +116,8 @@ void routing_manager_base::request_service(client_t _client, service_t _service, auto its_info = find_service(_service, _instance); if (its_info) { if ((_major == its_info->get_major() - || DEFAULT_MAJOR == its_info->get_major()) + || DEFAULT_MAJOR == its_info->get_major() + || ANY_MAJOR == _major) && (_minor <= its_info->get_minor() || DEFAULT_MINOR == its_info->get_minor() || _minor == ANY_MINOR)) { @@ -332,6 +333,15 @@ void routing_manager_base::unsubscribe(client_t _client, service_t _service, auto its_group = its_instance->second.find(_eventgroup); if (its_group != its_instance->second.end()) { its_group->second.erase(_client); + if (!its_group->second.size()) { + its_instance->second.erase(_eventgroup); + if (!its_instance->second.size()) { + its_service->second.erase(_instance); + if (!its_service->second.size()) { + eventgroup_clients_.erase(_service); + } + } + } } } } @@ -433,6 +443,40 @@ void routing_manager_base::send_pending_notify_ones(service_t _service, instance } } +void routing_manager_base::unset_all_eventpayloads(service_t _service, + instance_t _instance) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + const auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + const auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + for (const auto &eventgroupinfo : found_instance->second) { + for (const auto &event : eventgroupinfo.second->get_events()) { + event->unset_payload(true); + } + } + } + } +} + +void routing_manager_base::unset_all_eventpayloads(service_t _service, + instance_t _instance, + eventgroup_t _eventgroup) { + std::lock_guard<std::mutex> its_lock(eventgroups_mutex_); + const auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + const auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + const auto found_eventgroup = found_instance->second.find(_eventgroup); + if (found_eventgroup != found_instance->second.end()) { + for (const auto &event : found_eventgroup->second->get_events()) { + event->unset_payload(true); + } + } + } + } +} + bool routing_manager_base::send(client_t its_client, std::shared_ptr<message> _message, bool _flush) { @@ -543,7 +587,7 @@ std::shared_ptr<endpoint> routing_manager_base::create_local_unlocked(client_t _ std::stringstream its_path; its_path << VSOMEIP_BASE_PATH << std::hex << _client; -#ifdef WIN32 +#ifdef _WIN32 boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); int port = VSOMEIP_INTERNAL_BASE_PORT + _client; VSOMEIP_INFO << "Connecting to [" @@ -554,7 +598,7 @@ std::shared_ptr<endpoint> routing_manager_base::create_local_unlocked(client_t _ #endif std::shared_ptr<local_client_endpoint_impl> its_endpoint = std::make_shared< local_client_endpoint_impl>(shared_from_this(), -#ifdef WIN32 +#ifdef _WIN32 boost::asio::ip::tcp::endpoint(address, port) #else boost::asio::local::stream_protocol::endpoint(its_path.str()) @@ -609,6 +653,14 @@ std::shared_ptr<endpoint> routing_manager_base::find_or_create_local(client_t _c } void routing_manager_base::remove_local(client_t _client) { + auto subscriptions = get_subscriptions(_client); + for (auto its_subscription : subscriptions) { + host_->on_subscription(std::get<0>(its_subscription), std::get<1>(its_subscription), + std::get<2>(its_subscription), _client, false); + routing_manager_base::unsubscribe(_client, std::get<0>(its_subscription), + std::get<1>(its_subscription), std::get<2>(its_subscription)); + } + std::shared_ptr<endpoint> its_endpoint(find_local(_client)); if (its_endpoint) { its_endpoint->stop(); @@ -634,40 +686,6 @@ void routing_manager_base::remove_local(client_t _client) { local_services_.erase(si.first); } } - // delete client's subscriptions if he didn't unsubscribe properly - { - std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); - for (auto service_it = eventgroup_clients_.begin(); - service_it != eventgroup_clients_.end();) { - for (auto instance_it = service_it->second.begin(); - instance_it != service_it->second.end();) { - for (auto eventgroup_it = instance_it->second.begin(); - eventgroup_it != instance_it->second.end();) { - if (eventgroup_it->second.erase(_client) > 0) { - if (eventgroup_it->second.size() == 0) { - eventgroup_it = instance_it->second.erase(eventgroup_it); - } else { - ++eventgroup_it; - } - } else { - ++eventgroup_it; - } - } - - if (instance_it->second.size() == 0) { - instance_it = service_it->second.erase(instance_it); - } else { - ++instance_it; - } - } - - if (service_it->second.size() == 0) { - service_it = eventgroup_clients_.erase(service_it); - } else { - ++service_it; - } - } - } } std::shared_ptr<endpoint> routing_manager_base::find_local(service_t _service, @@ -904,7 +922,7 @@ void routing_manager_base::put_deserializer(std::shared_ptr<deserializer> _deser deserializer_condition_.notify_one(); } -#ifndef WIN32 +#ifndef _WIN32 bool routing_manager_base::check_credentials(client_t _client, uid_t _uid, gid_t _gid) { return configuration_->check_credentials(_client, _uid, _gid); } @@ -934,5 +952,21 @@ void routing_manager_base::remove_pending_subscription(service_t _service, if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it); } +std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::get_subscriptions(const client_t _client) { + std::set<std::tuple<service_t, instance_t, eventgroup_t>> result; + std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_); + for (auto its_service : eventgroup_clients_) { + for (auto its_instance : its_service.second) { + for (auto its_eventgroup : its_instance.second) { + auto its_client = its_eventgroup.second.find(_client); + if (its_client != its_eventgroup.second.end()) { + result.insert(std::make_tuple(its_service.first, its_instance.first, its_eventgroup.first)); + } + } + } + } + return result; +} + } // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 00108a2..e78f400 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -92,7 +92,7 @@ void routing_manager_impl::init() { } void routing_manager_impl::start() { -#ifndef WIN32 +#ifndef _WIN32 netlink_connector_ = std::make_shared<netlink_connector>(host_->get_io(), configuration_->get_unicast_address()); netlink_connector_->register_net_if_changes_handler( @@ -115,9 +115,12 @@ void routing_manager_impl::start() { } #ifndef WITHOUT_SYSTEMD - watchdog_timer_.expires_from_now(std::chrono::seconds(0)); - watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, - this, std::placeholders::_1)); + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + watchdog_timer_.expires_from_now(std::chrono::seconds(0)); + watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, + this, std::placeholders::_1)); + } #endif } @@ -126,14 +129,17 @@ void routing_manager_impl::stop() { std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_); version_log_timer_.cancel(); } -#ifndef WIN32 +#ifndef _WIN32 if (netlink_connector_) { netlink_connector_->stop(); } #endif #ifndef WITHOUT_SYSTEMD - watchdog_timer_.cancel(); + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + watchdog_timer_.cancel(); + } sd_notify(0, "STOPPING=1"); #endif @@ -251,10 +257,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, auto its_info = find_service(_service, _instance); if (!its_info) { - { - std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); - requested_services_[_client][_service][_instance].insert({ _major, _minor }); - } + requested_service_add(_client, _service, _instance, _major, _minor); if (discovery_) { if (!configuration_->is_local_service(_service, _instance)) { // Non local service instance ~> tell SD to find it! @@ -270,16 +273,14 @@ void routing_manager_impl::request_service(client_t _client, service_t _service, } } else { if ((_major == its_info->get_major() - || DEFAULT_MAJOR == its_info->get_major()) + || DEFAULT_MAJOR == its_info->get_major() + || ANY_MAJOR == _major) && (_minor <= its_info->get_minor() || DEFAULT_MINOR == its_info->get_minor() || _minor == ANY_MINOR)) { if(!its_info->is_local()) { - { - std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); - requested_services_[_client][_service][_instance].insert({ _major, _minor }); - its_info->add_client(_client); - } + requested_service_add(_client, _service, _instance, _major, _minor); + its_info->add_client(_client); find_or_create_remote_client(_service, _instance, true, VSOMEIP_ROUTING_CLIENT); if (_use_exclusive_proxy) { std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(true); @@ -310,20 +311,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; routing_manager_base::release_service(_client, _service, _instance); - - { - std::lock_guard<std::mutex> its_lock(requested_services_mutex_); - auto its_client = requested_services_.find(_client); - if (its_client != requested_services_.end()) { - auto its_service = its_client->second.find(_service); - if (its_service != its_client->second.end()) { - auto its_instance = its_service->second.find(_instance); - if (its_instance != its_service->second.end()) { - its_service->second.erase(_instance); - } - } - } - } + requested_service_remove(_client, _service, _instance); std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance)); if(its_info && !its_info->is_local()) { @@ -339,6 +327,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, clear_service_info(_service, _instance, false); clear_identified_clients(_service, _instance); clear_identifying_clients( _service, _instance); + unset_all_eventpayloads(_service, _instance); } else { remove_identified_client(_service, _instance, _client); remove_identifying_client(_service, _instance, _client); @@ -462,9 +451,15 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, _eventgroup); if (found_eventgroup != found_instance->second.end()) { found_eventgroup->second.erase(_client); - if (0 == found_eventgroup->second.size()) { + if (!found_eventgroup->second.size()) { last_subscriber_removed = true; - eventgroup_clients_.erase(_eventgroup); + found_instance->second.erase(_eventgroup); + if (!found_service->second.size()) { + found_service->second.erase(_instance); + if (!found_service->second.size()) { + eventgroup_clients_.erase(_service); + } + } } } } @@ -487,6 +482,9 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, } } } + if (last_subscriber_removed) { + unset_all_eventpayloads(_service, _instance, _eventgroup); + } if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) { // for normal subscribers only unsubscribe via SD if last // subscriber was removed @@ -751,6 +749,27 @@ bool routing_manager_impl::send_to(const std::shared_ptr<endpoint_definition> &_ return false; } +void routing_manager_impl::register_event( + client_t _client, service_t _service, instance_t _instance, + event_t _event, const std::set<eventgroup_t> &_eventgroups, + bool _is_field, std::chrono::milliseconds _cycle, + bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func, + bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { + auto its_event = find_event(_service, _instance, _event); + bool is_first(false); + if (its_event && !its_event->has_ref(_client, _is_provided)) { + is_first = true; + } else { + is_first = true; + } + if (is_first) { + routing_manager_base::register_event(_client, _service, _instance, + _event, _eventgroups, _is_field, _cycle, _change_resets_cycle, + _epsilon_change_func, _is_provided, _is_shadow, + _is_cache_placeholder); + } +} + void routing_manager_impl::register_shadow_event(client_t _client, service_t _service, instance_t _instance, event_t _event, const std::set<eventgroup_t> &_eventgroups, @@ -1549,7 +1568,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( its_endpoint->enable_magic_cookies(); } } else { -#ifndef WIN32 +#ifndef _WIN32 if (its_unicast.is_v4()) { its_unicast = boost::asio::ip::address_v4::any(); } else if (its_unicast.is_v6()) { @@ -1929,7 +1948,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( if (found_instance != found_service->second.end()) { for (const auto &major_minor_pair : found_instance->second) { if ((major_minor_pair.first == _major - || _major == DEFAULT_MAJOR) + || _major == DEFAULT_MAJOR + || major_minor_pair.first == ANY_MAJOR) && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { @@ -1969,7 +1989,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info( if (found_instance != found_service->second.end()) { for (const auto &major_minor_pair : found_instance->second) { if ((major_minor_pair.first == _major - || _major == DEFAULT_MAJOR) + || _major == DEFAULT_MAJOR + || major_minor_pair.first == ANY_MAJOR) && (major_minor_pair.second <= _minor || _minor == DEFAULT_MINOR || major_minor_pair.second == ANY_MINOR)) { @@ -2339,14 +2360,14 @@ void routing_manager_impl::on_subscribe( get_client(_subscriber); } } - stub_->send_subscribe(find_local(_service, _instance), - client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true); // send initial events if we already have a cached field (is_set) for (auto its_event : its_eventgroup->get_events()) { if (its_event->is_field() && its_event->is_set()) { its_event->notify_one(_subscriber, true); // TODO: use _flush parameter to send all event at once } } + stub_->send_subscribe(find_local(_service, _instance), + client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true); } } } @@ -2397,25 +2418,27 @@ void routing_manager_impl::on_subscribe_ack(service_t _service, instance_t _instance, const boost::asio::ip::address &_address, uint16_t _port) { - if (multicast_info.find(_service) != multicast_info.end()) { - if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { - auto endpoint_def = multicast_info[_service][_instance]; - if (endpoint_def->get_address() == _address && - endpoint_def->get_port() == _port) { - - // Multicast info and endpoint already created before - // This can happen when more than one client subscribe on the same instance! - return; + { + std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_); + if (multicast_info.find(_service) != multicast_info.end()) { + if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { + auto endpoint_def = multicast_info[_service][_instance]; + if (endpoint_def->get_address() == _address && + endpoint_def->get_port() == _port) { + + // Multicast info and endpoint already created before + // This can happen when more than one client subscribe on the same instance! + return; + } } } - } - - // Save multicast info to be able to delete the endpoint - // as soon as the instance stops offering its service - std::shared_ptr<endpoint_definition> endpoint_def = - endpoint_definition::get(_address, _port, false); - multicast_info[_service][_instance] = endpoint_def; + // Save multicast info to be able to delete the endpoint + // as soon as the instance stops offering its service + std::shared_ptr<endpoint_definition> endpoint_def = + endpoint_definition::get(_address, _port, false); + multicast_info[_service][_instance] = endpoint_def; + } bool is_someip = configuration_->is_someip(_service, _instance); // Create multicast endpoint & join multicase group @@ -2645,12 +2668,19 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) { std::string address = multicast_info[_service][_instance]->get_address().to_string(); uint16_t port = multicast_info[_service][_instance]->get_port(); - auto multicast_endpoint = server_endpoints_[port][false]; - multicast_endpoint->leave(address); - multicast_endpoint->stop(); - server_endpoints_[port].erase(false); - if (server_endpoints_[port].find(true) == server_endpoints_[port].end()) { - server_endpoints_.erase(port); + std::shared_ptr<endpoint> multicast_endpoint; + auto found_port = server_endpoints_.find(port); + if (found_port != server_endpoints_.end()) { + auto found_unreliable = found_port->second.find(false); + if (found_unreliable != found_port->second.end()) { + multicast_endpoint = found_unreliable->second; + multicast_endpoint->leave(address); + multicast_endpoint->stop(); + server_endpoints_[port].erase(false); + } + if (found_port->second.find(true) == found_port->second.end()) { + server_endpoints_.erase(port); + } } multicast_info[_service].erase(_instance); if (0 >= multicast_info[_service].size()) { @@ -2659,7 +2689,7 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc // Clear service_instances_ for multicase endpoint if (1 >= service_instances_[_service].size()) { service_instances_.erase(_service); - } else { + } else if (multicast_endpoint) { service_instances_[_service].erase(multicast_endpoint.get()); } } @@ -3032,6 +3062,7 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error) } if (has_interval) { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); watchdog_timer_.expires_from_now(std::chrono::microseconds(its_interval / 2)); watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk, this, std::placeholders::_1)); @@ -3501,7 +3532,7 @@ void routing_manager_impl::on_net_if_state_changed(std::string _if, bool _availa if (_available) { VSOMEIP_INFO << "Network interface \"" << _if << "\" is up and running."; start_ip_routing(); -#ifndef WIN32 +#ifndef _WIN32 if (netlink_connector_) { netlink_connector_->unregister_net_if_changes_handler(); } @@ -3526,4 +3557,36 @@ void routing_manager_impl::start_ip_routing() { pending_sd_offers_.clear(); } +void routing_manager_impl::requested_service_add(client_t _client, + service_t _service, + instance_t _instance, + major_version_t _major, + minor_version_t _minor) { + std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); + requested_services_[_client][_service][_instance].insert({ _major, _minor }); +} + +void routing_manager_impl::requested_service_remove(client_t _client, + service_t _service, + instance_t _instance) { + std::lock_guard<std::mutex> ist_lock(requested_services_mutex_); + auto found_client = requested_services_.find(_client); + if (found_client != requested_services_.end()) { + auto found_service = found_client->second.find(_service); + if (found_service != found_client->second.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + // delete all requested major/minor versions + found_service->second.erase(_instance); + if (!found_service->second.size()) { + found_client->second.erase(_service); + if (!found_client->second.size()) { + requested_services_.erase(client_); + } + } + } + } + } +} + } // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index a000211..e12e962 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -10,7 +10,7 @@ #include <future> #include <forward_list> -#ifndef WIN32 +#ifndef _WIN32 // for umask #include <sys/types.h> #include <sys/stat.h> @@ -54,7 +54,7 @@ void routing_manager_proxy::init() { routing_manager_base::init(); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); sender_ = create_local(VSOMEIP_ROUTING_CLIENT); } @@ -64,24 +64,24 @@ void routing_manager_proxy::init() { void routing_manager_proxy::start() { is_started_ = true; - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (!sender_) { - // application has been stopped and started again - sender_ = create_local(VSOMEIP_ROUTING_CLIENT); - } - } if (!receiver_) { // application has been stopped and started again init_receiver(); } - - if (sender_) { - sender_->start(); + if (receiver_) { + receiver_->start(); } - if (receiver_) - receiver_->start(); + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (!sender_) { + // application has been stopped and started again + sender_ = create_local(VSOMEIP_ROUTING_CLIENT); + } + if (sender_) { + sender_->start(); + } + } } void routing_manager_proxy::stop() { @@ -114,9 +114,15 @@ void routing_manager_proxy::stop() { if (receiver_) { receiver_->stop(); } + receiver_ = nullptr; - if (sender_) { - sender_->stop(); + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->stop(); + } + // delete the sender + sender_ = nullptr; } for (auto client: get_connected_clients()) { @@ -125,15 +131,9 @@ void routing_manager_proxy::stop() { } } - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - // delete the sender - sender_ = nullptr; - } - receiver_ = nullptr; std::stringstream its_client; its_client << VSOMEIP_BASE_PATH << std::hex << client_; -#ifdef WIN32 +#ifdef _WIN32 ::_unlink(its_client.str().c_str()); #else if (-1 == ::unlink(its_client.str().c_str())) { @@ -187,7 +187,7 @@ void routing_manager_proxy::send_offer_service(client_t _client, sizeof(_minor)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -226,7 +226,7 @@ void routing_manager_proxy::stop_offer_service(client_t _client, sizeof(_minor)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -287,31 +287,39 @@ void routing_manager_proxy::register_event(client_t _client, std::chrono::milliseconds _cycle, bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func, bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) { - (void)_is_shadow; (void)_is_cache_placeholder; - routing_manager_base::register_event(_client, _service, _instance, - _event,_eventgroups, _is_field, - _cycle, _change_resets_cycle, - _epsilon_change_func, - _is_provided); - + const event_data_t registration = { + _service, + _instance, + _event, + _is_field, + _is_provided, + _eventgroups + }; + bool is_first(false); { std::lock_guard<std::mutex> its_lock(state_mutex_); - if (state_ == inner_state_type_e::ST_REGISTERED) { + is_first = pending_event_registrations_.find(registration) + == pending_event_registrations_.end(); + if (is_first) { + pending_event_registrations_.insert(registration); + } + } + if (is_first) { + routing_manager_base::register_event(_client, _service, _instance, + _event,_eventgroups, _is_field, + _cycle, _change_resets_cycle, + _epsilon_change_func, + _is_provided); + } + { + std::lock_guard<std::mutex> its_lock(state_mutex_); + if (state_ == inner_state_type_e::ST_REGISTERED && is_first) { send_register_event(client_, _service, _instance, _event, _eventgroups, _is_field, _is_provided); } - event_data_t registration = { - _service, - _instance, - _event, - _is_field, - _is_provided, - _eventgroups - }; - pending_event_registrations_.insert(registration); } } @@ -344,7 +352,7 @@ void routing_manager_proxy::unregister_event(client_t _client, = static_cast<byte_t>(_is_provided); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -418,11 +426,9 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service, auto its_target = find_or_create_local(target_client); its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); } } } @@ -445,16 +451,16 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber, sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber, + sizeof(_subscriber)); auto its_target = find_local(_subscriber); if (its_target) { its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); } } } @@ -477,16 +483,16 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber, sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber, + sizeof(_subscriber)); auto its_target = find_local(_subscriber); if (its_target) { its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); } } } @@ -497,7 +503,6 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service, { std::lock_guard<std::mutex> its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED) { - bool is_remote(false); byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; @@ -513,19 +518,16 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service, sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &is_remote, - sizeof(is_remote)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = 0; // is_local auto its_target = find_local(_service, _instance); if (its_target) { its_target->send(its_command, sizeof(its_command)); } else { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_command, sizeof(its_command)); - } - }; + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_command, sizeof(its_command)); + } } } remove_pending_subscription(_service, _instance); @@ -591,11 +593,18 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data, _instance, _flush, _reliable, VSOMEIP_SEND); } } - // If no direct endpoint could be found/is connected + // If no direct endpoint could be found // or for notifications ~> route to routing_manager_stub - if (!its_target || !its_target->is_connected()) { +#ifdef USE_DLT + bool message_to_stub(false); +#endif + if (!its_target) { + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { its_target = sender_; +#ifdef USE_DLT + message_to_stub = true; +#endif } else { return false; } @@ -615,7 +624,7 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data, } } #ifdef USE_DLT - else if (its_target != sender_) { + else if (!message_to_stub) { const uint16_t its_data_size = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size); @@ -656,8 +665,11 @@ bool routing_manager_proxy::send_to( } void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) { - if (_endpoint != sender_) { - return; + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (_endpoint != sender_) { + return; + } } is_connected_ = true; if (is_connected_ && is_started_) { @@ -668,7 +680,10 @@ void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) { } void routing_manager_proxy::on_disconnect(std::shared_ptr<endpoint> _endpoint) { - is_connected_ = !(_endpoint == sender_); + { + std::lock_guard<std::mutex> its_lock(sender_mutex_); + is_connected_ = !(_endpoint == sender_); + } if (!is_connected_) { host_->on_state(state_type_e::ST_DEREGISTERED); } @@ -717,8 +732,9 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, instance_t its_instance; eventgroup_t its_eventgroup; major_version_t its_major; - bool is_remote_subscriber; + uint8_t is_remote_subscriber; client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host()); + client_t its_subscriber; if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) { its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; @@ -818,6 +834,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE: + if (_size != VSOMEIP_SUBSCRIBE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a SUBSCRIBE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], @@ -878,6 +898,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_UNSUBSCRIBE: + if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], @@ -905,14 +929,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE_NACK: + if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_NACK command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); + std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_subscriber)); - on_subscribe_nack(its_client, its_service, its_instance, its_eventgroup); + on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup); VSOMEIP_INFO << "SUBSCRIBE NACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -921,14 +951,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE_ACK: + if (_size != VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_ACK command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); + std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_subscriber)); - on_subscribe_ack(its_client, its_service, its_instance, its_eventgroup); + on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup); VSOMEIP_INFO << "SUBSCRIBE ACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -1160,20 +1196,16 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, } } - if (clients_to_delete.size()) { - std::async(std::launch::async, [this, clients_to_delete, restart_sender] () { - for (const auto client : clients_to_delete) { - if (client != VSOMEIP_ROUTING_CLIENT) { - remove_local(client); - } - } - }); + for (const auto client : clients_to_delete) { + if (client != VSOMEIP_ROUTING_CLIENT) { + remove_local(client); + } } if (restart_sender && is_started_) { VSOMEIP_INFO << std::hex << "Application/Client " << get_client() <<": Reconnecting to routing manager."; - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->start(); } @@ -1191,7 +1223,7 @@ void routing_manager_proxy::register_application() { if (is_connected_) { std::lock_guard<std::mutex> its_state_lock(state_mutex_); - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { { state_ = inner_state_type_e::ST_REGISTERING; @@ -1219,7 +1251,7 @@ void routing_manager_proxy::deregister_application() { sizeof(its_size)); if (is_connected_) { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(&its_command[0], uint32_t(its_command.size())); } @@ -1234,11 +1266,9 @@ void routing_manager_proxy::send_pong() const { sizeof(client_t)); if (is_connected_) { - { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->send(its_pong, sizeof(its_pong)); - } + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->send(its_pong, sizeof(its_pong)); } } } @@ -1268,7 +1298,7 @@ void routing_manager_proxy::send_request_service(client_t _client, service_t _se sizeof(_use_exclusive_proxy)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -1294,7 +1324,7 @@ void routing_manager_proxy::send_release_service(client_t _client, service_t _se sizeof(_instance)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, sizeof(its_command)); } @@ -1336,7 +1366,7 @@ void routing_manager_proxy::send_register_event(client_t _client, } { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, uint32_t(VSOMEIP_REGISTER_EVENT_COMMAND_SIZE @@ -1361,8 +1391,7 @@ void routing_manager_proxy::on_subscribe_nack(client_t _client, void routing_manager_proxy::on_identify_response(client_t _client, service_t _service, instance_t _instance, bool _reliable) { - static const uint32_t size = uint32_t(VSOMEIP_COMMAND_HEADER_SIZE + sizeof(service_t) + sizeof(instance_t) - + sizeof(bool)); + static const uint32_t size = VSOMEIP_ID_RESPONSE_COMMAND_SIZE; byte_t its_command[size]; uint32_t its_size = size - VSOMEIP_COMMAND_HEADER_SIZE; its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_ID_RESPONSE; @@ -1377,7 +1406,7 @@ void routing_manager_proxy::on_identify_response(client_t _client, service_t _se std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_reliable, sizeof(_reliable)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, size); } @@ -1454,7 +1483,7 @@ void routing_manager_proxy::send_pending_commands() { void routing_manager_proxy::init_receiver() { std::stringstream its_client; its_client << VSOMEIP_BASE_PATH << std::hex << client_; -#ifdef WIN32 +#ifdef _WIN32 ::_unlink(its_client.str().c_str()); int port = VSOMEIP_INTERNAL_BASE_PORT + client_; #else @@ -1472,14 +1501,14 @@ void routing_manager_proxy::init_receiver() { #endif try { receiver_ = std::make_shared<local_server_endpoint_impl>(shared_from_this(), -#ifdef WIN32 +#ifdef _WIN32 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), #else boost::asio::local::stream_protocol::endpoint(its_client.str()), #endif io_, configuration_->get_max_message_size_local(), configuration_->get_buffer_shrink_threshold()); -#ifdef WIN32 +#ifdef _WIN32 VSOMEIP_INFO << "Listening at " << port; #else VSOMEIP_INFO << "Listening at " << its_client.str(); @@ -1488,7 +1517,7 @@ void routing_manager_proxy::init_receiver() { host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); VSOMEIP_ERROR << "Client ID: " << std::hex << client_ << ": " << e.what(); } -#ifndef WIN32 +#ifndef _WIN32 ::umask(previous_mask); #endif } @@ -1514,7 +1543,7 @@ void routing_manager_proxy::notify_remote_initally(service_t _service, instance_ std::lock_guard<std::mutex> its_lock(serialize_mutex_); if (serializer_->serialize(its_notification.get())) { { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { send_local(sender_, VSOMEIP_ROUTING_CLIENT, serializer_->get_data(), serializer_->get_size(), _instance, true, false, VSOMEIP_NOTIFY); @@ -1573,7 +1602,7 @@ void routing_manager_proxy::register_application_timeout_cbk( } } if (register_again) { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); VSOMEIP_WARNING << std::hex << "Client 0x" << get_client() << " register timeout!" << " : Restart route to stub!"; if (sender_) { @@ -1592,7 +1621,7 @@ void routing_manager_proxy::send_registered_ack() { std::memset(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], 0, sizeof(uint32_t)); { - std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_); + std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { sender_->send(its_command, VSOMEIP_COMMAND_HEADER_SIZE); } diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index ee75ec7..c048fe6 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -8,7 +8,7 @@ #include <iomanip> #include <forward_list> -#ifndef WIN32 +#ifndef _WIN32 // for umask #include <sys/types.h> #include <sys/stat.h> @@ -101,12 +101,15 @@ void routing_manager_stub::stop() { client_registration_thread_->join(); } - watchdog_timer_.cancel(); + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + watchdog_timer_.cancel(); + } if( !is_socket_activated_) { endpoint_->stop(); endpoint_ = nullptr; -#ifdef WIN32 +#ifdef _WIN32 ::_unlink(endpoint_path_.c_str()); #else if (-1 == ::unlink(endpoint_path_.c_str())) { @@ -119,7 +122,7 @@ void routing_manager_stub::stop() { if(local_receiver_) { local_receiver_->stop(); local_receiver_ = nullptr; -#ifdef WIN32 +#ifdef _WIN32 ::_unlink(local_receiver_path_.c_str()); #else if (-1 == ::unlink(local_receiver_path_.c_str())) { @@ -206,6 +209,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, bool is_remote_subscriber(false); client_t its_client_from_header; client_t its_target_client; + client_t its_subscriber; its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; std::memcpy(&its_client, &_data[VSOMEIP_COMMAND_CLIENT_POS], @@ -255,6 +259,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_OFFER_SERVICE: + if (_size != VSOMEIP_OFFER_SERVICE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a OFFER_SERVICE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, @@ -269,6 +277,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_STOP_OFFER_SERVICE: + if (_size != VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a STOP_OFFER_SERVICE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, @@ -284,6 +296,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE: + if (_size != VSOMEIP_SUBSCRIBE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a SUBSCRIBE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], @@ -309,6 +325,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_UNSUBSCRIBE: + if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a UNSUBSCRIBE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], @@ -320,13 +340,19 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE_ACK: + if (_size != VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a SUBSCRIBE_ACK command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); - host_->on_subscribe_ack(its_client, its_service, its_instance, its_eventgroup); + std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_subscriber)); + host_->on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup); VSOMEIP_INFO << "SUBSCRIBE ACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -335,13 +361,19 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_SUBSCRIBE_NACK: + if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a SUBSCRIBE_NACK command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); - host_->on_subscribe_nack(its_client, its_service, its_instance, its_eventgroup); + std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_subscriber)); + host_->on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup); VSOMEIP_INFO << "SUBSCRIBE NACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." @@ -420,6 +452,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; } case VSOMEIP_REQUEST_SERVICE: + if (_size != VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a REQUEST_SERVICE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, @@ -443,6 +479,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_RELEASE_SERVICE: + if (_size != VSOMEIP_RELEASE_SERVICE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a RELEASE_SERVICE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, @@ -452,6 +492,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_REGISTER_EVENT: + if (_size < VSOMEIP_REGISTER_EVENT_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a REGISTER_EVENT command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); @@ -490,6 +534,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_UNREGISTER_EVENT: + if (_size != VSOMEIP_UNREGISTER_EVENT_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a UNREGISTER_EVENT command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, @@ -516,6 +564,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, break; case VSOMEIP_ID_RESPONSE: + if (_size != VSOMEIP_ID_RESPONSE_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a ID_RESPONSE command with wrong size ~> skip!"; + break; + } std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], sizeof(its_service)); std::memcpy(&its_instance, @@ -602,15 +654,13 @@ void routing_manager_stub::client_registration_func(void) { } else { on_deregister_application(r.first); } - { - // Inform (de)registered client. All others will be informed after - // the client acknowledged its registered state! - // Don't inform client if we deregister because of an client - // endpoint error to avoid writing in an already closed socket + // Inform (de)registered client. All others will be informed after + // the client acknowledged its registered state! + // Don't inform client if we deregister because of an client + // endpoint error to avoid writing in an already closed socket + if (b != registration_type_e::DEREGISTER_ERROR_CASE) { std::lock_guard<std::mutex> its_guard(routing_info_mutex_); - if (b != registration_type_e::DEREGISTER_ERROR_CASE) { - send_routing_info(r.first); - } + send_routing_info(r.first); } if (b != registration_type_e::REGISTER) { { @@ -649,7 +699,7 @@ void routing_manager_stub::init_routing_endpoint() { } else if (num_fd == 1) { native_socket_fd = SD_LISTEN_FDS_START + 0; VSOMEIP_INFO << "Using native socket created by systemd socket activation! fd: " << native_socket_fd; - #ifndef WIN32 + #ifndef _WIN32 try { endpoint_ = std::make_shared < local_server_endpoint_impl @@ -666,7 +716,7 @@ void routing_manager_stub::init_routing_endpoint() { #endif is_socket_activated_ = true; } else { - #if WIN32 + #if _WIN32 ::_unlink(endpoint_path_.c_str()); int port = VSOMEIP_INTERNAL_BASE_PORT; VSOMEIP_INFO << "Routing endpoint at " << port; @@ -684,7 +734,7 @@ void routing_manager_stub::init_routing_endpoint() { endpoint_ = std::make_shared < local_server_endpoint_impl > (shared_from_this(), - #ifdef WIN32 + #ifdef _WIN32 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), #else boost::asio::local::stream_protocol::endpoint(endpoint_path_), @@ -697,7 +747,7 @@ void routing_manager_stub::init_routing_endpoint() { VSOMEIP_ERROR << "routing_manager_stub::init_routing_endpoint Client ID: " << std::hex << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); } - #ifndef WIN32 + #ifndef _WIN32 ::umask(previous_mask); #endif is_socket_activated_ = false; @@ -952,7 +1002,7 @@ void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _servi client_t this_client = get_client(); its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SUBSCRIBE_ACK; std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &this_client, - sizeof(_client)); + sizeof(this_client)); std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, @@ -961,6 +1011,8 @@ void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _servi sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client, + sizeof(_client)); its_endpoint->send(&its_command[0], sizeof(its_command), true); } @@ -978,7 +1030,7 @@ void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _serv client_t this_client = get_client(); its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SUBSCRIBE_NACK; std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &this_client, - sizeof(_client)); + sizeof(this_client)); std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, @@ -987,6 +1039,8 @@ void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _serv sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client, + sizeof(_client)); its_endpoint->send(&its_command[0], sizeof(its_command), true); } @@ -1024,7 +1078,8 @@ void routing_manager_stub::on_pong(client_t _client) { if (found_info != routing_info_.end()) { found_info->second.first = 0; } else { - VSOMEIP_ERROR << "Received PONG from unregistered application!"; + VSOMEIP_ERROR << "Received PONG from unregistered application: " + << std::hex << std::setw(4) << std::setfill('0') << _client; } } remove_from_pinged_clients(_client); @@ -1032,17 +1087,21 @@ void routing_manager_stub::on_pong(client_t _client) { } void routing_manager_stub::start_watchdog() { - // Divide / 2 as start and check sleep each - watchdog_timer_.expires_from_now( - std::chrono::milliseconds(configuration_->get_watchdog_timeout() / 2)); std::function<void(boost::system::error_code const &)> its_callback = [this](boost::system::error_code const &_error) { if (!_error) - check_watchdog(); + check_watchdog(); }; + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + // Divide / 2 as start and check sleep each + watchdog_timer_.expires_from_now( + std::chrono::milliseconds( + configuration_->get_watchdog_timeout() / 2)); - watchdog_timer_.async_wait(its_callback); + watchdog_timer_.async_wait(its_callback); + } } void routing_manager_stub::check_watchdog() { @@ -1054,8 +1113,6 @@ void routing_manager_stub::check_watchdog() { } broadcast_ping(); - watchdog_timer_.expires_from_now( - std::chrono::milliseconds(configuration_->get_watchdog_timeout() / 2)); std::function<void(boost::system::error_code const &)> its_callback = [this](boost::system::error_code const &_error) { @@ -1077,8 +1134,13 @@ void routing_manager_stub::check_watchdog() { } start_watchdog(); }; - - watchdog_timer_.async_wait(its_callback); + { + std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_); + watchdog_timer_.expires_from_now( + std::chrono::milliseconds( + configuration_->get_watchdog_timeout() / 2)); + watchdog_timer_.async_wait(its_callback); + } } void routing_manager_stub::create_local_receiver() { @@ -1088,7 +1150,7 @@ void routing_manager_stub::create_local_receiver() { std::stringstream its_local_receiver_path; its_local_receiver_path << VSOMEIP_BASE_PATH << std::hex << host_->get_client(); local_receiver_path_ = its_local_receiver_path.str(); -#if WIN32 +#if _WIN32 ::_unlink(local_receiver_path_.c_str()); int port = VSOMEIP_INTERNAL_BASE_PORT; #else @@ -1109,7 +1171,7 @@ void routing_manager_stub::create_local_receiver() { local_receiver_ = std::make_shared < local_server_endpoint_impl > (shared_from_this(), - #ifdef WIN32 + #ifdef _WIN32 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port + host_->get_client()), #else boost::asio::local::stream_protocol::endpoint(local_receiver_path_), @@ -1122,7 +1184,7 @@ void routing_manager_stub::create_local_receiver() { VSOMEIP_ERROR << "routing_manager_stub::_local_receiver Client ID: " << std::hex << VSOMEIP_ROUTING_CLIENT << ": " << e.what(); } -#ifndef WIN32 +#ifndef _WIN32 ::umask(previous_mask); #endif local_receiver_->start(); @@ -1293,7 +1355,7 @@ client_t routing_manager_stub::get_client() const { return host_->get_client(); } -#ifndef WIN32 +#ifndef _WIN32 bool routing_manager_stub::check_credentials(client_t _client, uid_t _uid, gid_t _gid) { return configuration_->check_credentials(_client, _uid, _gid); } diff --git a/implementation/routing/src/serviceinfo.cpp b/implementation/routing/src/serviceinfo.cpp index d497fe1..3afc937 100644 --- a/implementation/routing/src/serviceinfo.cpp +++ b/implementation/routing/src/serviceinfo.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -64,11 +64,13 @@ void serviceinfo::set_precise_ttl(std::chrono::milliseconds _precise_ttl) { } std::shared_ptr<endpoint> serviceinfo::get_endpoint(bool _reliable) const { + std::lock_guard<std::mutex> its_lock(endpoint_mutex_); return (_reliable ? reliable_ : unreliable_); } void serviceinfo::set_endpoint(std::shared_ptr<endpoint> _endpoint, bool _reliable) { + std::lock_guard<std::mutex> its_lock(endpoint_mutex_); if (_reliable) { reliable_ = _endpoint; } else { |