summaryrefslogtreecommitdiff
path: root/implementation/routing
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2017-02-28 03:57:20 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2017-02-28 03:57:20 -0800
commit07d7573c007322be07689575ce5d73c45f030d6d (patch)
tree6a6acbf7536de451391d9d50b7f5cdde49d67fd8 /implementation/routing
parent1a230558936ec84b4fb44b2346dc5ae52d6f2805 (diff)
downloadvSomeIP-07d7573c007322be07689575ce5d73c45f030d6d.tar.gz
vSomeIP 2.5.32.5.3
Diffstat (limited to 'implementation/routing')
-rw-r--r--implementation/routing/include/event.hpp21
-rw-r--r--implementation/routing/include/eventgroupinfo.hpp14
-rw-r--r--implementation/routing/include/routing_manager.hpp2
-rw-r--r--implementation/routing/include/routing_manager_adapter.hpp2
-rw-r--r--implementation/routing/include/routing_manager_base.hpp11
-rw-r--r--implementation/routing/include/routing_manager_host.hpp2
-rw-r--r--implementation/routing/include/routing_manager_impl.hpp18
-rw-r--r--implementation/routing/include/routing_manager_proxy.hpp41
-rw-r--r--implementation/routing/include/routing_manager_stub.hpp5
-rw-r--r--implementation/routing/include/routing_manager_stub_host.hpp2
-rw-r--r--implementation/routing/include/serviceinfo.hpp3
-rw-r--r--implementation/routing/include/types.hpp2
-rw-r--r--implementation/routing/src/event.cpp24
-rw-r--r--implementation/routing/src/eventgroupinfo.cpp39
-rw-r--r--implementation/routing/src/routing_manager_base.cpp112
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp187
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp241
-rw-r--r--implementation/routing/src/routing_manager_stub.cpp132
-rw-r--r--implementation/routing/src/serviceinfo.cpp4
19 files changed, 578 insertions, 284 deletions
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp
index 6e1de4a..e127807 100644
--- a/implementation/routing/include/event.hpp
+++ b/implementation/routing/include/event.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -10,6 +10,7 @@
#include <memory>
#include <mutex>
#include <set>
+#include <atomic>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/address.hpp>
@@ -93,6 +94,8 @@ public:
bool is_cache_placeholder() const;
void set_cache_placeholder(bool _is_cache_place_holder);
+ bool has_ref(client_t _client, bool _is_provided);
+
private:
void update_cbk(boost::system::error_code const &_error);
void notify(bool _flush);
@@ -111,25 +114,27 @@ private:
mutable std::mutex mutex_;
std::shared_ptr<message> message_;
- bool is_field_;
+ std::atomic<bool> is_field_;
boost::asio::steady_timer cycle_timer_;
std::chrono::milliseconds cycle_;
- bool change_resets_cycle_;
- bool is_updating_on_change_;
+ std::atomic<bool> change_resets_cycle_;
+
+ std::atomic<bool> is_updating_on_change_;
+ std::mutex eventgroups_mutex_;
std::set<eventgroup_t> eventgroups_;
- bool is_set_;
- bool is_provided_;
+ std::atomic<bool> is_set_;
+ std::atomic<bool> is_provided_;
std::mutex refs_mutex_;
std::map<client_t, std::map<bool, uint32_t>> refs_;
- bool is_shadow_;
+ std::atomic<bool> is_shadow_;
- bool is_cache_placeholder_;
+ std::atomic<bool> is_cache_placeholder_;
epsilon_change_func_t epsilon_change_func_;
};
diff --git a/implementation/routing/include/eventgroupinfo.hpp b/implementation/routing/include/eventgroupinfo.hpp
index fc3d321..ae3ab87 100644
--- a/implementation/routing/include/eventgroupinfo.hpp
+++ b/implementation/routing/include/eventgroupinfo.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -10,6 +10,8 @@
#include <list>
#include <memory>
#include <set>
+#include <mutex>
+#include <atomic>
#include <boost/asio/ip/address.hpp>
@@ -73,17 +75,21 @@ public:
VSOMEIP_EXPORT void set_threshold(uint8_t _threshold);
private:
- major_version_t major_;
- ttl_t ttl_;
+ std::atomic<major_version_t> major_;
+ std::atomic<ttl_t> ttl_;
+ mutable std::mutex address_mutex_;
boost::asio::ip::address address_;
uint16_t port_;
+ mutable std::mutex events_mutex_;
std::set<std::shared_ptr<event> > events_;
+ mutable std::mutex targets_mutex_;
std::list<target_t> targets_;
+ mutable std::mutex multicast_targets_mutex_;
std::list<target_t> multicast_targets_;
- uint8_t threshold_;
+ std::atomic<uint8_t> threshold_;
};
} // namespace vsomeip
diff --git a/implementation/routing/include/routing_manager.hpp b/implementation/routing/include/routing_manager.hpp
index 1fc63d4..4911701 100644
--- a/implementation/routing/include/routing_manager.hpp
+++ b/implementation/routing/include/routing_manager.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
diff --git a/implementation/routing/include/routing_manager_adapter.hpp b/implementation/routing/include/routing_manager_adapter.hpp
index 0dfe703..3d81120 100644
--- a/implementation/routing/include/routing_manager_adapter.hpp
+++ b/implementation/routing/include/routing_manager_adapter.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp
index 75d42c4..d3c0e04 100644
--- a/implementation/routing/include/routing_manager_base.hpp
+++ b/implementation/routing/include/routing_manager_base.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -113,7 +113,7 @@ public:
endpoint *_receiver,
const boost::asio::ip::address &_remote_address,
std::uint16_t _remote_port) = 0;
-#ifndef WIN32
+#ifndef _WIN32
virtual bool check_credentials(client_t _client, uid_t _uid, gid_t _gid);
#endif
@@ -177,10 +177,17 @@ protected:
void send_pending_notify_ones(service_t _service, instance_t _instance,
eventgroup_t _eventgroup, client_t _client);
+ void unset_all_eventpayloads(service_t _service, instance_t _instance);
+ void unset_all_eventpayloads(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup);
+
private:
std::shared_ptr<endpoint> create_local_unlocked(client_t _client);
std::shared_ptr<endpoint> find_local_unlocked(client_t _client);
+ std::set<std::tuple<service_t, instance_t, eventgroup_t>>
+ get_subscriptions(const client_t _client);
+
protected:
routing_manager_host *host_;
boost::asio::io_service &io_;
diff --git a/implementation/routing/include/routing_manager_host.hpp b/implementation/routing/include/routing_manager_host.hpp
index 27a1b1e..bbea181 100644
--- a/implementation/routing/include/routing_manager_host.hpp
+++ b/implementation/routing/include/routing_manager_host.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp
index 6e6bd45..d29a3d7 100644
--- a/implementation/routing/include/routing_manager_impl.hpp
+++ b/implementation/routing/include/routing_manager_impl.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -94,6 +94,13 @@ public:
bool send_to(const std::shared_ptr<endpoint_definition> &_target,
const byte_t *_data, uint32_t _size, uint16_t _sd_port);
+ void register_event(client_t _client, service_t _service,
+ instance_t _instance, event_t _event,
+ const std::set<eventgroup_t> &_eventgroups, bool _is_field,
+ std::chrono::milliseconds _cycle, bool _change_resets_cycle,
+ epsilon_change_func_t _epsilon_change_func,
+ bool _is_provided, bool _is_shadow, bool _is_cache_placeholder);
+
void register_shadow_event(client_t _client, service_t _service,
instance_t _instance, event_t _event,
const std::set<eventgroup_t> &_eventgroups,
@@ -304,6 +311,12 @@ private:
void start_ip_routing();
+ void requested_service_add(client_t _client, service_t _service,
+ instance_t _instance, major_version_t _major,
+ minor_version_t _minor);
+ void requested_service_remove(client_t _client, service_t _service,
+ instance_t _instance);
+
std::shared_ptr<routing_manager_stub> stub_;
std::shared_ptr<sd::service_discovery> discovery_;
@@ -354,11 +367,12 @@ private:
bool if_state_running_;
std::mutex pending_sd_offers_mutex_;
std::vector<std::pair<service_t, instance_t>> pending_sd_offers_;
-#ifndef WIN32
+#ifndef _WIN32
std::shared_ptr<netlink_connector> netlink_connector_;
#endif
#ifndef WITHOUT_SYSTEMD
+ std::mutex watchdog_timer_mutex_;
boost::asio::steady_timer watchdog_timer_;
void watchdog_cbk(boost::system::error_code const &_error);
#endif
diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp
index 37ec8b5..27c071d 100644
--- a/implementation/routing/include/routing_manager_proxy.hpp
+++ b/implementation/routing/include/routing_manager_proxy.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -197,12 +197,37 @@ private:
std::set<eventgroup_t> eventgroups_;
bool operator<(const event_data_t &_other) const {
- return (service_ < _other.service_
- || (service_ == _other.service_
- && instance_ < _other.instance_)
- || (service_ == _other.service_
- && instance_ == _other.instance_
- && event_ < _other.event_));
+ if (service_ < _other.service_) {
+ return true;
+ }
+ if (service_ == _other.service_ && instance_ < _other.instance_) {
+ return true;
+ }
+ if (service_ == _other.service_ && instance_ == _other.instance_
+ && event_ < _other.event_) {
+ return true;
+ }
+ if (service_ == _other.service_ && instance_ == _other.instance_
+ && event_ == _other.event_
+ && is_provided_ != _other.is_provided_) {
+ return true;
+ }
+ if (service_ == _other.service_
+ && instance_ == _other.instance_
+ && event_ == _other.event_
+ && is_provided_ == _other.is_provided_
+ && is_field_ != _other.is_field_) {
+ return true;
+ }
+ if (service_ == _other.service_
+ && instance_ == _other.instance_
+ && event_ == _other.event_
+ && is_provided_ == _other.is_provided_
+ && is_field_ == _other.is_field_
+ && eventgroups_ < _other.eventgroups_) {
+ return true;
+ }
+ return false;
}
};
std::set<event_data_t> pending_event_registrations_;
@@ -218,7 +243,7 @@ private:
std::map<service_t,
std::map<instance_t, std::map<eventgroup_t, uint32_t > > > remote_subscriber_count_;
- mutable std::recursive_mutex sender_mutex_;
+ mutable std::mutex sender_mutex_;
boost::asio::steady_timer register_application_timer_;
diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp
index 9cc0255..3cf89a7 100644
--- a/implementation/routing/include/routing_manager_stub.hpp
+++ b/implementation/routing/include/routing_manager_stub.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -82,7 +82,7 @@ public:
bool is_registered(client_t _client) const;
void deregister_erroneous_client(client_t _client);
client_t get_client() const;
-#ifndef WIN32
+#ifndef _WIN32
virtual bool check_credentials(client_t _client, uid_t _uid, gid_t _gid);
#endif
private:
@@ -112,6 +112,7 @@ private:
private:
routing_manager_stub_host *host_;
boost::asio::io_service &io_;
+ std::mutex watchdog_timer_mutex_;
boost::asio::steady_timer watchdog_timer_;
std::string endpoint_path_;
diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp
index 65bb28b..cb8e83c 100644
--- a/implementation/routing/include/routing_manager_stub_host.hpp
+++ b/implementation/routing/include/routing_manager_stub_host.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
diff --git a/implementation/routing/include/serviceinfo.hpp b/implementation/routing/include/serviceinfo.hpp
index c890435..7eab1ec 100644
--- a/implementation/routing/include/serviceinfo.hpp
+++ b/implementation/routing/include/serviceinfo.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -63,6 +63,7 @@ private:
std::shared_ptr<endpoint> reliable_;
std::shared_ptr<endpoint> unreliable_;
+ mutable std::mutex endpoint_mutex_;
std::mutex requesters_mutex_;
std::set<client_t> requesters_;
diff --git a/implementation/routing/include/types.hpp b/implementation/routing/include/types.hpp
index c483aee..9b803a5 100644
--- a/implementation/routing/include/types.hpp
+++ b/implementation/routing/include/types.hpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp
index b6947d0..5ede55e 100644
--- a/implementation/routing/src/event.cpp
+++ b/implementation/routing/src/event.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -173,6 +173,7 @@ void event::unset_payload(bool _force) {
void event::set_update_cycle(std::chrono::milliseconds &_cycle) {
if (is_provided_) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
stop_cycle();
cycle_ = _cycle;
start_cycle();
@@ -190,8 +191,10 @@ void event::set_update_on_change(bool _is_active) {
}
void event::set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func) {
- if (_epsilon_change_func)
+ if (_epsilon_change_func) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
epsilon_change_func_ = _epsilon_change_func;
+ }
}
const std::set<eventgroup_t> & event::get_eventgroups() const {
@@ -199,15 +202,18 @@ const std::set<eventgroup_t> & event::get_eventgroups() const {
}
void event::add_eventgroup(eventgroup_t _eventgroup) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
eventgroups_.insert(_eventgroup);
}
void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
eventgroups_ = _eventgroups;
}
void event::update_cbk(boost::system::error_code const &_error) {
if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
cycle_timer_.expires_from_now(cycle_);
notify(true);
std::function<void(boost::system::error_code const &)> its_handler =
@@ -302,6 +308,20 @@ bool event::has_ref() {
return refs_.size() != 0;
}
+bool event::has_ref(client_t _client, bool _is_provided) {
+ std::lock_guard<std::mutex> its_lock(refs_mutex_);
+ auto its_client = refs_.find(_client);
+ if (its_client != refs_.end()) {
+ auto its_provided = its_client->second.find(_is_provided);
+ if (its_provided != its_client->second.end()) {
+ if(its_provided->second > 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
bool event::is_shadow() const {
return is_shadow_;
}
diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp
index 3b3e657..0a32861 100644
--- a/implementation/routing/src/eventgroupinfo.cpp
+++ b/implementation/routing/src/eventgroupinfo.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -41,6 +41,7 @@ void eventgroupinfo::set_ttl(ttl_t _ttl) {
}
bool eventgroupinfo::is_multicast() const {
+ std::lock_guard<std::mutex> its_lock(address_mutex_);
return address_.is_multicast();
}
@@ -52,7 +53,7 @@ bool eventgroupinfo::is_sending_multicast() const {
bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address,
uint16_t &_port) const {
-
+ std::lock_guard<std::mutex> its_lock(address_mutex_);
if (address_.is_multicast()) {
_address = address_;
_port = port_;
@@ -63,28 +64,34 @@ bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address,
void eventgroupinfo::set_multicast(const boost::asio::ip::address &_address,
uint16_t _port) {
+ std::lock_guard<std::mutex> its_lock(address_mutex_);
address_ = _address;
port_ = _port;
}
const std::set<std::shared_ptr<event> > eventgroupinfo::get_events() const {
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
return events_;
}
void eventgroupinfo::add_event(std::shared_ptr<event> _event) {
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
events_.insert(_event);
}
void eventgroupinfo::remove_event(std::shared_ptr<event> _event) {
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
events_.erase(_event);
}
const std::list<eventgroupinfo::target_t> eventgroupinfo::get_targets() const {
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
return targets_;
}
uint32_t eventgroupinfo::get_unreliable_target_count() const {
uint32_t _count(0);
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
for (auto i = targets_.begin(); i != targets_.end(); i++) {
if (!i->endpoint_->is_reliable()) {
_count++;
@@ -94,6 +101,7 @@ uint32_t eventgroupinfo::get_unreliable_target_count() const {
}
void eventgroupinfo::add_multicast_target(const eventgroupinfo::target_t &_multicast_target) {
+ std::lock_guard<std::mutex> its_lock(multicast_targets_mutex_);
if (std::find(multicast_targets_.begin(), multicast_targets_.end(), _multicast_target)
== multicast_targets_.end()) {
multicast_targets_.push_back(_multicast_target);
@@ -101,14 +109,17 @@ void eventgroupinfo::add_multicast_target(const eventgroupinfo::target_t &_multi
}
void eventgroupinfo::clear_multicast_targets() {
+ std::lock_guard<std::mutex> its_lock(multicast_targets_mutex_);
multicast_targets_.clear();
}
const std::list<eventgroupinfo::target_t> eventgroupinfo::get_multicast_targets() const {
+ std::lock_guard<std::mutex> its_lock(multicast_targets_mutex_);
return multicast_targets_;
}
bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target) {
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
std::size_t its_size = targets_.size();
if (std::find(targets_.begin(), targets_.end(), _target) == targets_.end()) {
targets_.push_back(_target);
@@ -116,18 +127,30 @@ bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target) {
return (its_size != targets_.size());
}
-bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target, const eventgroupinfo::target_t &_subscriber) {
- std::size_t its_size = targets_.size();
- if (std::find(targets_.begin(), targets_.end(), _subscriber) == targets_.end()) {
- targets_.push_back(_subscriber);
+bool eventgroupinfo::add_target(const eventgroupinfo::target_t &_target,
+ const eventgroupinfo::target_t &_subscriber) {
+ bool add(false);
+ bool ret(false);
+ {
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
+ std::size_t its_size = targets_.size();
+ if (std::find(targets_.begin(), targets_.end(), _subscriber)
+ == targets_.end()) {
+ targets_.push_back(_subscriber);
+ add = true;
+ }
+ ret = (its_size != targets_.size());
+ }
+ if (add) {
add_multicast_target(_target);
}
- return (its_size != targets_.size());
+ return ret;
}
bool eventgroupinfo::update_target(
const std::shared_ptr<endpoint_definition> &_target,
const std::chrono::steady_clock::time_point &_expiration) {
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
for (auto i = targets_.begin(); i != targets_.end(); i++) {
if (i->endpoint_->get_address() == _target->get_address() &&
i->endpoint_->get_port() == _target->get_port()) {
@@ -140,6 +163,7 @@ bool eventgroupinfo::update_target(
bool eventgroupinfo::remove_target(
const std::shared_ptr<endpoint_definition> &_target) {
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
std::size_t its_size = targets_.size();
for (auto i = targets_.begin(); i != targets_.end(); i++) {
@@ -153,6 +177,7 @@ bool eventgroupinfo::remove_target(
}
void eventgroupinfo::clear_targets() {
+ std::lock_guard<std::mutex> its_lock(targets_mutex_);
targets_.clear();
}
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index 751a588..ac5da9c 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -116,7 +116,8 @@ void routing_manager_base::request_service(client_t _client, service_t _service,
auto its_info = find_service(_service, _instance);
if (its_info) {
if ((_major == its_info->get_major()
- || DEFAULT_MAJOR == its_info->get_major())
+ || DEFAULT_MAJOR == its_info->get_major()
+ || ANY_MAJOR == _major)
&& (_minor <= its_info->get_minor()
|| DEFAULT_MINOR == its_info->get_minor()
|| _minor == ANY_MINOR)) {
@@ -332,6 +333,15 @@ void routing_manager_base::unsubscribe(client_t _client, service_t _service,
auto its_group = its_instance->second.find(_eventgroup);
if (its_group != its_instance->second.end()) {
its_group->second.erase(_client);
+ if (!its_group->second.size()) {
+ its_instance->second.erase(_eventgroup);
+ if (!its_instance->second.size()) {
+ its_service->second.erase(_instance);
+ if (!its_service->second.size()) {
+ eventgroup_clients_.erase(_service);
+ }
+ }
+ }
}
}
}
@@ -433,6 +443,40 @@ void routing_manager_base::send_pending_notify_ones(service_t _service, instance
}
}
+void routing_manager_base::unset_all_eventpayloads(service_t _service,
+ instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ const auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ const auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ for (const auto &eventgroupinfo : found_instance->second) {
+ for (const auto &event : eventgroupinfo.second->get_events()) {
+ event->unset_payload(true);
+ }
+ }
+ }
+ }
+}
+
+void routing_manager_base::unset_all_eventpayloads(service_t _service,
+ instance_t _instance,
+ eventgroup_t _eventgroup) {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ const auto found_service = eventgroups_.find(_service);
+ if (found_service != eventgroups_.end()) {
+ const auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ const auto found_eventgroup = found_instance->second.find(_eventgroup);
+ if (found_eventgroup != found_instance->second.end()) {
+ for (const auto &event : found_eventgroup->second->get_events()) {
+ event->unset_payload(true);
+ }
+ }
+ }
+ }
+}
+
bool routing_manager_base::send(client_t its_client,
std::shared_ptr<message> _message,
bool _flush) {
@@ -543,7 +587,7 @@ std::shared_ptr<endpoint> routing_manager_base::create_local_unlocked(client_t _
std::stringstream its_path;
its_path << VSOMEIP_BASE_PATH << std::hex << _client;
-#ifdef WIN32
+#ifdef _WIN32
boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1");
int port = VSOMEIP_INTERNAL_BASE_PORT + _client;
VSOMEIP_INFO << "Connecting to ["
@@ -554,7 +598,7 @@ std::shared_ptr<endpoint> routing_manager_base::create_local_unlocked(client_t _
#endif
std::shared_ptr<local_client_endpoint_impl> its_endpoint = std::make_shared<
local_client_endpoint_impl>(shared_from_this(),
-#ifdef WIN32
+#ifdef _WIN32
boost::asio::ip::tcp::endpoint(address, port)
#else
boost::asio::local::stream_protocol::endpoint(its_path.str())
@@ -609,6 +653,14 @@ std::shared_ptr<endpoint> routing_manager_base::find_or_create_local(client_t _c
}
void routing_manager_base::remove_local(client_t _client) {
+ auto subscriptions = get_subscriptions(_client);
+ for (auto its_subscription : subscriptions) {
+ host_->on_subscription(std::get<0>(its_subscription), std::get<1>(its_subscription),
+ std::get<2>(its_subscription), _client, false);
+ routing_manager_base::unsubscribe(_client, std::get<0>(its_subscription),
+ std::get<1>(its_subscription), std::get<2>(its_subscription));
+ }
+
std::shared_ptr<endpoint> its_endpoint(find_local(_client));
if (its_endpoint) {
its_endpoint->stop();
@@ -634,40 +686,6 @@ void routing_manager_base::remove_local(client_t _client) {
local_services_.erase(si.first);
}
}
- // delete client's subscriptions if he didn't unsubscribe properly
- {
- std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
- for (auto service_it = eventgroup_clients_.begin();
- service_it != eventgroup_clients_.end();) {
- for (auto instance_it = service_it->second.begin();
- instance_it != service_it->second.end();) {
- for (auto eventgroup_it = instance_it->second.begin();
- eventgroup_it != instance_it->second.end();) {
- if (eventgroup_it->second.erase(_client) > 0) {
- if (eventgroup_it->second.size() == 0) {
- eventgroup_it = instance_it->second.erase(eventgroup_it);
- } else {
- ++eventgroup_it;
- }
- } else {
- ++eventgroup_it;
- }
- }
-
- if (instance_it->second.size() == 0) {
- instance_it = service_it->second.erase(instance_it);
- } else {
- ++instance_it;
- }
- }
-
- if (service_it->second.size() == 0) {
- service_it = eventgroup_clients_.erase(service_it);
- } else {
- ++service_it;
- }
- }
- }
}
std::shared_ptr<endpoint> routing_manager_base::find_local(service_t _service,
@@ -904,7 +922,7 @@ void routing_manager_base::put_deserializer(std::shared_ptr<deserializer> _deser
deserializer_condition_.notify_one();
}
-#ifndef WIN32
+#ifndef _WIN32
bool routing_manager_base::check_credentials(client_t _client, uid_t _uid, gid_t _gid) {
return configuration_->check_credentials(_client, _uid, _gid);
}
@@ -934,5 +952,21 @@ void routing_manager_base::remove_pending_subscription(service_t _service,
if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it);
}
+std::set<std::tuple<service_t, instance_t, eventgroup_t>> routing_manager_base::get_subscriptions(const client_t _client) {
+ std::set<std::tuple<service_t, instance_t, eventgroup_t>> result;
+ std::lock_guard<std::mutex> its_lock(eventgroup_clients_mutex_);
+ for (auto its_service : eventgroup_clients_) {
+ for (auto its_instance : its_service.second) {
+ for (auto its_eventgroup : its_instance.second) {
+ auto its_client = its_eventgroup.second.find(_client);
+ if (its_client != its_eventgroup.second.end()) {
+ result.insert(std::make_tuple(its_service.first, its_instance.first, its_eventgroup.first));
+ }
+ }
+ }
+ }
+ return result;
+}
+
} // namespace vsomeip
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 00108a2..e78f400 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -92,7 +92,7 @@ void routing_manager_impl::init() {
}
void routing_manager_impl::start() {
-#ifndef WIN32
+#ifndef _WIN32
netlink_connector_ = std::make_shared<netlink_connector>(host_->get_io(),
configuration_->get_unicast_address());
netlink_connector_->register_net_if_changes_handler(
@@ -115,9 +115,12 @@ void routing_manager_impl::start() {
}
#ifndef WITHOUT_SYSTEMD
- watchdog_timer_.expires_from_now(std::chrono::seconds(0));
- watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
- this, std::placeholders::_1));
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ watchdog_timer_.expires_from_now(std::chrono::seconds(0));
+ watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
+ this, std::placeholders::_1));
+ }
#endif
}
@@ -126,14 +129,17 @@ void routing_manager_impl::stop() {
std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
version_log_timer_.cancel();
}
-#ifndef WIN32
+#ifndef _WIN32
if (netlink_connector_) {
netlink_connector_->stop();
}
#endif
#ifndef WITHOUT_SYSTEMD
- watchdog_timer_.cancel();
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ watchdog_timer_.cancel();
+ }
sd_notify(0, "STOPPING=1");
#endif
@@ -251,10 +257,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
auto its_info = find_service(_service, _instance);
if (!its_info) {
- {
- std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
- requested_services_[_client][_service][_instance].insert({ _major, _minor });
- }
+ requested_service_add(_client, _service, _instance, _major, _minor);
if (discovery_) {
if (!configuration_->is_local_service(_service, _instance)) {
// Non local service instance ~> tell SD to find it!
@@ -270,16 +273,14 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
}
} else {
if ((_major == its_info->get_major()
- || DEFAULT_MAJOR == its_info->get_major())
+ || DEFAULT_MAJOR == its_info->get_major()
+ || ANY_MAJOR == _major)
&& (_minor <= its_info->get_minor()
|| DEFAULT_MINOR == its_info->get_minor()
|| _minor == ANY_MINOR)) {
if(!its_info->is_local()) {
- {
- std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
- requested_services_[_client][_service][_instance].insert({ _major, _minor });
- its_info->add_client(_client);
- }
+ requested_service_add(_client, _service, _instance, _major, _minor);
+ its_info->add_client(_client);
find_or_create_remote_client(_service, _instance, true, VSOMEIP_ROUTING_CLIENT);
if (_use_exclusive_proxy) {
std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(true);
@@ -310,20 +311,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
routing_manager_base::release_service(_client, _service, _instance);
-
- {
- std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
- auto its_client = requested_services_.find(_client);
- if (its_client != requested_services_.end()) {
- auto its_service = its_client->second.find(_service);
- if (its_service != its_client->second.end()) {
- auto its_instance = its_service->second.find(_instance);
- if (its_instance != its_service->second.end()) {
- its_service->second.erase(_instance);
- }
- }
- }
- }
+ requested_service_remove(_client, _service, _instance);
std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
if(its_info && !its_info->is_local()) {
@@ -339,6 +327,7 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
clear_service_info(_service, _instance, false);
clear_identified_clients(_service, _instance);
clear_identifying_clients( _service, _instance);
+ unset_all_eventpayloads(_service, _instance);
} else {
remove_identified_client(_service, _instance, _client);
remove_identifying_client(_service, _instance, _client);
@@ -462,9 +451,15 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
_eventgroup);
if (found_eventgroup != found_instance->second.end()) {
found_eventgroup->second.erase(_client);
- if (0 == found_eventgroup->second.size()) {
+ if (!found_eventgroup->second.size()) {
last_subscriber_removed = true;
- eventgroup_clients_.erase(_eventgroup);
+ found_instance->second.erase(_eventgroup);
+ if (!found_service->second.size()) {
+ found_service->second.erase(_instance);
+ if (!found_service->second.size()) {
+ eventgroup_clients_.erase(_service);
+ }
+ }
}
}
}
@@ -487,6 +482,9 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service,
}
}
}
+ if (last_subscriber_removed) {
+ unset_all_eventpayloads(_service, _instance, _eventgroup);
+ }
if (subscriber == VSOMEIP_ROUTING_CLIENT && last_subscriber_removed) {
// for normal subscribers only unsubscribe via SD if last
// subscriber was removed
@@ -751,6 +749,27 @@ bool routing_manager_impl::send_to(const std::shared_ptr<endpoint_definition> &_
return false;
}
+void routing_manager_impl::register_event(
+ client_t _client, service_t _service, instance_t _instance,
+ event_t _event, const std::set<eventgroup_t> &_eventgroups,
+ bool _is_field, std::chrono::milliseconds _cycle,
+ bool _change_resets_cycle, epsilon_change_func_t _epsilon_change_func,
+ bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
+ auto its_event = find_event(_service, _instance, _event);
+ bool is_first(false);
+ if (its_event && !its_event->has_ref(_client, _is_provided)) {
+ is_first = true;
+ } else {
+ is_first = true;
+ }
+ if (is_first) {
+ routing_manager_base::register_event(_client, _service, _instance,
+ _event, _eventgroups, _is_field, _cycle, _change_resets_cycle,
+ _epsilon_change_func, _is_provided, _is_shadow,
+ _is_cache_placeholder);
+ }
+}
+
void routing_manager_impl::register_shadow_event(client_t _client,
service_t _service, instance_t _instance,
event_t _event, const std::set<eventgroup_t> &_eventgroups,
@@ -1549,7 +1568,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
its_endpoint->enable_magic_cookies();
}
} else {
-#ifndef WIN32
+#ifndef _WIN32
if (its_unicast.is_v4()) {
its_unicast = boost::asio::ip::address_v4::any();
} else if (its_unicast.is_v6()) {
@@ -1929,7 +1948,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
if (found_instance != found_service->second.end()) {
for (const auto &major_minor_pair : found_instance->second) {
if ((major_minor_pair.first == _major
- || _major == DEFAULT_MAJOR)
+ || _major == DEFAULT_MAJOR
+ || major_minor_pair.first == ANY_MAJOR)
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
@@ -1969,7 +1989,8 @@ std::chrono::milliseconds routing_manager_impl::add_routing_info(
if (found_instance != found_service->second.end()) {
for (const auto &major_minor_pair : found_instance->second) {
if ((major_minor_pair.first == _major
- || _major == DEFAULT_MAJOR)
+ || _major == DEFAULT_MAJOR
+ || major_minor_pair.first == ANY_MAJOR)
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
@@ -2339,14 +2360,14 @@ void routing_manager_impl::on_subscribe(
get_client(_subscriber);
}
}
- stub_->send_subscribe(find_local(_service, _instance),
- client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true);
// send initial events if we already have a cached field (is_set)
for (auto its_event : its_eventgroup->get_events()) {
if (its_event->is_field() && its_event->is_set()) {
its_event->notify_one(_subscriber, true); // TODO: use _flush parameter to send all event at once
}
}
+ stub_->send_subscribe(find_local(_service, _instance),
+ client, _service, _instance, _eventgroup, its_eventgroup->get_major(), true);
}
}
}
@@ -2397,25 +2418,27 @@ void routing_manager_impl::on_subscribe_ack(service_t _service,
instance_t _instance, const boost::asio::ip::address &_address,
uint16_t _port) {
- if (multicast_info.find(_service) != multicast_info.end()) {
- if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
- auto endpoint_def = multicast_info[_service][_instance];
- if (endpoint_def->get_address() == _address &&
- endpoint_def->get_port() == _port) {
-
- // Multicast info and endpoint already created before
- // This can happen when more than one client subscribe on the same instance!
- return;
+ {
+ std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
+ if (multicast_info.find(_service) != multicast_info.end()) {
+ if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
+ auto endpoint_def = multicast_info[_service][_instance];
+ if (endpoint_def->get_address() == _address &&
+ endpoint_def->get_port() == _port) {
+
+ // Multicast info and endpoint already created before
+ // This can happen when more than one client subscribe on the same instance!
+ return;
+ }
}
}
- }
-
- // Save multicast info to be able to delete the endpoint
- // as soon as the instance stops offering its service
- std::shared_ptr<endpoint_definition> endpoint_def =
- endpoint_definition::get(_address, _port, false);
- multicast_info[_service][_instance] = endpoint_def;
+ // Save multicast info to be able to delete the endpoint
+ // as soon as the instance stops offering its service
+ std::shared_ptr<endpoint_definition> endpoint_def =
+ endpoint_definition::get(_address, _port, false);
+ multicast_info[_service][_instance] = endpoint_def;
+ }
bool is_someip = configuration_->is_someip(_service, _instance);
// Create multicast endpoint & join multicase group
@@ -2645,12 +2668,19 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc
if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
std::string address = multicast_info[_service][_instance]->get_address().to_string();
uint16_t port = multicast_info[_service][_instance]->get_port();
- auto multicast_endpoint = server_endpoints_[port][false];
- multicast_endpoint->leave(address);
- multicast_endpoint->stop();
- server_endpoints_[port].erase(false);
- if (server_endpoints_[port].find(true) == server_endpoints_[port].end()) {
- server_endpoints_.erase(port);
+ std::shared_ptr<endpoint> multicast_endpoint;
+ auto found_port = server_endpoints_.find(port);
+ if (found_port != server_endpoints_.end()) {
+ auto found_unreliable = found_port->second.find(false);
+ if (found_unreliable != found_port->second.end()) {
+ multicast_endpoint = found_unreliable->second;
+ multicast_endpoint->leave(address);
+ multicast_endpoint->stop();
+ server_endpoints_[port].erase(false);
+ }
+ if (found_port->second.find(true) == found_port->second.end()) {
+ server_endpoints_.erase(port);
+ }
}
multicast_info[_service].erase(_instance);
if (0 >= multicast_info[_service].size()) {
@@ -2659,7 +2689,7 @@ void routing_manager_impl::clear_multicast_endpoints(service_t _service, instanc
// Clear service_instances_ for multicase endpoint
if (1 >= service_instances_[_service].size()) {
service_instances_.erase(_service);
- } else {
+ } else if (multicast_endpoint) {
service_instances_[_service].erase(multicast_endpoint.get());
}
}
@@ -3032,6 +3062,7 @@ void routing_manager_impl::watchdog_cbk(boost::system::error_code const &_error)
}
if (has_interval) {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
watchdog_timer_.expires_from_now(std::chrono::microseconds(its_interval / 2));
watchdog_timer_.async_wait(std::bind(&routing_manager_impl::watchdog_cbk,
this, std::placeholders::_1));
@@ -3501,7 +3532,7 @@ void routing_manager_impl::on_net_if_state_changed(std::string _if, bool _availa
if (_available) {
VSOMEIP_INFO << "Network interface \"" << _if << "\" is up and running.";
start_ip_routing();
-#ifndef WIN32
+#ifndef _WIN32
if (netlink_connector_) {
netlink_connector_->unregister_net_if_changes_handler();
}
@@ -3526,4 +3557,36 @@ void routing_manager_impl::start_ip_routing() {
pending_sd_offers_.clear();
}
+void routing_manager_impl::requested_service_add(client_t _client,
+ service_t _service,
+ instance_t _instance,
+ major_version_t _major,
+ minor_version_t _minor) {
+ std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
+ requested_services_[_client][_service][_instance].insert({ _major, _minor });
+}
+
+void routing_manager_impl::requested_service_remove(client_t _client,
+ service_t _service,
+ instance_t _instance) {
+ std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
+ auto found_client = requested_services_.find(_client);
+ if (found_client != requested_services_.end()) {
+ auto found_service = found_client->second.find(_service);
+ if (found_service != found_client->second.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ // delete all requested major/minor versions
+ found_service->second.erase(_instance);
+ if (!found_service->second.size()) {
+ found_client->second.erase(_service);
+ if (!found_client->second.size()) {
+ requested_services_.erase(client_);
+ }
+ }
+ }
+ }
+ }
+}
+
} // namespace vsomeip
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index a000211..e12e962 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -10,7 +10,7 @@
#include <future>
#include <forward_list>
-#ifndef WIN32
+#ifndef _WIN32
// for umask
#include <sys/types.h>
#include <sys/stat.h>
@@ -54,7 +54,7 @@ void routing_manager_proxy::init() {
routing_manager_base::init();
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
sender_ = create_local(VSOMEIP_ROUTING_CLIENT);
}
@@ -64,24 +64,24 @@ void routing_manager_proxy::init() {
void routing_manager_proxy::start() {
is_started_ = true;
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (!sender_) {
- // application has been stopped and started again
- sender_ = create_local(VSOMEIP_ROUTING_CLIENT);
- }
- }
if (!receiver_) {
// application has been stopped and started again
init_receiver();
}
-
- if (sender_) {
- sender_->start();
+ if (receiver_) {
+ receiver_->start();
}
- if (receiver_)
- receiver_->start();
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (!sender_) {
+ // application has been stopped and started again
+ sender_ = create_local(VSOMEIP_ROUTING_CLIENT);
+ }
+ if (sender_) {
+ sender_->start();
+ }
+ }
}
void routing_manager_proxy::stop() {
@@ -114,9 +114,15 @@ void routing_manager_proxy::stop() {
if (receiver_) {
receiver_->stop();
}
+ receiver_ = nullptr;
- if (sender_) {
- sender_->stop();
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->stop();
+ }
+ // delete the sender
+ sender_ = nullptr;
}
for (auto client: get_connected_clients()) {
@@ -125,15 +131,9 @@ void routing_manager_proxy::stop() {
}
}
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- // delete the sender
- sender_ = nullptr;
- }
- receiver_ = nullptr;
std::stringstream its_client;
its_client << VSOMEIP_BASE_PATH << std::hex << client_;
-#ifdef WIN32
+#ifdef _WIN32
::_unlink(its_client.str().c_str());
#else
if (-1 == ::unlink(its_client.str().c_str())) {
@@ -187,7 +187,7 @@ void routing_manager_proxy::send_offer_service(client_t _client,
sizeof(_minor));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -226,7 +226,7 @@ void routing_manager_proxy::stop_offer_service(client_t _client,
sizeof(_minor));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -287,31 +287,39 @@ void routing_manager_proxy::register_event(client_t _client,
std::chrono::milliseconds _cycle, bool _change_resets_cycle,
epsilon_change_func_t _epsilon_change_func,
bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
-
(void)_is_shadow;
(void)_is_cache_placeholder;
- routing_manager_base::register_event(_client, _service, _instance,
- _event,_eventgroups, _is_field,
- _cycle, _change_resets_cycle,
- _epsilon_change_func,
- _is_provided);
-
+ const event_data_t registration = {
+ _service,
+ _instance,
+ _event,
+ _is_field,
+ _is_provided,
+ _eventgroups
+ };
+ bool is_first(false);
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
- if (state_ == inner_state_type_e::ST_REGISTERED) {
+ is_first = pending_event_registrations_.find(registration)
+ == pending_event_registrations_.end();
+ if (is_first) {
+ pending_event_registrations_.insert(registration);
+ }
+ }
+ if (is_first) {
+ routing_manager_base::register_event(_client, _service, _instance,
+ _event,_eventgroups, _is_field,
+ _cycle, _change_resets_cycle,
+ _epsilon_change_func,
+ _is_provided);
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ if (state_ == inner_state_type_e::ST_REGISTERED && is_first) {
send_register_event(client_, _service, _instance,
_event, _eventgroups, _is_field, _is_provided);
}
- event_data_t registration = {
- _service,
- _instance,
- _event,
- _is_field,
- _is_provided,
- _eventgroups
- };
- pending_event_registrations_.insert(registration);
}
}
@@ -344,7 +352,7 @@ void routing_manager_proxy::unregister_event(client_t _client,
= static_cast<byte_t>(_is_provided);
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -418,11 +426,9 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service,
auto its_target = find_or_create_local(target_client);
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
}
}
}
@@ -445,16 +451,16 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber,
+ sizeof(_subscriber));
auto its_target = find_local(_subscriber);
if (its_target) {
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
}
}
}
@@ -477,16 +483,16 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber,
+ sizeof(_subscriber));
auto its_target = find_local(_subscriber);
if (its_target) {
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
}
}
}
@@ -497,7 +503,6 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
if (state_ == inner_state_type_e::ST_REGISTERED) {
- bool is_remote(false);
byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE
- VSOMEIP_COMMAND_HEADER_SIZE;
@@ -513,19 +518,16 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &is_remote,
- sizeof(is_remote));
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = 0; // is_local
auto its_target = find_local(_service, _instance);
if (its_target) {
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
- };
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
+ }
}
}
remove_pending_subscription(_service, _instance);
@@ -591,11 +593,18 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data,
_instance, _flush, _reliable, VSOMEIP_SEND);
}
}
- // If no direct endpoint could be found/is connected
+ // If no direct endpoint could be found
// or for notifications ~> route to routing_manager_stub
- if (!its_target || !its_target->is_connected()) {
+#ifdef USE_DLT
+ bool message_to_stub(false);
+#endif
+ if (!its_target) {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
its_target = sender_;
+#ifdef USE_DLT
+ message_to_stub = true;
+#endif
} else {
return false;
}
@@ -615,7 +624,7 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data,
}
}
#ifdef USE_DLT
- else if (its_target != sender_) {
+ else if (!message_to_stub) {
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
@@ -656,8 +665,11 @@ bool routing_manager_proxy::send_to(
}
void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) {
- if (_endpoint != sender_) {
- return;
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (_endpoint != sender_) {
+ return;
+ }
}
is_connected_ = true;
if (is_connected_ && is_started_) {
@@ -668,7 +680,10 @@ void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) {
}
void routing_manager_proxy::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
- is_connected_ = !(_endpoint == sender_);
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ is_connected_ = !(_endpoint == sender_);
+ }
if (!is_connected_) {
host_->on_state(state_type_e::ST_DEREGISTERED);
}
@@ -717,8 +732,9 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
instance_t its_instance;
eventgroup_t its_eventgroup;
major_version_t its_major;
- bool is_remote_subscriber;
+ uint8_t is_remote_subscriber;
client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host());
+ client_t its_subscriber;
if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) {
its_command = _data[VSOMEIP_COMMAND_TYPE_POS];
@@ -818,6 +834,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE:
+ if (_size != VSOMEIP_SUBSCRIBE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a SUBSCRIBE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
@@ -878,6 +898,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_UNSUBSCRIBE:
+ if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
@@ -905,14 +929,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE_NACK:
+ if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_NACK command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
+ std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_subscriber));
- on_subscribe_nack(its_client, its_service, its_instance, its_eventgroup);
+ on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE NACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -921,14 +951,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE_ACK:
+ if (_size != VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_ACK command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
+ std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_subscriber));
- on_subscribe_ack(its_client, its_service, its_instance, its_eventgroup);
+ on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE ACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -1160,20 +1196,16 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
}
}
- if (clients_to_delete.size()) {
- std::async(std::launch::async, [this, clients_to_delete, restart_sender] () {
- for (const auto client : clients_to_delete) {
- if (client != VSOMEIP_ROUTING_CLIENT) {
- remove_local(client);
- }
- }
- });
+ for (const auto client : clients_to_delete) {
+ if (client != VSOMEIP_ROUTING_CLIENT) {
+ remove_local(client);
+ }
}
if (restart_sender && is_started_) {
VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
<<": Reconnecting to routing manager.";
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->start();
}
@@ -1191,7 +1223,7 @@ void routing_manager_proxy::register_application() {
if (is_connected_) {
std::lock_guard<std::mutex> its_state_lock(state_mutex_);
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
{
state_ = inner_state_type_e::ST_REGISTERING;
@@ -1219,7 +1251,7 @@ void routing_manager_proxy::deregister_application() {
sizeof(its_size));
if (is_connected_)
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(&its_command[0], uint32_t(its_command.size()));
}
@@ -1234,11 +1266,9 @@ void routing_manager_proxy::send_pong() const {
sizeof(client_t));
if (is_connected_) {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_pong, sizeof(its_pong));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_pong, sizeof(its_pong));
}
}
}
@@ -1268,7 +1298,7 @@ void routing_manager_proxy::send_request_service(client_t _client, service_t _se
sizeof(_use_exclusive_proxy));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -1294,7 +1324,7 @@ void routing_manager_proxy::send_release_service(client_t _client, service_t _se
sizeof(_instance));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -1336,7 +1366,7 @@ void routing_manager_proxy::send_register_event(client_t _client,
}
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command,
uint32_t(VSOMEIP_REGISTER_EVENT_COMMAND_SIZE
@@ -1361,8 +1391,7 @@ void routing_manager_proxy::on_subscribe_nack(client_t _client,
void routing_manager_proxy::on_identify_response(client_t _client, service_t _service,
instance_t _instance, bool _reliable) {
- static const uint32_t size = uint32_t(VSOMEIP_COMMAND_HEADER_SIZE + sizeof(service_t) + sizeof(instance_t)
- + sizeof(bool));
+ static const uint32_t size = VSOMEIP_ID_RESPONSE_COMMAND_SIZE;
byte_t its_command[size];
uint32_t its_size = size - VSOMEIP_COMMAND_HEADER_SIZE;
its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_ID_RESPONSE;
@@ -1377,7 +1406,7 @@ void routing_manager_proxy::on_identify_response(client_t _client, service_t _se
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_reliable,
sizeof(_reliable));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, size);
}
@@ -1454,7 +1483,7 @@ void routing_manager_proxy::send_pending_commands() {
void routing_manager_proxy::init_receiver() {
std::stringstream its_client;
its_client << VSOMEIP_BASE_PATH << std::hex << client_;
-#ifdef WIN32
+#ifdef _WIN32
::_unlink(its_client.str().c_str());
int port = VSOMEIP_INTERNAL_BASE_PORT + client_;
#else
@@ -1472,14 +1501,14 @@ void routing_manager_proxy::init_receiver() {
#endif
try {
receiver_ = std::make_shared<local_server_endpoint_impl>(shared_from_this(),
-#ifdef WIN32
+#ifdef _WIN32
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
#else
boost::asio::local::stream_protocol::endpoint(its_client.str()),
#endif
io_, configuration_->get_max_message_size_local(),
configuration_->get_buffer_shrink_threshold());
-#ifdef WIN32
+#ifdef _WIN32
VSOMEIP_INFO << "Listening at " << port;
#else
VSOMEIP_INFO << "Listening at " << its_client.str();
@@ -1488,7 +1517,7 @@ void routing_manager_proxy::init_receiver() {
host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED);
VSOMEIP_ERROR << "Client ID: " << std::hex << client_ << ": " << e.what();
}
-#ifndef WIN32
+#ifndef _WIN32
::umask(previous_mask);
#endif
}
@@ -1514,7 +1543,7 @@ void routing_manager_proxy::notify_remote_initally(service_t _service, instance_
std::lock_guard<std::mutex> its_lock(serialize_mutex_);
if (serializer_->serialize(its_notification.get())) {
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
send_local(sender_, VSOMEIP_ROUTING_CLIENT, serializer_->get_data(),
serializer_->get_size(), _instance, true, false, VSOMEIP_NOTIFY);
@@ -1573,7 +1602,7 @@ void routing_manager_proxy::register_application_timeout_cbk(
}
}
if (register_again) {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
VSOMEIP_WARNING << std::hex << "Client 0x" << get_client() << " register timeout!"
<< " : Restart route to stub!";
if (sender_) {
@@ -1592,7 +1621,7 @@ void routing_manager_proxy::send_registered_ack() {
std::memset(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], 0,
sizeof(uint32_t));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, VSOMEIP_COMMAND_HEADER_SIZE);
}
diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp
index ee75ec7..c048fe6 100644
--- a/implementation/routing/src/routing_manager_stub.cpp
+++ b/implementation/routing/src/routing_manager_stub.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -8,7 +8,7 @@
#include <iomanip>
#include <forward_list>
-#ifndef WIN32
+#ifndef _WIN32
// for umask
#include <sys/types.h>
#include <sys/stat.h>
@@ -101,12 +101,15 @@ void routing_manager_stub::stop() {
client_registration_thread_->join();
}
- watchdog_timer_.cancel();
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ watchdog_timer_.cancel();
+ }
if( !is_socket_activated_) {
endpoint_->stop();
endpoint_ = nullptr;
-#ifdef WIN32
+#ifdef _WIN32
::_unlink(endpoint_path_.c_str());
#else
if (-1 == ::unlink(endpoint_path_.c_str())) {
@@ -119,7 +122,7 @@ void routing_manager_stub::stop() {
if(local_receiver_) {
local_receiver_->stop();
local_receiver_ = nullptr;
-#ifdef WIN32
+#ifdef _WIN32
::_unlink(local_receiver_path_.c_str());
#else
if (-1 == ::unlink(local_receiver_path_.c_str())) {
@@ -206,6 +209,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
bool is_remote_subscriber(false);
client_t its_client_from_header;
client_t its_target_client;
+ client_t its_subscriber;
its_command = _data[VSOMEIP_COMMAND_TYPE_POS];
std::memcpy(&its_client, &_data[VSOMEIP_COMMAND_CLIENT_POS],
@@ -255,6 +259,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_OFFER_SERVICE:
+ if (_size != VSOMEIP_OFFER_SERVICE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a OFFER_SERVICE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance,
@@ -269,6 +277,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_STOP_OFFER_SERVICE:
+ if (_size != VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a STOP_OFFER_SERVICE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance,
@@ -284,6 +296,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE:
+ if (_size != VSOMEIP_SUBSCRIBE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a SUBSCRIBE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
@@ -309,6 +325,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_UNSUBSCRIBE:
+ if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a UNSUBSCRIBE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
@@ -320,13 +340,19 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE_ACK:
+ if (_size != VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a SUBSCRIBE_ACK command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
- host_->on_subscribe_ack(its_client, its_service, its_instance, its_eventgroup);
+ std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_subscriber));
+ host_->on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE ACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -335,13 +361,19 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE_NACK:
+ if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a SUBSCRIBE_NACK command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
- host_->on_subscribe_nack(its_client, its_service, its_instance, its_eventgroup);
+ std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_subscriber));
+ host_->on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE NACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -420,6 +452,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
}
case VSOMEIP_REQUEST_SERVICE:
+ if (_size != VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a REQUEST_SERVICE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance,
@@ -443,6 +479,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_RELEASE_SERVICE:
+ if (_size != VSOMEIP_RELEASE_SERVICE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a RELEASE_SERVICE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance,
@@ -452,6 +492,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_REGISTER_EVENT:
+ if (_size < VSOMEIP_REGISTER_EVENT_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a REGISTER_EVENT command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service,
&_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
@@ -490,6 +534,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_UNREGISTER_EVENT:
+ if (_size != VSOMEIP_UNREGISTER_EVENT_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a UNREGISTER_EVENT command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance,
@@ -516,6 +564,10 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_ID_RESPONSE:
+ if (_size != VSOMEIP_ID_RESPONSE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a ID_RESPONSE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance,
@@ -602,15 +654,13 @@ void routing_manager_stub::client_registration_func(void) {
} else {
on_deregister_application(r.first);
}
- {
- // Inform (de)registered client. All others will be informed after
- // the client acknowledged its registered state!
- // Don't inform client if we deregister because of an client
- // endpoint error to avoid writing in an already closed socket
+ // Inform (de)registered client. All others will be informed after
+ // the client acknowledged its registered state!
+ // Don't inform client if we deregister because of an client
+ // endpoint error to avoid writing in an already closed socket
+ if (b != registration_type_e::DEREGISTER_ERROR_CASE) {
std::lock_guard<std::mutex> its_guard(routing_info_mutex_);
- if (b != registration_type_e::DEREGISTER_ERROR_CASE) {
- send_routing_info(r.first);
- }
+ send_routing_info(r.first);
}
if (b != registration_type_e::REGISTER) {
{
@@ -649,7 +699,7 @@ void routing_manager_stub::init_routing_endpoint() {
} else if (num_fd == 1) {
native_socket_fd = SD_LISTEN_FDS_START + 0;
VSOMEIP_INFO << "Using native socket created by systemd socket activation! fd: " << native_socket_fd;
- #ifndef WIN32
+ #ifndef _WIN32
try {
endpoint_ =
std::make_shared < local_server_endpoint_impl
@@ -666,7 +716,7 @@ void routing_manager_stub::init_routing_endpoint() {
#endif
is_socket_activated_ = true;
} else {
- #if WIN32
+ #if _WIN32
::_unlink(endpoint_path_.c_str());
int port = VSOMEIP_INTERNAL_BASE_PORT;
VSOMEIP_INFO << "Routing endpoint at " << port;
@@ -684,7 +734,7 @@ void routing_manager_stub::init_routing_endpoint() {
endpoint_ =
std::make_shared < local_server_endpoint_impl
> (shared_from_this(),
- #ifdef WIN32
+ #ifdef _WIN32
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
#else
boost::asio::local::stream_protocol::endpoint(endpoint_path_),
@@ -697,7 +747,7 @@ void routing_manager_stub::init_routing_endpoint() {
VSOMEIP_ERROR << "routing_manager_stub::init_routing_endpoint Client ID: "
<< std::hex << VSOMEIP_ROUTING_CLIENT << ": " << e.what();
}
- #ifndef WIN32
+ #ifndef _WIN32
::umask(previous_mask);
#endif
is_socket_activated_ = false;
@@ -952,7 +1002,7 @@ void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _servi
client_t this_client = get_client();
its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SUBSCRIBE_ACK;
std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &this_client,
- sizeof(_client));
+ sizeof(this_client));
std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size,
sizeof(its_size));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service,
@@ -961,6 +1011,8 @@ void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _servi
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client,
+ sizeof(_client));
its_endpoint->send(&its_command[0], sizeof(its_command), true);
}
@@ -978,7 +1030,7 @@ void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _serv
client_t this_client = get_client();
its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SUBSCRIBE_NACK;
std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &this_client,
- sizeof(_client));
+ sizeof(this_client));
std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size,
sizeof(its_size));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service,
@@ -987,6 +1039,8 @@ void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _serv
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client,
+ sizeof(_client));
its_endpoint->send(&its_command[0], sizeof(its_command), true);
}
@@ -1024,7 +1078,8 @@ void routing_manager_stub::on_pong(client_t _client) {
if (found_info != routing_info_.end()) {
found_info->second.first = 0;
} else {
- VSOMEIP_ERROR << "Received PONG from unregistered application!";
+ VSOMEIP_ERROR << "Received PONG from unregistered application: "
+ << std::hex << std::setw(4) << std::setfill('0') << _client;
}
}
remove_from_pinged_clients(_client);
@@ -1032,17 +1087,21 @@ void routing_manager_stub::on_pong(client_t _client) {
}
void routing_manager_stub::start_watchdog() {
- // Divide / 2 as start and check sleep each
- watchdog_timer_.expires_from_now(
- std::chrono::milliseconds(configuration_->get_watchdog_timeout() / 2));
std::function<void(boost::system::error_code const &)> its_callback =
[this](boost::system::error_code const &_error) {
if (!_error)
- check_watchdog();
+ check_watchdog();
};
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ // Divide / 2 as start and check sleep each
+ watchdog_timer_.expires_from_now(
+ std::chrono::milliseconds(
+ configuration_->get_watchdog_timeout() / 2));
- watchdog_timer_.async_wait(its_callback);
+ watchdog_timer_.async_wait(its_callback);
+ }
}
void routing_manager_stub::check_watchdog() {
@@ -1054,8 +1113,6 @@ void routing_manager_stub::check_watchdog() {
}
broadcast_ping();
- watchdog_timer_.expires_from_now(
- std::chrono::milliseconds(configuration_->get_watchdog_timeout() / 2));
std::function<void(boost::system::error_code const &)> its_callback =
[this](boost::system::error_code const &_error) {
@@ -1077,8 +1134,13 @@ void routing_manager_stub::check_watchdog() {
}
start_watchdog();
};
-
- watchdog_timer_.async_wait(its_callback);
+ {
+ std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
+ watchdog_timer_.expires_from_now(
+ std::chrono::milliseconds(
+ configuration_->get_watchdog_timeout() / 2));
+ watchdog_timer_.async_wait(its_callback);
+ }
}
void routing_manager_stub::create_local_receiver() {
@@ -1088,7 +1150,7 @@ void routing_manager_stub::create_local_receiver() {
std::stringstream its_local_receiver_path;
its_local_receiver_path << VSOMEIP_BASE_PATH << std::hex << host_->get_client();
local_receiver_path_ = its_local_receiver_path.str();
-#if WIN32
+#if _WIN32
::_unlink(local_receiver_path_.c_str());
int port = VSOMEIP_INTERNAL_BASE_PORT;
#else
@@ -1109,7 +1171,7 @@ void routing_manager_stub::create_local_receiver() {
local_receiver_ =
std::make_shared < local_server_endpoint_impl
> (shared_from_this(),
- #ifdef WIN32
+ #ifdef _WIN32
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port + host_->get_client()),
#else
boost::asio::local::stream_protocol::endpoint(local_receiver_path_),
@@ -1122,7 +1184,7 @@ void routing_manager_stub::create_local_receiver() {
VSOMEIP_ERROR << "routing_manager_stub::_local_receiver Client ID: "
<< std::hex << VSOMEIP_ROUTING_CLIENT << ": " << e.what();
}
-#ifndef WIN32
+#ifndef _WIN32
::umask(previous_mask);
#endif
local_receiver_->start();
@@ -1293,7 +1355,7 @@ client_t routing_manager_stub::get_client() const {
return host_->get_client();
}
-#ifndef WIN32
+#ifndef _WIN32
bool routing_manager_stub::check_credentials(client_t _client, uid_t _uid, gid_t _gid) {
return configuration_->check_credentials(_client, _uid, _gid);
}
diff --git a/implementation/routing/src/serviceinfo.cpp b/implementation/routing/src/serviceinfo.cpp
index d497fe1..3afc937 100644
--- a/implementation/routing/src/serviceinfo.cpp
+++ b/implementation/routing/src/serviceinfo.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -64,11 +64,13 @@ void serviceinfo::set_precise_ttl(std::chrono::milliseconds _precise_ttl) {
}
std::shared_ptr<endpoint> serviceinfo::get_endpoint(bool _reliable) const {
+ std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
return (_reliable ? reliable_ : unreliable_);
}
void serviceinfo::set_endpoint(std::shared_ptr<endpoint> _endpoint,
bool _reliable) {
+ std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (_reliable) {
reliable_ = _endpoint;
} else {