summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_proxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_proxy.cpp')
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp241
1 files changed, 135 insertions, 106 deletions
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index a000211..e12e962 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -10,7 +10,7 @@
#include <future>
#include <forward_list>
-#ifndef WIN32
+#ifndef _WIN32
// for umask
#include <sys/types.h>
#include <sys/stat.h>
@@ -54,7 +54,7 @@ void routing_manager_proxy::init() {
routing_manager_base::init();
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
sender_ = create_local(VSOMEIP_ROUTING_CLIENT);
}
@@ -64,24 +64,24 @@ void routing_manager_proxy::init() {
void routing_manager_proxy::start() {
is_started_ = true;
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (!sender_) {
- // application has been stopped and started again
- sender_ = create_local(VSOMEIP_ROUTING_CLIENT);
- }
- }
if (!receiver_) {
// application has been stopped and started again
init_receiver();
}
-
- if (sender_) {
- sender_->start();
+ if (receiver_) {
+ receiver_->start();
}
- if (receiver_)
- receiver_->start();
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (!sender_) {
+ // application has been stopped and started again
+ sender_ = create_local(VSOMEIP_ROUTING_CLIENT);
+ }
+ if (sender_) {
+ sender_->start();
+ }
+ }
}
void routing_manager_proxy::stop() {
@@ -114,9 +114,15 @@ void routing_manager_proxy::stop() {
if (receiver_) {
receiver_->stop();
}
+ receiver_ = nullptr;
- if (sender_) {
- sender_->stop();
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->stop();
+ }
+ // delete the sender
+ sender_ = nullptr;
}
for (auto client: get_connected_clients()) {
@@ -125,15 +131,9 @@ void routing_manager_proxy::stop() {
}
}
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- // delete the sender
- sender_ = nullptr;
- }
- receiver_ = nullptr;
std::stringstream its_client;
its_client << VSOMEIP_BASE_PATH << std::hex << client_;
-#ifdef WIN32
+#ifdef _WIN32
::_unlink(its_client.str().c_str());
#else
if (-1 == ::unlink(its_client.str().c_str())) {
@@ -187,7 +187,7 @@ void routing_manager_proxy::send_offer_service(client_t _client,
sizeof(_minor));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -226,7 +226,7 @@ void routing_manager_proxy::stop_offer_service(client_t _client,
sizeof(_minor));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -287,31 +287,39 @@ void routing_manager_proxy::register_event(client_t _client,
std::chrono::milliseconds _cycle, bool _change_resets_cycle,
epsilon_change_func_t _epsilon_change_func,
bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
-
(void)_is_shadow;
(void)_is_cache_placeholder;
- routing_manager_base::register_event(_client, _service, _instance,
- _event,_eventgroups, _is_field,
- _cycle, _change_resets_cycle,
- _epsilon_change_func,
- _is_provided);
-
+ const event_data_t registration = {
+ _service,
+ _instance,
+ _event,
+ _is_field,
+ _is_provided,
+ _eventgroups
+ };
+ bool is_first(false);
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
- if (state_ == inner_state_type_e::ST_REGISTERED) {
+ is_first = pending_event_registrations_.find(registration)
+ == pending_event_registrations_.end();
+ if (is_first) {
+ pending_event_registrations_.insert(registration);
+ }
+ }
+ if (is_first) {
+ routing_manager_base::register_event(_client, _service, _instance,
+ _event,_eventgroups, _is_field,
+ _cycle, _change_resets_cycle,
+ _epsilon_change_func,
+ _is_provided);
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ if (state_ == inner_state_type_e::ST_REGISTERED && is_first) {
send_register_event(client_, _service, _instance,
_event, _eventgroups, _is_field, _is_provided);
}
- event_data_t registration = {
- _service,
- _instance,
- _event,
- _is_field,
- _is_provided,
- _eventgroups
- };
- pending_event_registrations_.insert(registration);
}
}
@@ -344,7 +352,7 @@ void routing_manager_proxy::unregister_event(client_t _client,
= static_cast<byte_t>(_is_provided);
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -418,11 +426,9 @@ void routing_manager_proxy::send_subscribe(client_t _client, service_t _service,
auto its_target = find_or_create_local(target_client);
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
}
}
}
@@ -445,16 +451,16 @@ void routing_manager_proxy::send_subscribe_nack(client_t _subscriber,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber,
+ sizeof(_subscriber));
auto its_target = find_local(_subscriber);
if (its_target) {
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
}
}
}
@@ -477,16 +483,16 @@ void routing_manager_proxy::send_subscribe_ack(client_t _subscriber,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_subscriber,
+ sizeof(_subscriber));
auto its_target = find_local(_subscriber);
if (its_target) {
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
}
}
}
@@ -497,7 +503,6 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
if (state_ == inner_state_type_e::ST_REGISTERED) {
- bool is_remote(false);
byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE];
uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE
- VSOMEIP_COMMAND_HEADER_SIZE;
@@ -513,19 +518,16 @@ void routing_manager_proxy::unsubscribe(client_t _client, service_t _service,
sizeof(_instance));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
sizeof(_eventgroup));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &is_remote,
- sizeof(is_remote));
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = 0; // is_local
auto its_target = find_local(_service, _instance);
if (its_target) {
its_target->send(its_command, sizeof(its_command));
} else {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_command, sizeof(its_command));
- }
- };
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_command, sizeof(its_command));
+ }
}
}
remove_pending_subscription(_service, _instance);
@@ -591,11 +593,18 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data,
_instance, _flush, _reliable, VSOMEIP_SEND);
}
}
- // If no direct endpoint could be found/is connected
+ // If no direct endpoint could be found
// or for notifications ~> route to routing_manager_stub
- if (!its_target || !its_target->is_connected()) {
+#ifdef USE_DLT
+ bool message_to_stub(false);
+#endif
+ if (!its_target) {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
its_target = sender_;
+#ifdef USE_DLT
+ message_to_stub = true;
+#endif
} else {
return false;
}
@@ -615,7 +624,7 @@ bool routing_manager_proxy::send(client_t _client, const byte_t *_data,
}
}
#ifdef USE_DLT
- else if (its_target != sender_) {
+ else if (!message_to_stub) {
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
@@ -656,8 +665,11 @@ bool routing_manager_proxy::send_to(
}
void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) {
- if (_endpoint != sender_) {
- return;
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (_endpoint != sender_) {
+ return;
+ }
}
is_connected_ = true;
if (is_connected_ && is_started_) {
@@ -668,7 +680,10 @@ void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) {
}
void routing_manager_proxy::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
- is_connected_ = !(_endpoint == sender_);
+ {
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ is_connected_ = !(_endpoint == sender_);
+ }
if (!is_connected_) {
host_->on_state(state_type_e::ST_DEREGISTERED);
}
@@ -717,8 +732,9 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
instance_t its_instance;
eventgroup_t its_eventgroup;
major_version_t its_major;
- bool is_remote_subscriber;
+ uint8_t is_remote_subscriber;
client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host());
+ client_t its_subscriber;
if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) {
its_command = _data[VSOMEIP_COMMAND_TYPE_POS];
@@ -818,6 +834,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE:
+ if (_size != VSOMEIP_SUBSCRIBE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a SUBSCRIBE command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
@@ -878,6 +898,10 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_UNSUBSCRIBE:
+ if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
@@ -905,14 +929,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE_NACK:
+ if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_NACK command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
+ std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_subscriber));
- on_subscribe_nack(its_client, its_service, its_instance, its_eventgroup);
+ on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE NACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -921,14 +951,20 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
break;
case VSOMEIP_SUBSCRIBE_ACK:
+ if (_size != VSOMEIP_SUBSCRIBE_ACK_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_ACK command with wrong size ~> skip!";
+ break;
+ }
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
sizeof(its_service));
std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
sizeof(its_instance));
std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
sizeof(its_eventgroup));
+ std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_subscriber));
- on_subscribe_ack(its_client, its_service, its_instance, its_eventgroup);
+ on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup);
VSOMEIP_INFO << "SUBSCRIBE ACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -1160,20 +1196,16 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
}
}
- if (clients_to_delete.size()) {
- std::async(std::launch::async, [this, clients_to_delete, restart_sender] () {
- for (const auto client : clients_to_delete) {
- if (client != VSOMEIP_ROUTING_CLIENT) {
- remove_local(client);
- }
- }
- });
+ for (const auto client : clients_to_delete) {
+ if (client != VSOMEIP_ROUTING_CLIENT) {
+ remove_local(client);
+ }
}
if (restart_sender && is_started_) {
VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
<<": Reconnecting to routing manager.";
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->start();
}
@@ -1191,7 +1223,7 @@ void routing_manager_proxy::register_application() {
if (is_connected_) {
std::lock_guard<std::mutex> its_state_lock(state_mutex_);
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
{
state_ = inner_state_type_e::ST_REGISTERING;
@@ -1219,7 +1251,7 @@ void routing_manager_proxy::deregister_application() {
sizeof(its_size));
if (is_connected_)
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(&its_command[0], uint32_t(its_command.size()));
}
@@ -1234,11 +1266,9 @@ void routing_manager_proxy::send_pong() const {
sizeof(client_t));
if (is_connected_) {
- {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->send(its_pong, sizeof(its_pong));
- }
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->send(its_pong, sizeof(its_pong));
}
}
}
@@ -1268,7 +1298,7 @@ void routing_manager_proxy::send_request_service(client_t _client, service_t _se
sizeof(_use_exclusive_proxy));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -1294,7 +1324,7 @@ void routing_manager_proxy::send_release_service(client_t _client, service_t _se
sizeof(_instance));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, sizeof(its_command));
}
@@ -1336,7 +1366,7 @@ void routing_manager_proxy::send_register_event(client_t _client,
}
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command,
uint32_t(VSOMEIP_REGISTER_EVENT_COMMAND_SIZE
@@ -1361,8 +1391,7 @@ void routing_manager_proxy::on_subscribe_nack(client_t _client,
void routing_manager_proxy::on_identify_response(client_t _client, service_t _service,
instance_t _instance, bool _reliable) {
- static const uint32_t size = uint32_t(VSOMEIP_COMMAND_HEADER_SIZE + sizeof(service_t) + sizeof(instance_t)
- + sizeof(bool));
+ static const uint32_t size = VSOMEIP_ID_RESPONSE_COMMAND_SIZE;
byte_t its_command[size];
uint32_t its_size = size - VSOMEIP_COMMAND_HEADER_SIZE;
its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_ID_RESPONSE;
@@ -1377,7 +1406,7 @@ void routing_manager_proxy::on_identify_response(client_t _client, service_t _se
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_reliable,
sizeof(_reliable));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, size);
}
@@ -1454,7 +1483,7 @@ void routing_manager_proxy::send_pending_commands() {
void routing_manager_proxy::init_receiver() {
std::stringstream its_client;
its_client << VSOMEIP_BASE_PATH << std::hex << client_;
-#ifdef WIN32
+#ifdef _WIN32
::_unlink(its_client.str().c_str());
int port = VSOMEIP_INTERNAL_BASE_PORT + client_;
#else
@@ -1472,14 +1501,14 @@ void routing_manager_proxy::init_receiver() {
#endif
try {
receiver_ = std::make_shared<local_server_endpoint_impl>(shared_from_this(),
-#ifdef WIN32
+#ifdef _WIN32
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
#else
boost::asio::local::stream_protocol::endpoint(its_client.str()),
#endif
io_, configuration_->get_max_message_size_local(),
configuration_->get_buffer_shrink_threshold());
-#ifdef WIN32
+#ifdef _WIN32
VSOMEIP_INFO << "Listening at " << port;
#else
VSOMEIP_INFO << "Listening at " << its_client.str();
@@ -1488,7 +1517,7 @@ void routing_manager_proxy::init_receiver() {
host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED);
VSOMEIP_ERROR << "Client ID: " << std::hex << client_ << ": " << e.what();
}
-#ifndef WIN32
+#ifndef _WIN32
::umask(previous_mask);
#endif
}
@@ -1514,7 +1543,7 @@ void routing_manager_proxy::notify_remote_initally(service_t _service, instance_
std::lock_guard<std::mutex> its_lock(serialize_mutex_);
if (serializer_->serialize(its_notification.get())) {
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
send_local(sender_, VSOMEIP_ROUTING_CLIENT, serializer_->get_data(),
serializer_->get_size(), _instance, true, false, VSOMEIP_NOTIFY);
@@ -1573,7 +1602,7 @@ void routing_manager_proxy::register_application_timeout_cbk(
}
}
if (register_again) {
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
VSOMEIP_WARNING << std::hex << "Client 0x" << get_client() << " register timeout!"
<< " : Restart route to stub!";
if (sender_) {
@@ -1592,7 +1621,7 @@ void routing_manager_proxy::send_registered_ack() {
std::memset(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], 0,
sizeof(uint32_t));
{
- std::lock_guard<std::recursive_mutex> its_lock(sender_mutex_);
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
sender_->send(its_command, VSOMEIP_COMMAND_HEADER_SIZE);
}