summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_proxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_proxy.cpp')
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp244
1 files changed, 181 insertions, 63 deletions
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index 79e85f2..00359f9 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -43,7 +43,9 @@ routing_manager_proxy::routing_manager_proxy(routing_manager_host *_host) :
sender_(0),
receiver_(0),
register_application_timer_(io_),
- logger_(logger::get())
+ logger_(logger::get()),
+ request_debounce_timer_ (io_),
+ request_debounce_timer_running_(false)
{
}
@@ -111,6 +113,11 @@ void routing_manager_proxy::stop() {
is_started_ = false;
its_lock.unlock();
+ {
+ std::lock_guard<std::mutex> its_lock(request_timer_mutex_);
+ request_debounce_timer_.cancel();
+ }
+
if (receiver_) {
receiver_->stop();
}
@@ -251,12 +258,28 @@ void routing_manager_proxy::request_service(client_t _client,
_minor, _use_exclusive_proxy);
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
- if (state_ == inner_state_type_e::ST_REGISTERED) {
- send_request_service(_client, _service, _instance, _major, _minor,
- _use_exclusive_proxy);
- }
+ size_t request_debouncing_time = configuration_->get_request_debouncing(host_->get_name());
service_data_t request = { _service, _instance, _major, _minor, _use_exclusive_proxy };
- pending_requests_.insert(request);
+ if (!request_debouncing_time) {
+ if (state_ == inner_state_type_e::ST_REGISTERED) {
+ std::set<service_data_t> requests;
+ requests.insert(request);
+ send_request_services(requests);
+ }
+ requests_.insert(request);
+ } else {
+ requests_to_debounce_.insert(request);
+ std::lock_guard<std::mutex> its_lock(request_timer_mutex_);
+ if (!request_debounce_timer_running_) {
+ request_debounce_timer_running_ = true;
+ request_debounce_timer_.expires_from_now(std::chrono::milliseconds(request_debouncing_time));
+ request_debounce_timer_.async_wait(
+ std::bind(
+ &routing_manager_proxy::request_debounce_timeout_cbk,
+ std::dynamic_pointer_cast<routing_manager_proxy>(shared_from_this()),
+ std::placeholders::_1));
+ }
+ }
}
}
@@ -266,18 +289,33 @@ void routing_manager_proxy::release_service(client_t _client,
{
std::lock_guard<std::mutex> its_lock(state_mutex_);
remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT);
- if (state_ == inner_state_type_e::ST_REGISTERED) {
- send_release_service(_client, _service, _instance);
- }
- auto it = pending_requests_.begin();
- while (it != pending_requests_.end()) {
+
+ bool pending(false);
+ auto it = requests_to_debounce_.begin();
+ while (it != requests_to_debounce_.end()) {
if (it->service_ == _service
&& it->instance_ == _instance) {
- break;
+ pending = true;
}
it++;
}
- if (it != pending_requests_.end()) pending_requests_.erase(it);
+ if (it != requests_to_debounce_.end()) requests_to_debounce_.erase(it);
+
+ if (!pending && state_ == inner_state_type_e::ST_REGISTERED) {
+ send_release_service(_client, _service, _instance);
+ }
+
+ {
+ auto it = requests_.begin();
+ while (it != requests_.end()) {
+ if (it->service_ == _service
+ && it->instance_ == _instance) {
+ break;
+ }
+ it++;
+ }
+ if (it != requests_.end()) requests_.erase(it);
+ }
}
}
@@ -754,6 +792,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
uint8_t is_remote_subscriber;
client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host());
client_t its_subscriber;
+ bool its_reliable;
if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) {
its_command = _data[VSOMEIP_COMMAND_TYPE_POS];
@@ -1006,6 +1045,24 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
<< std::hex << std::setw(4) << std::setfill('0') << its_event << "]";
break;
+ case VSOMEIP_ID_REQUEST:
+ if (_size < VSOMEIP_ID_REQUEST_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received a VSOMEIP_ID_REQUEST command with wrong size ~> skip!";
+ break;
+ }
+ std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
+ sizeof(its_service));
+ std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
+ sizeof(its_instance));
+ std::memcpy(&its_major, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
+ sizeof(its_major));
+ std::memcpy(&its_reliable, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 5],
+ sizeof(its_reliable));
+
+ send_identify_request(its_service, its_instance, its_major, its_reliable);
+
+ break;
+
default:
break;
}
@@ -1034,33 +1091,8 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
}
}
}
- VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
- << " is deregistered.";
-
- // inform host about its own registration state changes
- host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_DEREGISTERED));
-
- {
- std::lock_guard<std::mutex> its_lock(state_mutex_);
- state_ = inner_state_type_e::ST_DEREGISTERED;
- }
-
- // Notify stop() call about clean deregistration
- state_condition_.notify_one();
-
- // Remove all local connections/endpoints
- for (const auto client : clients_to_delete) {
- if (client != VSOMEIP_ROUTING_CLIENT) {
- remove_local(client);
- }
- }
- VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
- <<": Reconnecting to routing manager.";
- std::lock_guard<std::mutex> its_lock(sender_mutex_);
- if (sender_) {
- sender_->restart();
- }
+ reconnect(clients_to_delete);
// Abort due to routing manager has stopped
return;
@@ -1242,6 +1274,33 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
}
}
+void routing_manager_proxy::reconnect(const std::unordered_set<client_t> &_clients) {
+ // inform host about its own registration state changes
+ host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_DEREGISTERED));
+
+ {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ state_ = inner_state_type_e::ST_DEREGISTERED;
+ }
+
+ // Notify stop() call about clean deregistration
+ state_condition_.notify_one();
+
+ // Remove all local connections/endpoints
+ for (const auto its_client : _clients) {
+ if (its_client != VSOMEIP_ROUTING_CLIENT) {
+ remove_local(its_client);
+ }
+ }
+
+ VSOMEIP_INFO << std::hex << "Application/Client " << get_client()
+ <<": Reconnecting to routing manager.";
+ std::lock_guard<std::mutex> its_lock(sender_mutex_);
+ if (sender_) {
+ sender_->restart();
+ }
+}
+
void routing_manager_proxy::register_application() {
byte_t its_command[] = {
VSOMEIP_REGISTER_APPLICATION, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
@@ -1265,7 +1324,8 @@ void routing_manager_proxy::register_application() {
register_application_timer_.async_wait(
std::bind(
&routing_manager_proxy::register_application_timeout_cbk,
- this, std::placeholders::_1));
+ std::dynamic_pointer_cast<routing_manager_proxy>(shared_from_this()),
+ std::placeholders::_1));
}
}
}
@@ -1303,37 +1363,43 @@ void routing_manager_proxy::send_pong() const {
}
}
-void routing_manager_proxy::send_request_service(client_t _client, service_t _service,
- instance_t _instance, major_version_t _major,
- minor_version_t _minor, bool _use_exclusive_proxy) {
- (void)_client;
-
- byte_t its_command[VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE];
- uint32_t its_size = VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE
- - VSOMEIP_COMMAND_HEADER_SIZE;
+void routing_manager_proxy::send_request_services(std::set<service_data_t>& _requests) {
+ if (!_requests.size()) {
+ return;
+ }
+ uint32_t its_size = (VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE)
+ * (uint32_t)_requests.size();
+ std::vector<byte_t> its_command(its_size + VSOMEIP_COMMAND_HEADER_SIZE);
its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_REQUEST_SERVICE;
std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_,
sizeof(client_));
std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size,
sizeof(its_size));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service,
- sizeof(_service));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance,
- sizeof(_instance));
- its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4] = _major;
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 5], &_minor,
- sizeof(_minor));
- std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9], &_use_exclusive_proxy,
- sizeof(_use_exclusive_proxy));
+
+ uint32_t entry_size = (sizeof(service_t) + sizeof(instance_t) + sizeof(major_version_t)
+ + sizeof(minor_version_t) + sizeof(bool));
+
+ int i = 0;
+ for (auto its_service : _requests) {
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + (i * entry_size)], &its_service.service_,
+ sizeof(its_service.service_));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2 + (i * entry_size)], &its_service.instance_,
+ sizeof(its_service.instance_));
+ its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4 + (i * entry_size)] = its_service.major_;
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 5 + (i * entry_size)], &its_service.minor_,
+ sizeof(its_service.minor_));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9 + (i * entry_size)], &its_service.use_exclusive_proxy_,
+ sizeof(its_service.use_exclusive_proxy_));
+ ++i;
+ }
{
std::lock_guard<std::mutex> its_lock(sender_mutex_);
if (sender_) {
- sender_->send(its_command, sizeof(its_command));
+ sender_->send(&its_command[0], its_size + VSOMEIP_COMMAND_HEADER_SIZE);
}
}
-
}
void routing_manager_proxy::send_release_service(client_t _client, service_t _service,
@@ -1504,10 +1570,7 @@ void routing_manager_proxy::send_pending_commands() {
per.event_, per.eventgroups_,
per.is_field_, per.is_provided_);
- for (auto &po : pending_requests_) {
- send_request_service(client_, po.service_, po.instance_,
- po.major_, po.minor_, po.use_exclusive_proxy_);
- }
+ send_request_services(requests_);
}
void routing_manager_proxy::init_receiver() {
@@ -1681,4 +1744,59 @@ bool routing_manager_proxy::create_placeholder_event_and_subscribe(
}
return is_inserted;
}
+
+void routing_manager_proxy::request_debounce_timeout_cbk(
+ boost::system::error_code const &_error) {
+ std::lock_guard<std::mutex> its_lock(state_mutex_);
+ if (!_error) {
+ if (requests_to_debounce_.size()) {
+ if (state_ == inner_state_type_e::ST_REGISTERED) {
+ send_request_services(requests_to_debounce_);
+ requests_.insert(requests_to_debounce_.begin(),
+ requests_to_debounce_.end());
+ requests_to_debounce_.clear();
+ } else {
+ {
+ std::lock_guard<std::mutex> its_lock(request_timer_mutex_);
+ request_debounce_timer_running_ = true;
+ request_debounce_timer_.expires_from_now(std::chrono::milliseconds(configuration_->get_request_debouncing(host_->get_name())));
+ request_debounce_timer_.async_wait(
+ std::bind(
+ &routing_manager_proxy::request_debounce_timeout_cbk,
+ std::dynamic_pointer_cast<routing_manager_proxy>(shared_from_this()),
+ std::placeholders::_1));
+ return;
+ }
+ }
+ }
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(request_timer_mutex_);
+ request_debounce_timer_running_ = false;
+ }
+}
+
+void routing_manager_proxy::register_client_error_handler(client_t _client,
+ const std::shared_ptr<endpoint> &_endpoint) {
+ _endpoint->register_error_handler(
+ std::bind(&routing_manager_proxy::handle_client_error, this, _client));
+}
+
+void routing_manager_proxy::handle_client_error(client_t _client) {
+ if (_client != VSOMEIP_ROUTING_CLIENT) {
+ VSOMEIP_INFO << "Client 0x" << std::hex << get_client()
+ << " handles a client error(" << std::hex << _client << ")";
+ remove_local(_client);
+ } else {
+ bool should_reconnect(true);
+ {
+ std::unique_lock<std::mutex> its_lock(state_mutex_);
+ should_reconnect = is_started_;
+ }
+ if (should_reconnect) {
+ reconnect(known_clients_);
+ }
+ }
+}
+
} // namespace vsomeip