summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp529
1 files changed, 300 insertions, 229 deletions
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index ddf6b25..0d06537 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -11,8 +11,13 @@
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
-#include <boost/asio/ip/udp_ext.hpp>
+#if VSOMEIP_BOOST_VERSION < 106600
#include <boost/asio/local/stream_protocol_ext.hpp>
+#include <boost/asio/ip/udp_ext.hpp>
+#else
+#include <boost/asio/local/stream_protocol.hpp>
+#include <boost/asio/ip/udp.hpp>
+#endif
#include <vsomeip/defines.hpp>
#include <vsomeip/internal/logger.hpp>
@@ -30,69 +35,73 @@ template<typename Protocol>
server_endpoint_impl<Protocol>::server_endpoint_impl(
const std::shared_ptr<endpoint_host>& _endpoint_host,
const std::shared_ptr<routing_host>& _routing_host, endpoint_type _local,
- boost::asio::io_service &_io, std::uint32_t _max_message_size,
+ boost::asio::io_context &_io, std::uint32_t _max_message_size,
configuration::endpoint_queue_limit_t _queue_limit,
const std::shared_ptr<configuration>& _configuration)
: endpoint_impl<Protocol>(_endpoint_host, _routing_host, _local, _io, _max_message_size,
_queue_limit, _configuration),
sent_timer_(_io) {
+
is_sending_ = false;
}
template<typename Protocol>
server_endpoint_impl<Protocol>::~server_endpoint_impl() {
+
}
template<typename Protocol>
void server_endpoint_impl<Protocol>::prepare_stop(
- endpoint::prepare_stop_handler_t _handler, service_t _service) {
+ const endpoint::prepare_stop_handler_t &_handler, service_t _service) {
+
std::lock_guard<std::mutex> its_lock(mutex_);
bool queued_train(false);
+ std::vector<target_data_iterator_type> its_erased;
+ boost::system::error_code ec;
+
if (_service == ANY_SERVICE) { // endpoint is shutting down completely
endpoint_impl<Protocol>::sending_blocked_ = true;
- boost::system::error_code ec;
- for (auto const& train_iter : trains_) {
- train_iter.second->departure_timer_->cancel(ec);
- if (train_iter.second->buffer_->size() > 0) {
- auto target_queue_iter = queues_.find(train_iter.first);
- if (target_queue_iter != queues_.end()) {
- auto& its_qpair = target_queue_iter->second;
- const bool queue_size_zero_on_entry(its_qpair.second.empty());
- queue_train(target_queue_iter, train_iter.second,
- queue_size_zero_on_entry);
- queued_train = true;
- }
+ for (auto t = targets_.begin(); t != targets_.end(); t++) {
+ auto its_train (t->second.train_);
+ // cancel dispatch timer
+ t->second.dispatch_timer_->cancel(ec);
+ if (its_train->buffer_->size() > 0) {
+ const bool queue_size_zero_on_entry(t->second.queue_.empty());
+ if (queue_train(t, its_train, queue_size_zero_on_entry))
+ its_erased.push_back(t);
+ queued_train = true;
}
}
} else {
- for (auto const& train_iter : trains_) {
- for (auto const& passenger_iter : train_iter.second->passengers_) {
+ for (auto t = targets_.begin(); t != targets_.end(); t++) {
+ auto its_train(t->second.train_);
+ for (auto const& passenger_iter : its_train->passengers_) {
if (passenger_iter.first == _service) {
- // cancel departure timer
- boost::system::error_code ec;
- train_iter.second->departure_timer_->cancel(ec);
+ // cancel dispatch timer
+ t->second.dispatch_timer_->cancel(ec);
// queue train
- auto target_queue_iter = queues_.find(train_iter.first);
- if (target_queue_iter != queues_.end()) {
- const auto& its_qpair = target_queue_iter->second;
- const bool queue_size_zero_on_entry(its_qpair.second.empty());
- queue_train(target_queue_iter, train_iter.second,
- queue_size_zero_on_entry);
- queued_train = true;
- }
+ const bool queue_size_zero_on_entry(t->second.queue_.empty());
+ // TODO: Queue all(!) trains here...
+ if (queue_train(t, its_train, queue_size_zero_on_entry))
+ its_erased.push_back(t);
+ queued_train = true;
break;
}
}
}
}
+
+ for (const auto t : its_erased)
+ targets_.erase(t);
+
if (!queued_train) {
if (_service == ANY_SERVICE) {
- if (std::all_of(queues_.begin(), queues_.end(),
- [&](const typename queue_type::value_type& q)
- { return q.second.second.empty(); })) {
+ if (std::all_of(targets_.begin(), targets_.end(),
+ [&](const typename target_data_type::value_type &_t)
+ { return _t.second.queue_.empty(); })) {
// nothing was queued and all queues are empty -> ensure cbk is called
auto ptr = this->shared_from_this();
- endpoint_impl<Protocol>::service_.post([ptr, _handler, _service](){
+ endpoint_impl<Protocol>::io_.post([ptr, _handler, _service](){
_handler(ptr, _service);
});
} else {
@@ -101,11 +110,11 @@ void server_endpoint_impl<Protocol>::prepare_stop(
} else {
// check if any of the queues contains a message of to be stopped service
bool found_service_msg(false);
- for (const auto& q : queues_) {
- for (const auto& msg : q.second.second ) {
+ for (const auto &t : targets_) {
+ for (const auto &q : t.second.queue_) {
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*msg)[VSOMEIP_SERVICE_POS_MIN],
- (*msg)[VSOMEIP_SERVICE_POS_MAX]);
+ (*q.first)[VSOMEIP_SERVICE_POS_MIN],
+ (*q.first)[VSOMEIP_SERVICE_POS_MAX]);
if (its_service == _service) {
found_service_msg = true;
break;
@@ -119,7 +128,7 @@ void server_endpoint_impl<Protocol>::prepare_stop(
prepare_stop_handlers_[_service] = _handler;
} else { // no messages of the to be stopped service are or have been queued
auto ptr = this->shared_from_this();
- endpoint_impl<Protocol>::service_.post([ptr, _handler, _service](){
+ endpoint_impl<Protocol>::io_.post([ptr, _handler, _service](){
_handler(ptr, _service);
});
}
@@ -170,7 +179,7 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8
std::stringstream msg;
msg << "sei::send ";
for (uint32_t i = 0; i < _size; i++)
- msg << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
+ msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
endpoint_type its_target;
@@ -185,45 +194,38 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
const client_t its_client = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]);
- requests_mutex_.lock();
- auto found_client = requests_.find(its_client);
- if (found_client != requests_.end()) {
- auto its_request = std::make_tuple(its_service, its_method, its_session);
- auto found_request = found_client->second.find(its_request);
- if (found_request != found_client->second.end()) {
- its_target = found_request->second;
+ clients_mutex_.lock();
+ auto found_client = clients_.find(its_client);
+ if (found_client != clients_.end()) {
+ auto found_session = found_client->second.find(its_session);
+ if (found_session != found_client->second.end()) {
+ its_target = found_session->second;
is_valid_target = true;
- found_client->second.erase(found_request);
+ found_client->second.erase(its_session);
} else {
- VSOMEIP_WARNING << "server_endpoint::send: request ["
- << std::hex << std::setw(4) << std::setfill('0')
- << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0')
- << its_method << "/"
- << std::hex << std::setw(4) << std::setfill('0')
- << its_client << "."
- << std::hex << std::setw(4) << std::setfill('0')
- << its_session
- << "] could not be found.";
+ VSOMEIP_WARNING << "server_endpoint::send: session_id 0x"
+ << std::hex << its_session
+ << " not found for client 0x" << its_client;
+ const method_t its_method =
+ VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
if (its_service == VSOMEIP_SD_SERVICE
&& its_method == VSOMEIP_SD_METHOD) {
VSOMEIP_ERROR << "Clearing clients map as a request was "
"received on SD port";
- requests_.clear();
+ clients_.clear();
is_valid_target = get_default_target(its_service, its_target);
}
}
} else {
is_valid_target = get_default_target(its_service, its_target);
}
- requests_mutex_.unlock();
+ clients_mutex_.unlock();
if (is_valid_target) {
is_valid_target = send_intern(its_target, _data, _size);
@@ -276,9 +278,11 @@ bool server_endpoint_impl<Protocol>::send_intern(
}
}
- const queue_iterator_type target_queue_iterator = find_or_create_queue_unlocked(_target);
+ const auto its_target_iterator = find_or_create_target_unlocked(_target);
+ auto &its_data(its_target_iterator->second);
bool must_depart(false);
+ auto its_now(std::chrono::steady_clock::now());
#if 0
std::stringstream msg;
@@ -287,15 +291,12 @@ bool server_endpoint_impl<Protocol>::send_intern(
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
VSOMEIP_DEBUG << msg.str();
#endif
- // STEP 1: determine the correct train
- std::shared_ptr<train> target_train = find_or_create_train_unlocked(_target);
-
- const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
- if (!check_queue_limit(_data, _size, target_queue_iterator->second.first)) {
+ // STEP 1: Check queue limit
+ if (!check_queue_limit(_data, _size, its_data.queue_size_)) {
return false;
}
- // STEP 2: Determine elapsed time and update the departure time and cancel the timer
- target_train->update_departure_time_and_stop_departure();
+ // STEP 2: Cancel the dispatch timer
+ cancel_dispatch_timer(its_target_iterator);
// STEP 3: Get configured timings
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
@@ -310,38 +311,38 @@ bool server_endpoint_impl<Protocol>::send_intern(
}
// STEP 4: Check if the passenger enters an empty train
- const std::pair<service_t, method_t> its_identifier = std::make_pair(
- its_service, its_method);
- if (target_train->passengers_.empty()) {
- target_train->departure_ = its_retention;
+ const std::pair<service_t, method_t> its_identifier
+ = std::make_pair(its_service, its_method);
+ if (its_data.train_->passengers_.empty()) {
+ its_data.train_->departure_ = its_now + its_retention;
} else {
- if (target_train->passengers_.end()
- != target_train->passengers_.find(its_identifier)) {
+ if (its_data.train_->passengers_.end()
+ != its_data.train_->passengers_.find(its_identifier)) {
must_depart = true;
} else {
// STEP 5: Check whether the current message fits into the current train
- if (target_train->buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) {
+ if (its_data.train_->buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) {
must_depart = true;
} else {
// STEP 6: Check debouncing time
- if (its_debouncing > target_train->minimal_max_retention_time_) {
+ if (its_debouncing > its_data.train_->minimal_max_retention_time_) {
// train's latest departure would already undershot new
// passenger's debounce time
must_depart = true;
} else {
- if (its_debouncing > target_train->departure_) {
+ if (its_now + its_debouncing > its_data.train_->departure_) {
// train departs earlier as the new passenger's debounce
// time allows
must_depart = true;
} else {
// STEP 7: Check maximum retention time
- if (its_retention < target_train->minimal_debounce_time_) {
+ if (its_retention < its_data.train_->minimal_debounce_time_) {
// train's earliest departure would already exceed
// the new passenger's retention time.
must_depart = true;
} else {
- if (its_retention < target_train->departure_) {
- target_train->departure_ = its_retention;
+ if (its_now + its_retention < its_data.train_->departure_) {
+ its_data.train_->departure_ = its_now + its_retention;
}
}
}
@@ -354,52 +355,42 @@ bool server_endpoint_impl<Protocol>::send_intern(
if (must_depart) {
// STEP 8.1: check if debounce time would be undershot here if the train
// departs. Block sending until train is allowed to depart.
- wait_until_debounce_time_reached(target_train);
- queue_train(target_queue_iterator, target_train,
- queue_size_zero_on_entry);
- target_train->departure_ = its_retention;
+ schedule_train(its_data);
+
+ its_data.train_ = std::make_shared<train>();
+ its_data.train_->departure_ = its_now + its_retention;
}
// STEP 9: insert current message buffer
- target_train->buffer_->insert(target_train->buffer_->end(), _data, _data + _size);
- target_train->passengers_.insert(its_identifier);
+ its_data.train_->buffer_->insert(its_data.train_->buffer_->end(), _data, _data + _size);
+ its_data.train_->passengers_.insert(its_identifier);
// STEP 9.1: update the trains minimal debounce time if necessary
- if (its_debouncing < target_train->minimal_debounce_time_) {
- target_train->minimal_debounce_time_ = its_debouncing;
+ if (its_debouncing < its_data.train_->minimal_debounce_time_) {
+ its_data.train_->minimal_debounce_time_ = its_debouncing;
}
// STEP 9.2: update the trains minimal maximum retention time if necessary
- if (its_retention < target_train->minimal_max_retention_time_) {
- target_train->minimal_max_retention_time_ = its_retention;
+ if (its_retention < its_data.train_->minimal_max_retention_time_) {
+ its_data.train_->minimal_max_retention_time_ = its_retention;
}
// STEP 10: restart timer with current departure time
-#ifndef _WIN32
- target_train->departure_timer_->expires_from_now(target_train->departure_);
-#else
- target_train->departure_timer_->expires_from_now(
- std::chrono::duration_cast<
- std::chrono::steady_clock::duration>(target_train->departure_));
-#endif
- target_train->departure_timer_->async_wait(
- std::bind(&server_endpoint_impl<Protocol>::flush_cbk,
- this->shared_from_this(), _target,
- target_train, std::placeholders::_1));
+ start_dispatch_timer(its_target_iterator, its_now);
return (true);
}
template<typename Protocol>
void server_endpoint_impl<Protocol>::send_segments(
- const tp::tp_split_messages_t &_segments, const endpoint_type &_target) {
+ const tp::tp_split_messages_t &_segments, std::uint32_t _separation_time,
+ const endpoint_type &_target) {
if (_segments.size() == 0)
return;
- const queue_iterator_type target_queue_iterator = find_or_create_queue_unlocked(_target);
- const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
+ const auto its_target_iterator = find_or_create_target_unlocked(_target);
+ auto &its_data = its_target_iterator->second;
- std::shared_ptr<train> target_train = find_or_create_train_unlocked(_target);
- target_train->update_departure_time_and_stop_departure();
+ auto its_now(std::chrono::steady_clock::now());
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
(*(_segments[0]))[VSOMEIP_SERVICE_POS_MIN], (*(_segments[0]))[VSOMEIP_SERVICE_POS_MAX]);
@@ -412,47 +403,63 @@ void server_endpoint_impl<Protocol>::send_segments(
&its_debouncing, &its_retention);
}
// update the trains minimal debounce time if necessary
- if (its_debouncing < target_train->minimal_debounce_time_) {
- target_train->minimal_debounce_time_ = its_debouncing;
+ if (its_debouncing < its_data.train_->minimal_debounce_time_) {
+ its_data.train_->minimal_debounce_time_ = its_debouncing;
}
// update the trains minimal maximum retention time if necessary
- if (its_retention < target_train->minimal_max_retention_time_) {
- target_train->minimal_max_retention_time_ = its_retention;
+ if (its_retention < its_data.train_->minimal_max_retention_time_) {
+ its_data.train_->minimal_max_retention_time_ = its_retention;
}
// We only need to respect the debouncing. There is no need to wait for further
// messages as we will send several now anyway.
- if (!target_train->passengers_.empty()) {
- wait_until_debounce_time_reached(target_train);
- queue_train(target_queue_iterator, target_train, queue_size_zero_on_entry);
+ if (!its_data.train_->passengers_.empty()) {
+ schedule_train(its_data);
+ its_data.train_->departure_ = its_now + its_retention;
}
- const bool queue_size_still_zero(target_queue_iterator->second.second.empty());
+ const bool queue_size_still_zero(its_data.queue_.empty());
for (const auto &s : _segments) {
- target_queue_iterator->second.second.emplace_back(s);
- target_queue_iterator->second.first += s->size();
+ its_data.queue_.emplace_back(std::make_pair(s, _separation_time));
+ its_data.queue_size_ += s->size();
}
- if (queue_size_still_zero && !target_queue_iterator->second.second.empty()) { // no writing in progress
+
+ if (queue_size_still_zero && !its_data.queue_.empty()) { // no writing in progress
// respect minimal debounce time
- wait_until_debounce_time_reached(target_train);
+ schedule_train(its_data);
// ignore retention time and send immediately as the train is full anyway
- send_queued(target_queue_iterator);
+ (void)send_queued(its_target_iterator);
}
- target_train->last_departure_ = std::chrono::steady_clock::now();
}
template<typename Protocol>
-void server_endpoint_impl<Protocol>::wait_until_debounce_time_reached(
- const std::shared_ptr<train>& _train) const {
- const std::chrono::nanoseconds time_since_last_departure =
- std::chrono::duration_cast<std::chrono::nanoseconds>(
- std::chrono::steady_clock::now() - _train->last_departure_);
-
- if (time_since_last_departure < _train->minimal_debounce_time_) {
- std::this_thread::sleep_for(
- _train->minimal_debounce_time_ - time_since_last_departure);
+typename server_endpoint_impl<Protocol>::target_data_iterator_type
+server_endpoint_impl<Protocol>::find_or_create_target_unlocked(endpoint_type _target) {
+
+ auto its_iterator = targets_.find(_target);
+ if (its_iterator == targets_.end()) {
+
+ auto its_result = targets_.emplace(
+ std::make_pair(_target, endpoint_data_type(this->io_)));
+ its_iterator = its_result.first;
}
+
+ return (its_iterator);
}
+template<typename Protocol>
+void server_endpoint_impl<Protocol>::schedule_train(endpoint_data_type &_data) {
+
+ if (_data.has_last_departure_) {
+ if (_data.last_departure_ + _data.train_->minimal_debounce_time_
+ > _data.train_->departure_) {
+ _data.train_->departure_ = _data.last_departure_
+ + _data.train_->minimal_debounce_time_;
+ }
+ }
+
+ _data.dispatched_trains_[_data.train_->departure_]
+ .push_back(_data.train_);
+}
template<typename Protocol>
typename endpoint_impl<Protocol>::cms_ret_e server_endpoint_impl<Protocol>::check_message_size(
@@ -469,11 +476,21 @@ typename endpoint_impl<Protocol>::cms_ret_e server_endpoint_impl<Protocol>::chec
_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
if (tp_segmentation_enabled(its_service, its_method)) {
- send_segments(tp::tp::tp_split_message(_data, _size), _target);
- return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT;
+ instance_t its_instance = this->get_instance(its_service);
+ if (its_instance != 0xFFFF) {
+ std::uint16_t its_max_segment_length;
+ std::uint32_t its_separation_time;
+
+ this->configuration_->get_tp_configuration(
+ its_service, its_instance, its_method, false,
+ its_max_segment_length, its_separation_time);
+ send_segments(tp::tp::tp_split_message(_data, _size,
+ its_max_segment_length), its_separation_time, _target);
+ return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT;
+ }
}
}
- VSOMEIP_ERROR << "sei::send_intern: Dropping too big message (" << _size
+ VSOMEIP_ERROR << "sei::send_intern: Dropping to big message (" << _size
<< " Bytes). Maximum allowed message size is: "
<< endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
ret = endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG;
@@ -523,72 +540,67 @@ bool server_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std
}
template<typename Protocol>
-void server_endpoint_impl<Protocol>::queue_train(
- const queue_iterator_type _queue_iterator,
- const std::shared_ptr<train>& _train,
- bool _queue_size_zero_on_entry) {
- _queue_iterator->second.second.emplace_back(_train->buffer_);
- _queue_iterator->second.first += _train->buffer_->size();
- _train->last_departure_ = std::chrono::steady_clock::now();
- _train->passengers_.clear();
- _train->buffer_ = std::make_shared<message_buffer_t>();
- _train->minimal_debounce_time_ = std::chrono::nanoseconds::max();
- _train->minimal_max_retention_time_ = std::chrono::nanoseconds::max();
- if (_queue_size_zero_on_entry && !_queue_iterator->second.second.empty()) { // no writing in progress
- send_queued(_queue_iterator);
- }
-}
+bool server_endpoint_impl<Protocol>::queue_train(
+ target_data_iterator_type _it, const std::shared_ptr<train> &_train,
+ bool _queue_size_zero_on_entry) {
-template<typename Protocol>
-typename server_endpoint_impl<Protocol>::queue_iterator_type
-server_endpoint_impl<Protocol>::find_or_create_queue_unlocked(const endpoint_type& _target) {
- queue_iterator_type target_queue_iterator = queues_.find(_target);
- if (target_queue_iterator == queues_.end()) {
- target_queue_iterator = queues_.insert(queues_.begin(),
- std::make_pair(
- _target,
- std::make_pair(std::size_t(0),
- std::deque<message_buffer_ptr_t>())
- ));
- }
- return target_queue_iterator;
-}
+ bool must_erase(false);
-template<typename Protocol>
-std::shared_ptr<train> server_endpoint_impl<Protocol>::find_or_create_train_unlocked(
- const endpoint_type& _target) {
- auto train_iter = trains_.find(_target);
- if (train_iter == trains_.end()) {
- train_iter = trains_.insert(trains_.begin(),
- std::make_pair(_target, std::make_shared<train>(this->service_)));
+ auto &its_data = _it->second;
+ its_data.queue_.push_back(std::make_pair(_train->buffer_, 0));
+ its_data.queue_size_ += _train->buffer_->size();
+
+ if (_queue_size_zero_on_entry && !its_data.queue_.empty()) { // no writing in progress
+ must_erase = send_queued(_it);
}
- return train_iter->second;
+
+ return must_erase;
}
template<typename Protocol>
-bool server_endpoint_impl<Protocol>::flush(
- endpoint_type _target,
- const std::shared_ptr<train>& _train) {
+bool server_endpoint_impl<Protocol>::flush(target_data_iterator_type _it) {
+
+ bool has_queued(true);
+ bool is_current_train(true);
+ auto &its_data = _it->second;
+
std::lock_guard<std::mutex> its_lock(mutex_);
- bool is_flushed = false;
- if (!_train->buffer_->empty()) {
- const queue_iterator_type target_queue_iterator = queues_.find(_target);
- if (target_queue_iterator != queues_.end()) {
- const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
- queue_train(target_queue_iterator, _train, queue_size_zero_on_entry);
- is_flushed = true;
- } else {
- std::stringstream ss;
- ss << "sei::flush couldn't find target queue, won't queue train to: "
- << get_remote_information(_target) << " passengers: ";
- for (const auto& p : _train->passengers_) {
- ss << "[" << std::hex << std::setw(4) << std::setfill('0')
- << p.first << ":" << p.second << "] ";
+
+ auto its_train(its_data.train_);
+ if (!its_data.dispatched_trains_.empty()) {
+
+ auto its_dispatched = its_data.dispatched_trains_.begin();
+ if (its_dispatched->first <= its_train->departure_) {
+
+ is_current_train = false;
+ its_train = its_dispatched->second.front();
+ its_dispatched->second.pop_front();
+ if (its_dispatched->second.empty()) {
+
+ its_data.dispatched_trains_.erase(its_dispatched);
}
- VSOMEIP_WARNING << ss.str();
}
}
- return is_flushed;
+
+ if (!its_train->buffer_->empty()) {
+
+ queue_train(_it, its_train, its_data.queue_.empty());
+
+ // Reset current train if necessary
+ if (is_current_train) {
+ its_train->reset();
+ }
+ } else {
+ has_queued = false;
+ }
+
+ if (!is_current_train || !its_data.dispatched_trains_.empty()) {
+
+ auto its_now(std::chrono::steady_clock::now());
+ start_dispatch_timer(_it, its_now);
+ }
+
+ return (has_queued);
}
template<typename Protocol>
@@ -599,7 +611,7 @@ void server_endpoint_impl<Protocol>::connect_cbk(
template<typename Protocol>
void server_endpoint_impl<Protocol>::send_cbk(
- const queue_iterator_type _queue_iterator,
+ const target_data_iterator_type _it,
boost::system::error_code const &_error, std::size_t _bytes) {
(void)_bytes;
@@ -623,11 +635,11 @@ void server_endpoint_impl<Protocol>::send_cbk(
++stp_hndlr_iter;
continue;
}
- for (const auto& q : queues_) {
- for (const auto& msg : q.second.second ) {
+ for (const auto& t : targets_) {
+ for (const auto& e : t.second.queue_ ) {
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*msg)[VSOMEIP_SERVICE_POS_MIN],
- (*msg)[VSOMEIP_SERVICE_POS_MAX]);
+ (*e.first)[VSOMEIP_SERVICE_POS_MIN],
+ (*e.first)[VSOMEIP_SERVICE_POS_MAX]);
if (its_service == its_stopped_service) {
found_service_msg = true;
break;
@@ -639,14 +651,13 @@ void server_endpoint_impl<Protocol>::send_cbk(
}
if (found_service_msg) {
++stp_hndlr_iter;
- found_service_msg = false;
} else { // all messages of the to be stopped service have been sent
auto handler = stp_hndlr_iter->second;
auto ptr = this->shared_from_this();
- #ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
endpoint_impl<Protocol>::
- #endif
- service_.post([ptr, handler, its_stopped_service](){
+#endif
+ io_.post([ptr, handler, its_stopped_service](){
handler(ptr, its_stopped_service);
});
stp_hndlr_iter = prepare_stop_handlers_.erase(stp_hndlr_iter);
@@ -660,22 +671,18 @@ void server_endpoint_impl<Protocol>::send_cbk(
// prepare_stop_handlers have been queued ensure to call them as well
check_if_all_msgs_for_stopped_service_are_sent();
}
- if (std::all_of(queues_.begin(), queues_.end(), [&]
- #ifndef _WIN32
- (const typename queue_type::value_type& q)
- #else
- (const std::pair<endpoint_type,std::pair<size_t, std::deque<message_buffer_ptr_t>>>& q)
- #endif
- { return q.second.second.empty(); })) {
+ if (std::all_of(targets_.begin(), targets_.end(),
+ [&](const typename target_data_type::value_type &_t)
+ { return _t.second.queue_.empty(); })) {
// all outstanding response have been sent.
auto found_cbk = prepare_stop_handlers_.find(ANY_SERVICE);
if (found_cbk != prepare_stop_handlers_.end()) {
auto handler = found_cbk->second;
auto ptr = this->shared_from_this();
- #ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
endpoint_impl<Protocol>::
- #endif
- service_.post([ptr, handler](){
+#endif
+ io_.post([ptr, handler](){
handler(ptr, ANY_SERVICE);
});
prepare_stop_handlers_.erase(found_cbk);
@@ -683,27 +690,30 @@ void server_endpoint_impl<Protocol>::send_cbk(
}
};
- auto& its_qpair = _queue_iterator->second;
+ auto& its_data = _it->second;
if (!_error) {
- its_qpair.first -= its_qpair.second.front()->size();
- its_qpair.second.pop_front();
+ its_data.queue_size_ -= its_data.queue_.front().first->size();
+ its_data.queue_.pop_front();
+
+ update_last_departure(its_data);
if (!prepare_stop_handlers_.empty() && !endpoint_impl<Protocol>::sending_blocked_) {
// only one service instance is stopped
check_if_all_msgs_for_stopped_service_are_sent();
}
- if (its_qpair.second.size() > 0) {
- send_queued(_queue_iterator);
+ if (!its_data.queue_.empty()) {
+ (void)send_queued(_it);
} else if (!prepare_stop_handlers_.empty() && endpoint_impl<Protocol>::sending_blocked_) {
// endpoint is shutting down completely
- queues_.erase(_queue_iterator);
+ cancel_dispatch_timer(_it);
+ targets_.erase(_it);
check_if_all_queues_are_empty();
}
} else {
message_buffer_ptr_t its_buffer;
- if (_queue_iterator->second.second.size()) {
- its_buffer = _queue_iterator->second.second.front();
+ if (its_data.queue_.size()) {
+ its_buffer = its_data.queue_.front().first;
}
service_t its_service(0);
method_t its_method(0);
@@ -727,14 +737,15 @@ void server_endpoint_impl<Protocol>::send_cbk(
// delete remaining outstanding responses
VSOMEIP_WARNING << "sei::send_cbk received error: " << _error.message()
<< " (" << std::dec << _error.value() << ") "
- << get_remote_information(_queue_iterator) << " "
- << std::dec << _queue_iterator->second.second.size() << " "
- << std::dec << _queue_iterator->second.first << " ("
+ << get_remote_information(_it) << " "
+ << std::dec << its_data.queue_.size() << " "
+ << std::dec << its_data.queue_size_ << " ("
<< std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_method << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
- queues_.erase(_queue_iterator);
+ cancel_dispatch_timer(_it);
+ targets_.erase(_it);
if (!prepare_stop_handlers_.empty()) {
if (endpoint_impl<Protocol>::sending_blocked_) {
// endpoint is shutting down completely, ensure to call
@@ -751,32 +762,92 @@ void server_endpoint_impl<Protocol>::send_cbk(
template<typename Protocol>
void server_endpoint_impl<Protocol>::flush_cbk(
- endpoint_type _target,
- const std::shared_ptr<train>& _train, const boost::system::error_code &_error_code) {
+ target_data_iterator_type _it,
+ const boost::system::error_code &_error_code) {
+
if (!_error_code) {
- (void) flush(_target, _train);
+
+ (void) flush(_it);
}
}
template<typename Protocol>
size_t server_endpoint_impl<Protocol>::get_queue_size() const {
size_t its_queue_size(0);
-
{
std::lock_guard<std::mutex> its_lock(mutex_);
- for (const auto &q : queues_) {
- its_queue_size += q.second.second.size();
+ for (const auto &t : targets_) {
+ its_queue_size += t.second.queue_size_;
}
}
+ return (its_queue_size);
+}
+
+template<typename Protocol>
+void server_endpoint_impl<Protocol>::start_dispatch_timer(
+ target_data_iterator_type _it,
+ const std::chrono::steady_clock::time_point &_now) {
+
+ auto &its_data = _it->second;
+ std::shared_ptr<train> its_train(its_data.train_);
+
+ if (!its_data.dispatched_trains_.empty()) {
+
+ auto its_dispatched = its_data.dispatched_trains_.begin();
+ if (its_dispatched->first < its_train->departure_) {
- return its_queue_size;
+ its_train = its_dispatched->second.front();
+ }
+ }
+
+ std::chrono::nanoseconds its_offset;
+ if (its_train->departure_ > _now) {
+
+ its_offset = std::chrono::duration_cast<std::chrono::nanoseconds>(
+ its_train->departure_ - _now);
+ } else { // already departure time
+
+ its_offset = std::chrono::nanoseconds::zero();
+ }
+
+#if defined(__linux__) || defined(ANDROID)
+ its_data.dispatch_timer_->expires_from_now(its_offset);
+#else
+ its_data.dispatch_timer_->expires_from_now(
+ std::chrono::duration_cast<
+ std::chrono::steady_clock::duration>(its_offset));
+#endif
+ its_data.dispatch_timer_->async_wait(
+ std::bind(&server_endpoint_impl<Protocol>::flush_cbk,
+ this->shared_from_this(), _it, std::placeholders::_1));
+}
+
+template<typename Protocol>
+void server_endpoint_impl<Protocol>::cancel_dispatch_timer(
+ target_data_iterator_type _it) {
+
+ boost::system::error_code ec;
+ _it->second.dispatch_timer_->cancel(ec);
+}
+
+template<typename Protocol>
+void server_endpoint_impl<Protocol>::update_last_departure(
+ endpoint_data_type &_data) {
+
+ _data.last_departure_ = std::chrono::steady_clock::now();
+ _data.has_last_departure_ = true;
}
// Instantiate template
-#ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
+#if VSOMEIP_BOOST_VERSION < 106600
template class server_endpoint_impl<boost::asio::local::stream_protocol_ext>;
+template class server_endpoint_impl<boost::asio::ip::udp_ext>;
+#else
+template class server_endpoint_impl<boost::asio::local::stream_protocol>;
+template class server_endpoint_impl<boost::asio::ip::udp>;
+#endif
#endif
template class server_endpoint_impl<boost::asio::ip::tcp>;
-template class server_endpoint_impl<boost::asio::ip::udp_ext>;
} // namespace vsomeip_v3