diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_proxy.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_proxy.cpp | 244 |
1 files changed, 181 insertions, 63 deletions
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index 79e85f2..00359f9 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -43,7 +43,9 @@ routing_manager_proxy::routing_manager_proxy(routing_manager_host *_host) : sender_(0), receiver_(0), register_application_timer_(io_), - logger_(logger::get()) + logger_(logger::get()), + request_debounce_timer_ (io_), + request_debounce_timer_running_(false) { } @@ -111,6 +113,11 @@ void routing_manager_proxy::stop() { is_started_ = false; its_lock.unlock(); + { + std::lock_guard<std::mutex> its_lock(request_timer_mutex_); + request_debounce_timer_.cancel(); + } + if (receiver_) { receiver_->stop(); } @@ -251,12 +258,28 @@ void routing_manager_proxy::request_service(client_t _client, _minor, _use_exclusive_proxy); { std::lock_guard<std::mutex> its_lock(state_mutex_); - if (state_ == inner_state_type_e::ST_REGISTERED) { - send_request_service(_client, _service, _instance, _major, _minor, - _use_exclusive_proxy); - } + size_t request_debouncing_time = configuration_->get_request_debouncing(host_->get_name()); service_data_t request = { _service, _instance, _major, _minor, _use_exclusive_proxy }; - pending_requests_.insert(request); + if (!request_debouncing_time) { + if (state_ == inner_state_type_e::ST_REGISTERED) { + std::set<service_data_t> requests; + requests.insert(request); + send_request_services(requests); + } + requests_.insert(request); + } else { + requests_to_debounce_.insert(request); + std::lock_guard<std::mutex> its_lock(request_timer_mutex_); + if (!request_debounce_timer_running_) { + request_debounce_timer_running_ = true; + request_debounce_timer_.expires_from_now(std::chrono::milliseconds(request_debouncing_time)); + request_debounce_timer_.async_wait( + std::bind( + &routing_manager_proxy::request_debounce_timeout_cbk, + std::dynamic_pointer_cast<routing_manager_proxy>(shared_from_this()), + std::placeholders::_1)); + } + } } } @@ -266,18 +289,33 @@ void routing_manager_proxy::release_service(client_t _client, { std::lock_guard<std::mutex> its_lock(state_mutex_); remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT); - if (state_ == inner_state_type_e::ST_REGISTERED) { - send_release_service(_client, _service, _instance); - } - auto it = pending_requests_.begin(); - while (it != pending_requests_.end()) { + + bool pending(false); + auto it = requests_to_debounce_.begin(); + while (it != requests_to_debounce_.end()) { if (it->service_ == _service && it->instance_ == _instance) { - break; + pending = true; } it++; } - if (it != pending_requests_.end()) pending_requests_.erase(it); + if (it != requests_to_debounce_.end()) requests_to_debounce_.erase(it); + + if (!pending && state_ == inner_state_type_e::ST_REGISTERED) { + send_release_service(_client, _service, _instance); + } + + { + auto it = requests_.begin(); + while (it != requests_.end()) { + if (it->service_ == _service + && it->instance_ == _instance) { + break; + } + it++; + } + if (it != requests_.end()) requests_.erase(it); + } } } @@ -754,6 +792,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, uint8_t is_remote_subscriber; client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host()); client_t its_subscriber; + bool its_reliable; if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) { its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; @@ -1006,6 +1045,24 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, << std::hex << std::setw(4) << std::setfill('0') << its_event << "]"; break; + case VSOMEIP_ID_REQUEST: + if (_size < VSOMEIP_ID_REQUEST_COMMAND_SIZE) { + VSOMEIP_WARNING << "Received a VSOMEIP_ID_REQUEST command with wrong size ~> skip!"; + break; + } + std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS], + sizeof(its_service)); + std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], + sizeof(its_instance)); + std::memcpy(&its_major, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], + sizeof(its_major)); + std::memcpy(&its_reliable, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 5], + sizeof(its_reliable)); + + send_identify_request(its_service, its_instance, its_major, its_reliable); + + break; + default: break; } @@ -1034,33 +1091,8 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, } } } - VSOMEIP_INFO << std::hex << "Application/Client " << get_client() - << " is deregistered."; - - // inform host about its own registration state changes - host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_DEREGISTERED)); - - { - std::lock_guard<std::mutex> its_lock(state_mutex_); - state_ = inner_state_type_e::ST_DEREGISTERED; - } - - // Notify stop() call about clean deregistration - state_condition_.notify_one(); - - // Remove all local connections/endpoints - for (const auto client : clients_to_delete) { - if (client != VSOMEIP_ROUTING_CLIENT) { - remove_local(client); - } - } - VSOMEIP_INFO << std::hex << "Application/Client " << get_client() - <<": Reconnecting to routing manager."; - std::lock_guard<std::mutex> its_lock(sender_mutex_); - if (sender_) { - sender_->restart(); - } + reconnect(clients_to_delete); // Abort due to routing manager has stopped return; @@ -1242,6 +1274,33 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data, } } +void routing_manager_proxy::reconnect(const std::unordered_set<client_t> &_clients) { + // inform host about its own registration state changes + host_->on_state(static_cast<state_type_e>(inner_state_type_e::ST_DEREGISTERED)); + + { + std::lock_guard<std::mutex> its_lock(state_mutex_); + state_ = inner_state_type_e::ST_DEREGISTERED; + } + + // Notify stop() call about clean deregistration + state_condition_.notify_one(); + + // Remove all local connections/endpoints + for (const auto its_client : _clients) { + if (its_client != VSOMEIP_ROUTING_CLIENT) { + remove_local(its_client); + } + } + + VSOMEIP_INFO << std::hex << "Application/Client " << get_client() + <<": Reconnecting to routing manager."; + std::lock_guard<std::mutex> its_lock(sender_mutex_); + if (sender_) { + sender_->restart(); + } +} + void routing_manager_proxy::register_application() { byte_t its_command[] = { VSOMEIP_REGISTER_APPLICATION, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; @@ -1265,7 +1324,8 @@ void routing_manager_proxy::register_application() { register_application_timer_.async_wait( std::bind( &routing_manager_proxy::register_application_timeout_cbk, - this, std::placeholders::_1)); + std::dynamic_pointer_cast<routing_manager_proxy>(shared_from_this()), + std::placeholders::_1)); } } } @@ -1303,37 +1363,43 @@ void routing_manager_proxy::send_pong() const { } } -void routing_manager_proxy::send_request_service(client_t _client, service_t _service, - instance_t _instance, major_version_t _major, - minor_version_t _minor, bool _use_exclusive_proxy) { - (void)_client; - - byte_t its_command[VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE]; - uint32_t its_size = VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE - - VSOMEIP_COMMAND_HEADER_SIZE; +void routing_manager_proxy::send_request_services(std::set<service_data_t>& _requests) { + if (!_requests.size()) { + return; + } + uint32_t its_size = (VSOMEIP_REQUEST_SERVICE_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE) + * (uint32_t)_requests.size(); + std::vector<byte_t> its_command(its_size + VSOMEIP_COMMAND_HEADER_SIZE); its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_REQUEST_SERVICE; std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, - sizeof(_service)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance, - sizeof(_instance)); - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4] = _major; - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 5], &_minor, - sizeof(_minor)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9], &_use_exclusive_proxy, - sizeof(_use_exclusive_proxy)); + + uint32_t entry_size = (sizeof(service_t) + sizeof(instance_t) + sizeof(major_version_t) + + sizeof(minor_version_t) + sizeof(bool)); + + int i = 0; + for (auto its_service : _requests) { + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + (i * entry_size)], &its_service.service_, + sizeof(its_service.service_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2 + (i * entry_size)], &its_service.instance_, + sizeof(its_service.instance_)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4 + (i * entry_size)] = its_service.major_; + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 5 + (i * entry_size)], &its_service.minor_, + sizeof(its_service.minor_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9 + (i * entry_size)], &its_service.use_exclusive_proxy_, + sizeof(its_service.use_exclusive_proxy_)); + ++i; + } { std::lock_guard<std::mutex> its_lock(sender_mutex_); if (sender_) { - sender_->send(its_command, sizeof(its_command)); + sender_->send(&its_command[0], its_size + VSOMEIP_COMMAND_HEADER_SIZE); } } - } void routing_manager_proxy::send_release_service(client_t _client, service_t _service, @@ -1504,10 +1570,7 @@ void routing_manager_proxy::send_pending_commands() { per.event_, per.eventgroups_, per.is_field_, per.is_provided_); - for (auto &po : pending_requests_) { - send_request_service(client_, po.service_, po.instance_, - po.major_, po.minor_, po.use_exclusive_proxy_); - } + send_request_services(requests_); } void routing_manager_proxy::init_receiver() { @@ -1681,4 +1744,59 @@ bool routing_manager_proxy::create_placeholder_event_and_subscribe( } return is_inserted; } + +void routing_manager_proxy::request_debounce_timeout_cbk( + boost::system::error_code const &_error) { + std::lock_guard<std::mutex> its_lock(state_mutex_); + if (!_error) { + if (requests_to_debounce_.size()) { + if (state_ == inner_state_type_e::ST_REGISTERED) { + send_request_services(requests_to_debounce_); + requests_.insert(requests_to_debounce_.begin(), + requests_to_debounce_.end()); + requests_to_debounce_.clear(); + } else { + { + std::lock_guard<std::mutex> its_lock(request_timer_mutex_); + request_debounce_timer_running_ = true; + request_debounce_timer_.expires_from_now(std::chrono::milliseconds(configuration_->get_request_debouncing(host_->get_name()))); + request_debounce_timer_.async_wait( + std::bind( + &routing_manager_proxy::request_debounce_timeout_cbk, + std::dynamic_pointer_cast<routing_manager_proxy>(shared_from_this()), + std::placeholders::_1)); + return; + } + } + } + } + { + std::lock_guard<std::mutex> its_lock(request_timer_mutex_); + request_debounce_timer_running_ = false; + } +} + +void routing_manager_proxy::register_client_error_handler(client_t _client, + const std::shared_ptr<endpoint> &_endpoint) { + _endpoint->register_error_handler( + std::bind(&routing_manager_proxy::handle_client_error, this, _client)); +} + +void routing_manager_proxy::handle_client_error(client_t _client) { + if (_client != VSOMEIP_ROUTING_CLIENT) { + VSOMEIP_INFO << "Client 0x" << std::hex << get_client() + << " handles a client error(" << std::hex << _client << ")"; + remove_local(_client); + } else { + bool should_reconnect(true); + { + std::unique_lock<std::mutex> its_lock(state_mutex_); + should_reconnect = is_started_; + } + if (should_reconnect) { + reconnect(known_clients_); + } + } +} + } // namespace vsomeip |