diff options
Diffstat (limited to 'implementation/endpoints/include/client_endpoint_impl.hpp')
-rw-r--r-- | implementation/endpoints/include/client_endpoint_impl.hpp | 51 |
1 files changed, 35 insertions, 16 deletions
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index 518e696..f7e7567 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -6,15 +6,15 @@ #ifndef VSOMEIP_V3_CLIENT_ENDPOINT_IMPL_HPP_ #define VSOMEIP_V3_CLIENT_ENDPOINT_IMPL_HPP_ +#include <atomic> #include <condition_variable> #include <deque> #include <mutex> #include <vector> -#include <atomic> #include <chrono> +#include <thread> #include <boost/array.hpp> -#include <boost/asio/io_service.hpp> #include <boost/asio/strand.hpp> #include <boost/asio/ip/udp.hpp> #include <boost/utility.hpp> @@ -35,13 +35,13 @@ template<typename Protocol> class client_endpoint_impl: public endpoint_impl<Protocol>, public client_endpoint, public std::enable_shared_from_this<client_endpoint_impl<Protocol> > { public: - typedef typename Protocol::endpoint endpoint_type; - typedef typename Protocol::socket socket_type; + using endpoint_type = typename Protocol::endpoint; + using socket_type = typename Protocol::socket; client_endpoint_impl(const std::shared_ptr<endpoint_host>& _endpoint_host, const std::shared_ptr<routing_host>& _routing_host, const endpoint_type& _local, const endpoint_type& _remote, - boost::asio::io_service &_io, + boost::asio::io_context &_io, std::uint32_t _max_message_size, configuration::endpoint_queue_limit_t _queue_limit, const std::shared_ptr<configuration>& _configuration); @@ -56,7 +56,7 @@ public: const byte_t *_data, uint32_t _size); bool flush(); - void prepare_stop(endpoint::prepare_stop_handler_t _handler, service_t _service); + void prepare_stop(const endpoint::prepare_stop_handler_t &_handler, service_t _service); virtual void stop(); virtual void restart(bool _force = false) = 0; @@ -78,6 +78,7 @@ public: public: void connect_cbk(boost::system::error_code const &_error); void wait_connect_cbk(boost::system::error_code const &_error); + void wait_connecting_cbk(boost::system::error_code const &_error); virtual void send_cbk(boost::system::error_code const &_error, std::size_t _bytes, const message_buffer_ptr_t& _sent_msg); void flush_cbk(boost::system::error_code const &_error); @@ -94,8 +95,8 @@ protected: CONNECTED, ESTABLISHED }; - message_buffer_ptr_t get_front(); - virtual void send_queued(message_buffer_ptr_t _buffer) = 0; + std::pair<message_buffer_ptr_t, uint32_t> get_front(); + virtual void send_queued(std::pair<message_buffer_ptr_t, uint32_t> &_entry) = 0; virtual void get_configured_times_from_endpoint( service_t _service, method_t _method, std::chrono::nanoseconds *_debouncing, @@ -103,10 +104,13 @@ protected: void shutdown_and_close_socket(bool _recreate_socket); void shutdown_and_close_socket_unlocked(bool _recreate_socket); void start_connect_timer(); + void start_connecting_timer(); typename endpoint_impl<Protocol>::cms_ret_e check_message_size( const std::uint8_t * const _data, std::uint32_t _size); bool check_queue_limit(const uint8_t *_data, std::uint32_t _size) const; - void queue_train(bool _queue_size_zero_on_entry); + void queue_train(const std::shared_ptr<train> &_train, + bool _queue_size_zero_on_entry); + void update_last_departure(); protected: mutable std::mutex socket_mutex_; @@ -121,10 +125,20 @@ protected: std::atomic<cei_state_e> state_; std::atomic<std::uint32_t> reconnect_counter_; - // send data - train train_; + std::mutex connecting_timer_mutex_; + boost::asio::steady_timer connecting_timer_; + std::atomic<uint32_t> connecting_timeout_; + - std::deque<message_buffer_ptr_t> queue_; + // send data + std::shared_ptr<train> train_; + std::map<std::chrono::steady_clock::time_point, + std::deque<std::shared_ptr<train> > > dispatched_trains_; + boost::asio::steady_timer dispatch_timer_; + std::chrono::steady_clock::time_point last_departure_; + std::atomic<bool> has_last_departure_; + + std::deque<std::pair<message_buffer_ptr_t, uint32_t> > queue_; std::size_t queue_size_; mutable std::mutex mutex_; @@ -133,7 +147,7 @@ protected: std::atomic<std::uint16_t> local_port_; - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; private: virtual void set_local_port() = 0; @@ -142,8 +156,13 @@ private: method_t _method) const = 0; virtual std::uint32_t get_max_allowed_reconnects() const = 0; virtual void max_allowed_reconnects_reached() = 0; - void send_segments(const tp::tp_split_messages_t &_segments); - void wait_until_debounce_time_reached() const; + void send_segments(const tp::tp_split_messages_t &_segments, + std::uint32_t _separation_time); + + void schedule_train(); + + void start_dispatch_timer(const std::chrono::steady_clock::time_point &_now); + void cancel_dispatch_timer(); }; } // namespace vsomeip_v3 |