summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp338
1 files changed, 236 insertions, 102 deletions
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 66b3138..6850f66 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -30,7 +30,7 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
const std::shared_ptr<routing_host>& _routing_host,
const endpoint_type& _local,
const endpoint_type& _remote,
- boost::asio::io_service &_io,
+ boost::asio::io_context &_io,
std::uint32_t _max_message_size,
configuration::endpoint_queue_limit_t _queue_limit,
const std::shared_ptr<configuration>& _configuration)
@@ -41,7 +41,10 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
state_(cei_state_e::CLOSED),
reconnect_counter_(0),
- train_(_io),
+ connecting_timer_(_io), connecting_timeout_(VSOMEIP_DEFAULT_CONNECTING_TIMEOUT),
+ train_(std::make_shared<train>()),
+ dispatch_timer_(_io),
+ has_last_departure_(false),
queue_size_(0),
was_not_connected_(false),
local_port_(0),
@@ -50,26 +53,31 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
template<typename Protocol>
client_endpoint_impl<Protocol>::~client_endpoint_impl() {
+
}
template<typename Protocol>
bool client_endpoint_impl<Protocol>::is_client() const {
+
return true;
}
template<typename Protocol>
bool client_endpoint_impl<Protocol>::is_established() const {
+
return state_ == cei_state_e::ESTABLISHED;
}
template<typename Protocol>
bool client_endpoint_impl<Protocol>::is_established_or_connected() const {
+
return (state_ == cei_state_e::ESTABLISHED
|| state_ == cei_state_e::CONNECTED);
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::set_established(bool _established) {
+
if (_established) {
if (state_ != cei_state_e::CONNECTING) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
@@ -86,6 +94,7 @@ void client_endpoint_impl<Protocol>::set_established(bool _established) {
template<typename Protocol>
void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
+
if (_connected) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if (socket_->is_open()) {
@@ -100,7 +109,8 @@ void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
template<typename Protocol>
void client_endpoint_impl<Protocol>::prepare_stop(
- endpoint::prepare_stop_handler_t _handler, service_t _service) {
+ const endpoint::prepare_stop_handler_t &_handler, service_t _service) {
+
(void) _handler;
(void) _service;
}
@@ -129,12 +139,14 @@ void client_endpoint_impl<Protocol>::stop() {
}
template<typename Protocol>
-message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() {
- message_buffer_ptr_t its_buffer;
+std::pair<message_buffer_ptr_t, uint32_t>
+client_endpoint_impl<Protocol>::get_front() {
+
+ std::pair<message_buffer_ptr_t, uint32_t> its_entry;
if (queue_.size())
- its_buffer = queue_.front();
+ its_entry = queue_.front();
- return (its_buffer);
+ return (its_entry);
}
@@ -142,6 +154,7 @@ template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
uint32_t _size) {
+
(void)_target;
(void)_data;
(void)_size;
@@ -154,6 +167,7 @@ template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_error(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
uint32_t _size) {
+
(void)_target;
(void)_data;
(void)_size;
@@ -165,9 +179,11 @@ bool client_endpoint_impl<Protocol>::send_error(
template<typename Protocol>
bool client_endpoint_impl<Protocol>::send(const uint8_t *_data, uint32_t _size) {
+
std::lock_guard<std::mutex> its_lock(mutex_);
bool must_depart(false);
- const bool queue_size_zero_on_entry(queue_.empty());
+ auto its_now(std::chrono::steady_clock::now());
+
#if 0
std::stringstream msg;
msg << "cei::send: ";
@@ -192,8 +208,8 @@ bool client_endpoint_impl<Protocol>::send(const uint8_t *_data, uint32_t _size)
break;
}
- // STEP 1: Determine elapsed time and update the departure time and cancel the departure timer
- train_.update_departure_time_and_stop_departure();
+ // STEP 1: Cancel dispatch timer
+ cancel_dispatch_timer();
// STEP 3: Get configured timings
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
@@ -207,36 +223,36 @@ bool client_endpoint_impl<Protocol>::send(const uint8_t *_data, uint32_t _size)
// STEP 4: Check if the passenger enters an empty train
const std::pair<service_t, method_t> its_identifier = std::make_pair(
its_service, its_method);
- if (train_.passengers_.empty()) {
- train_.departure_ = its_retention;
+ if (train_->passengers_.empty()) {
+ train_->departure_ = its_now + its_retention; // latest possible
} else {
// STEP 4.1: Check whether the current train already contains the message
- if (train_.passengers_.end() != train_.passengers_.find(its_identifier)) {
+ if (train_->passengers_.end() != train_->passengers_.find(its_identifier)) {
must_depart = true;
} else {
// STEP 5: Check whether the current message fits into the current train
- if (train_.buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) {
+ if (train_->buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) {
must_depart = true;
} else {
// STEP 6: Check debouncing time
- if (its_debouncing > train_.minimal_max_retention_time_) {
+ if (its_debouncing > train_->minimal_max_retention_time_) {
// train's latest departure would already undershot new
// passenger's debounce time
must_depart = true;
} else {
- if (its_debouncing > train_.departure_) {
+ if (its_now + its_debouncing > train_->departure_) {
// train departs earlier as the new passenger's debounce
// time allows
must_depart = true;
} else {
// STEP 7: Check maximum retention time
- if (its_retention < train_.minimal_debounce_time_) {
+ if (its_retention < train_->minimal_debounce_time_) {
// train's earliest departure would already exceed
// the new passenger's retention time.
must_depart = true;
} else {
- if (its_retention < train_.departure_) {
- train_.departure_ = its_retention;
+ if (its_now + its_retention < train_->departure_) {
+ train_->departure_ = its_now + its_retention;
}
}
}
@@ -248,50 +264,40 @@ bool client_endpoint_impl<Protocol>::send(const uint8_t *_data, uint32_t _size)
// STEP 8: if necessary, send current buffer and create a new one
if (must_depart) {
// STEP 8.1: check if debounce time would be undershot here if the train
- // departs. Block sending until train is allowed to depart.
- wait_until_debounce_time_reached();
- train_.passengers_.clear();
- queue_train(queue_size_zero_on_entry);
- train_.last_departure_ = std::chrono::steady_clock::now();
- train_.departure_ = its_retention;
- train_.minimal_debounce_time_ = std::chrono::nanoseconds::max();
- train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max();
+ // departs. Schedule departure of current train and create a new one.
+ schedule_train();
+
+ train_ = std::make_shared<train>();
+ train_->departure_ = its_now + its_retention;
}
// STEP 9: insert current message buffer
- train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size);
- train_.passengers_.insert(its_identifier);
+ train_->buffer_->insert(train_->buffer_->end(), _data, _data + _size);
+ train_->passengers_.insert(its_identifier);
// STEP 9.1: update the trains minimal debounce time if necessary
- if (its_debouncing < train_.minimal_debounce_time_) {
- train_.minimal_debounce_time_ = its_debouncing;
+ if (its_debouncing < train_->minimal_debounce_time_) {
+ train_->minimal_debounce_time_ = its_debouncing;
}
// STEP 9.2: update the trains minimal maximum retention time if necessary
- if (its_retention < train_.minimal_max_retention_time_) {
- train_.minimal_max_retention_time_ = its_retention;
+ if (its_retention < train_->minimal_max_retention_time_) {
+ train_->minimal_max_retention_time_ = its_retention;
}
- // STEP 10: restart timer with current departure time
-#ifndef _WIN32
- train_.departure_timer_->expires_from_now(train_.departure_);
-#else
- train_.departure_timer_->expires_from_now(
- std::chrono::duration_cast<
- std::chrono::steady_clock::duration>(train_.departure_));
-#endif
- train_.departure_timer_->async_wait(
- std::bind(&client_endpoint_impl<Protocol>::flush_cbk,
- this->shared_from_this(), std::placeholders::_1));
+ // STEP 10: restart dispatch timer with next departure time
+ start_dispatch_timer(its_now);
return true;
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::send_segments(
- const tp::tp_split_messages_t &_segments) {
+ const tp::tp_split_messages_t &_segments, std::uint32_t _separation_time) {
+
+ auto its_now(std::chrono::steady_clock::now());
+
if (_segments.size() == 0) {
return;
}
- const bool queue_size_zero_on_entry(queue_.empty());
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
(*(_segments[0]))[VSOMEIP_SERVICE_POS_MIN],
@@ -303,53 +309,49 @@ void client_endpoint_impl<Protocol>::send_segments(
get_configured_times_from_endpoint(its_service, its_method,
&its_debouncing, &its_retention);
// update the trains minimal debounce time if necessary
- if (its_debouncing < train_.minimal_debounce_time_) {
- train_.minimal_debounce_time_ = its_debouncing;
+ if (its_debouncing < train_->minimal_debounce_time_) {
+ train_->minimal_debounce_time_ = its_debouncing;
}
// update the trains minimal maximum retention time if necessary
- if (its_retention < train_.minimal_max_retention_time_) {
- train_.minimal_max_retention_time_ = its_retention;
+ if (its_retention < train_->minimal_max_retention_time_) {
+ train_->minimal_max_retention_time_ = its_retention;
}
// We only need to respect the debouncing. There is no need to wait for further
// messages as we will send several now anyway.
- if (!train_.passengers_.empty()) {
- wait_until_debounce_time_reached();
- train_.passengers_.clear();
- queue_train(queue_size_zero_on_entry);
- train_.last_departure_ = std::chrono::steady_clock::now();
- train_.departure_ = its_retention;
- train_.minimal_debounce_time_ = std::chrono::nanoseconds::max();
- train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max();
+ if (!train_->passengers_.empty()) {
+ schedule_train();
+ train_->departure_ = its_now + its_retention;
}
+
const bool queue_size_still_zero(queue_.empty());
for (const auto& s : _segments) {
- queue_.emplace_back(s);
+ queue_.emplace_back(std::make_pair(s, _separation_time));
queue_size_ += s->size();
}
if (queue_size_still_zero && !queue_.empty()) { // no writing in progress
// respect minimal debounce time
- wait_until_debounce_time_reached();
+ schedule_train();
// ignore retention time and send immediately as the train is full anyway
- auto its_buffer = get_front();
- if (its_buffer) {
+ auto its_entry = get_front();
+ if (its_entry.first) {
strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
- this->shared_from_this(), its_buffer));
+ this->shared_from_this(), its_entry));
}
}
- train_.last_departure_ = std::chrono::steady_clock::now();
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::wait_until_debounce_time_reached() const {
- const std::chrono::nanoseconds time_since_last_departure =
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::steady_clock::now() - train_.last_departure_);
- if (time_since_last_departure < train_.minimal_debounce_time_) {
- std::this_thread::sleep_for(
- train_.minimal_debounce_time_ - time_since_last_departure);
+void client_endpoint_impl<Protocol>::schedule_train() {
+
+ if (has_last_departure_) {
+ if (last_departure_ + train_->minimal_debounce_time_ > train_->departure_) {
+ train_->departure_ = last_departure_ + train_->minimal_debounce_time_;
+ }
}
+
+ dispatched_trains_[train_->departure_].push_back(train_);
}
template<typename Protocol>
@@ -363,24 +365,58 @@ bool client_endpoint_impl<Protocol>::send(const std::vector<byte_t>& _cmd_header
template<typename Protocol>
bool client_endpoint_impl<Protocol>::flush() {
- bool is_successful(true);
+
+ bool has_queued(true);
+ bool is_current_train(true);
+
std::lock_guard<std::mutex> its_lock(mutex_);
- if (!train_.buffer_->empty()) {
- queue_train(!queue_.size());
- train_.last_departure_ = std::chrono::steady_clock::now();
- train_.passengers_.clear();
- train_.minimal_debounce_time_ = std::chrono::nanoseconds::max();
- train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max();
+
+ std::shared_ptr<train> its_train(train_);
+ if (!dispatched_trains_.empty()) {
+
+ auto its_dispatched = dispatched_trains_.begin();
+ if (its_dispatched->first <= its_train->departure_) {
+
+ is_current_train = false;
+ its_train = its_dispatched->second.front();
+ its_dispatched->second.pop_front();
+ if (its_dispatched->second.empty()) {
+
+ dispatched_trains_.erase(its_dispatched);
+ }
+ }
+ }
+
+ if (!its_train->buffer_->empty()) {
+
+ queue_train(its_train, !queue_.size());
+
+ // Reset current train if necessary
+ if (is_current_train) {
+ its_train->reset();
+ }
} else {
- is_successful = false;
+ has_queued = false;
}
- return is_successful;
+ if (!is_current_train || !dispatched_trains_.empty()) {
+
+ auto its_now(std::chrono::steady_clock::now());
+ start_dispatch_timer(its_now);
+ }
+
+ return (has_queued);
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::connect_cbk(
boost::system::error_code const &_error) {
+
+ if (_error != boost::asio::error::timed_out) {
+ std::lock_guard<std::mutex> its_lock(connecting_timer_mutex_);
+ connecting_timer_.cancel();
+ }
+
if (_error == boost::asio::error::operation_aborted
|| endpoint_impl<Protocol>::sending_blocked_) {
// endpoint was stopped
@@ -416,10 +452,10 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (was_not_connected_) {
was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
- auto its_buffer = get_front();
- if (its_buffer) {
+ auto its_entry = get_front();
+ if (its_entry.first) {
strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
- this->shared_from_this(), its_buffer));
+ this->shared_from_this(), its_entry));
VSOMEIP_WARNING << __func__ << ": resume sending to: "
<< get_remote_information();
}
@@ -435,6 +471,7 @@ void client_endpoint_impl<Protocol>::connect_cbk(
template<typename Protocol>
void client_endpoint_impl<Protocol>::wait_connect_cbk(
boost::system::error_code const &_error) {
+
if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
auto self = this->shared_from_this();
strand_.dispatch(std::bind(&client_endpoint_impl::connect,
@@ -443,18 +480,32 @@ void client_endpoint_impl<Protocol>::wait_connect_cbk(
}
template<typename Protocol>
+void client_endpoint_impl<Protocol>::wait_connecting_cbk(
+ boost::system::error_code const &_error) {
+ if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
+ connect_cbk(boost::asio::error::timed_out);
+ }
+}
+
+template<typename Protocol>
void client_endpoint_impl<Protocol>::send_cbk(
boost::system::error_code const &_error, std::size_t _bytes,
const message_buffer_ptr_t& _sent_msg) {
+
(void)_bytes;
+
if (!_error) {
std::lock_guard<std::mutex> its_lock(mutex_);
if (queue_.size() > 0) {
- queue_size_ -= queue_.front()->size();
+ queue_size_ -= queue_.front().first->size();
queue_.pop_front();
- auto its_buffer = get_front();
- if (its_buffer)
- send_queued(its_buffer);
+
+ update_last_departure();
+
+ auto its_entry = get_front();
+ if (its_entry.first) {
+ send_queued(its_entry);
+ }
}
} else if (_error == boost::asio::error::broken_pipe) {
state_ = cei_state_e::CLOSED;
@@ -563,6 +614,7 @@ void client_endpoint_impl<Protocol>::send_cbk(
template<typename Protocol>
void client_endpoint_impl<Protocol>::flush_cbk(
boost::system::error_code const &_error) {
+
if (!_error) {
(void) flush();
}
@@ -570,15 +622,17 @@ void client_endpoint_impl<Protocol>::flush_cbk(
template<typename Protocol>
void client_endpoint_impl<Protocol>::shutdown_and_close_socket(bool _recreate_socket) {
+
std::lock_guard<std::mutex> its_lock(socket_mutex_);
shutdown_and_close_socket_unlocked(_recreate_socket);
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked(bool _recreate_socket) {
+
local_port_ = 0;
if (socket_->is_open()) {
-#ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
if (-1 == fcntl(socket_->native_handle(), F_GETFD)) {
VSOMEIP_ERROR << "cei::shutdown_and_close_socket_unlocked: socket/handle closed already '"
<< std::string(std::strerror(errno))
@@ -590,34 +644,39 @@ void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked(bool _re
socket_->close(its_error);
}
if (_recreate_socket) {
- socket_.reset(new socket_type(endpoint_impl<Protocol>::service_));
+ socket_.reset(new socket_type(endpoint_impl<Protocol>::io_));
}
}
template<typename Protocol>
bool client_endpoint_impl<Protocol>::get_remote_address(
boost::asio::ip::address &_address) const {
+
(void)_address;
return false;
}
template<typename Protocol>
std::uint16_t client_endpoint_impl<Protocol>::get_remote_port() const {
+
return 0;
}
template<typename Protocol>
std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const {
+
return local_port_;
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) {
+
local_port_ = _port;
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::start_connect_timer() {
+
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
std::chrono::milliseconds(connect_timeout_));
@@ -627,8 +686,20 @@ void client_endpoint_impl<Protocol>::start_connect_timer() {
}
template<typename Protocol>
+void client_endpoint_impl<Protocol>::start_connecting_timer() {
+
+ std::lock_guard<std::mutex> its_lock(connecting_timer_mutex_);
+ connecting_timer_.expires_from_now(
+ std::chrono::milliseconds(connecting_timeout_));
+ connecting_timer_.async_wait(
+ std::bind(&client_endpoint_impl<Protocol>::wait_connecting_cbk,
+ this->shared_from_this(), std::placeholders::_1));
+}
+
+template<typename Protocol>
typename endpoint_impl<Protocol>::cms_ret_e client_endpoint_impl<Protocol>::check_message_size(
const std::uint8_t * const _data, std::uint32_t _size) {
+
typename endpoint_impl<Protocol>::cms_ret_e ret(endpoint_impl<Protocol>::cms_ret_e::MSG_OK);
if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
&& _size > endpoint_impl<Protocol>::max_message_size_) {
@@ -640,11 +711,20 @@ typename endpoint_impl<Protocol>::cms_ret_e client_endpoint_impl<Protocol>::chec
_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
if (tp_segmentation_enabled(its_service, its_method)) {
- send_segments(tp::tp::tp_split_message(_data, _size));
- return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT;
+ instance_t its_instance = this->get_instance(its_service);
+ if (its_instance != 0xFFFF) {
+ std::uint16_t its_max_segment_length;
+ std::uint32_t its_separation_time;
+ this->configuration_->get_tp_configuration(
+ its_service, its_instance, its_method, true,
+ its_max_segment_length, its_separation_time);
+ send_segments(tp::tp::tp_split_message(_data, _size,
+ its_max_segment_length), its_separation_time);
+ return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT;
+ }
}
}
- VSOMEIP_ERROR << "cei::check_message_size: Dropping too big message ("
+ VSOMEIP_ERROR << "cei::check_message_size: Dropping to big message ("
<< std::dec << _size << " Bytes). Maximum allowed message size is: "
<< endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
ret = endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG;
@@ -654,6 +734,7 @@ typename endpoint_impl<Protocol>::cms_ret_e client_endpoint_impl<Protocol>::chec
template<typename Protocol>
bool client_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std::uint32_t _size) const {
+
if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
&& queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
service_t its_service(0);
@@ -692,31 +773,84 @@ bool client_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry) {
- queue_.push_back(train_.buffer_);
- queue_size_ += train_.buffer_->size();
- train_.buffer_ = std::make_shared<message_buffer_t>();
+void client_endpoint_impl<Protocol>::queue_train(
+ const std::shared_ptr<train> &_train, bool _queue_size_zero_on_entry) {
+
+ queue_.push_back(std::make_pair(_train->buffer_, 0));
+ queue_size_ += _train->buffer_->size();
+
if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
- auto its_buffer = get_front();
- if (its_buffer) {
+ auto its_entry = get_front();
+ if (its_entry.first) {
strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
- this->shared_from_this(), its_buffer));
+ this->shared_from_this(), its_entry));
}
}
}
template<typename Protocol>
size_t client_endpoint_impl<Protocol>::get_queue_size() const {
+
std::lock_guard<std::mutex> its_lock(mutex_);
return queue_size_;
}
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::start_dispatch_timer(
+ const std::chrono::steady_clock::time_point &_now) {
+
+ // Choose the next train
+ std::shared_ptr<train> its_train(train_);
+ if (!dispatched_trains_.empty()) {
+
+ auto its_dispatched = dispatched_trains_.begin();
+ if (its_dispatched->first < its_train->departure_) {
+
+ its_train = its_dispatched->second.front();
+ }
+ }
+
+ std::chrono::nanoseconds its_offset;
+ if (its_train->departure_ > _now) {
+
+ its_offset = std::chrono::duration_cast<std::chrono::nanoseconds>(
+ its_train->departure_ - _now);
+ } else { // already departure time
+
+ its_offset = std::chrono::nanoseconds::zero();
+ }
+
+#if defined(__linux__) || defined(ANDROID)
+ dispatch_timer_.expires_from_now(its_offset);
+#else
+ dispatch_timer_.expires_from_now(
+ std::chrono::duration_cast<
+ std::chrono::steady_clock::duration>(its_offset));
+#endif
+ dispatch_timer_.async_wait(
+ std::bind(&client_endpoint_impl<Protocol>::flush_cbk,
+ this->shared_from_this(), std::placeholders::_1));
+}
+
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::cancel_dispatch_timer() {
+
+ boost::system::error_code ec;
+ dispatch_timer_.cancel(ec);
+}
+
+template<typename Protocol>
+void client_endpoint_impl<Protocol>::update_last_departure() {
+
+ last_departure_ = std::chrono::steady_clock::now();
+ has_last_departure_ = true;
+}
+
// Instantiate template
-#ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
#endif
template class client_endpoint_impl<boost::asio::ip::tcp>;
template class client_endpoint_impl<boost::asio::ip::udp>;
} // namespace vsomeip_v3
-