diff options
Diffstat (limited to 'implementation/endpoints/src/server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/server_endpoint_impl.cpp | 529 |
1 files changed, 300 insertions, 229 deletions
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index ddf6b25..0d06537 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -11,8 +11,13 @@ #include <boost/asio/buffer.hpp> #include <boost/asio/ip/tcp.hpp> -#include <boost/asio/ip/udp_ext.hpp> +#if VSOMEIP_BOOST_VERSION < 106600 #include <boost/asio/local/stream_protocol_ext.hpp> +#include <boost/asio/ip/udp_ext.hpp> +#else +#include <boost/asio/local/stream_protocol.hpp> +#include <boost/asio/ip/udp.hpp> +#endif #include <vsomeip/defines.hpp> #include <vsomeip/internal/logger.hpp> @@ -30,69 +35,73 @@ template<typename Protocol> server_endpoint_impl<Protocol>::server_endpoint_impl( const std::shared_ptr<endpoint_host>& _endpoint_host, const std::shared_ptr<routing_host>& _routing_host, endpoint_type _local, - boost::asio::io_service &_io, std::uint32_t _max_message_size, + boost::asio::io_context &_io, std::uint32_t _max_message_size, configuration::endpoint_queue_limit_t _queue_limit, const std::shared_ptr<configuration>& _configuration) : endpoint_impl<Protocol>(_endpoint_host, _routing_host, _local, _io, _max_message_size, _queue_limit, _configuration), sent_timer_(_io) { + is_sending_ = false; } template<typename Protocol> server_endpoint_impl<Protocol>::~server_endpoint_impl() { + } template<typename Protocol> void server_endpoint_impl<Protocol>::prepare_stop( - endpoint::prepare_stop_handler_t _handler, service_t _service) { + const endpoint::prepare_stop_handler_t &_handler, service_t _service) { + std::lock_guard<std::mutex> its_lock(mutex_); bool queued_train(false); + std::vector<target_data_iterator_type> its_erased; + boost::system::error_code ec; + if (_service == ANY_SERVICE) { // endpoint is shutting down completely endpoint_impl<Protocol>::sending_blocked_ = true; - boost::system::error_code ec; - for (auto const& train_iter : trains_) { - train_iter.second->departure_timer_->cancel(ec); - if (train_iter.second->buffer_->size() > 0) { - auto target_queue_iter = queues_.find(train_iter.first); - if (target_queue_iter != queues_.end()) { - auto& its_qpair = target_queue_iter->second; - const bool queue_size_zero_on_entry(its_qpair.second.empty()); - queue_train(target_queue_iter, train_iter.second, - queue_size_zero_on_entry); - queued_train = true; - } + for (auto t = targets_.begin(); t != targets_.end(); t++) { + auto its_train (t->second.train_); + // cancel dispatch timer + t->second.dispatch_timer_->cancel(ec); + if (its_train->buffer_->size() > 0) { + const bool queue_size_zero_on_entry(t->second.queue_.empty()); + if (queue_train(t, its_train, queue_size_zero_on_entry)) + its_erased.push_back(t); + queued_train = true; } } } else { - for (auto const& train_iter : trains_) { - for (auto const& passenger_iter : train_iter.second->passengers_) { + for (auto t = targets_.begin(); t != targets_.end(); t++) { + auto its_train(t->second.train_); + for (auto const& passenger_iter : its_train->passengers_) { if (passenger_iter.first == _service) { - // cancel departure timer - boost::system::error_code ec; - train_iter.second->departure_timer_->cancel(ec); + // cancel dispatch timer + t->second.dispatch_timer_->cancel(ec); // queue train - auto target_queue_iter = queues_.find(train_iter.first); - if (target_queue_iter != queues_.end()) { - const auto& its_qpair = target_queue_iter->second; - const bool queue_size_zero_on_entry(its_qpair.second.empty()); - queue_train(target_queue_iter, train_iter.second, - queue_size_zero_on_entry); - queued_train = true; - } + const bool queue_size_zero_on_entry(t->second.queue_.empty()); + // TODO: Queue all(!) trains here... + if (queue_train(t, its_train, queue_size_zero_on_entry)) + its_erased.push_back(t); + queued_train = true; break; } } } } + + for (const auto t : its_erased) + targets_.erase(t); + if (!queued_train) { if (_service == ANY_SERVICE) { - if (std::all_of(queues_.begin(), queues_.end(), - [&](const typename queue_type::value_type& q) - { return q.second.second.empty(); })) { + if (std::all_of(targets_.begin(), targets_.end(), + [&](const typename target_data_type::value_type &_t) + { return _t.second.queue_.empty(); })) { // nothing was queued and all queues are empty -> ensure cbk is called auto ptr = this->shared_from_this(); - endpoint_impl<Protocol>::service_.post([ptr, _handler, _service](){ + endpoint_impl<Protocol>::io_.post([ptr, _handler, _service](){ _handler(ptr, _service); }); } else { @@ -101,11 +110,11 @@ void server_endpoint_impl<Protocol>::prepare_stop( } else { // check if any of the queues contains a message of to be stopped service bool found_service_msg(false); - for (const auto& q : queues_) { - for (const auto& msg : q.second.second ) { + for (const auto &t : targets_) { + for (const auto &q : t.second.queue_) { const service_t its_service = VSOMEIP_BYTES_TO_WORD( - (*msg)[VSOMEIP_SERVICE_POS_MIN], - (*msg)[VSOMEIP_SERVICE_POS_MAX]); + (*q.first)[VSOMEIP_SERVICE_POS_MIN], + (*q.first)[VSOMEIP_SERVICE_POS_MAX]); if (its_service == _service) { found_service_msg = true; break; @@ -119,7 +128,7 @@ void server_endpoint_impl<Protocol>::prepare_stop( prepare_stop_handlers_[_service] = _handler; } else { // no messages of the to be stopped service are or have been queued auto ptr = this->shared_from_this(); - endpoint_impl<Protocol>::service_.post([ptr, _handler, _service](){ + endpoint_impl<Protocol>::io_.post([ptr, _handler, _service](){ _handler(ptr, _service); }); } @@ -170,7 +179,7 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8 std::stringstream msg; msg << "sei::send "; for (uint32_t i = 0; i < _size; i++) - msg << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; + msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; VSOMEIP_INFO << msg.str(); #endif endpoint_type its_target; @@ -185,45 +194,38 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8 const service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); - const method_t its_method = VSOMEIP_BYTES_TO_WORD( - _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); const client_t its_client = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]); - requests_mutex_.lock(); - auto found_client = requests_.find(its_client); - if (found_client != requests_.end()) { - auto its_request = std::make_tuple(its_service, its_method, its_session); - auto found_request = found_client->second.find(its_request); - if (found_request != found_client->second.end()) { - its_target = found_request->second; + clients_mutex_.lock(); + auto found_client = clients_.find(its_client); + if (found_client != clients_.end()) { + auto found_session = found_client->second.find(its_session); + if (found_session != found_client->second.end()) { + its_target = found_session->second; is_valid_target = true; - found_client->second.erase(found_request); + found_client->second.erase(its_session); } else { - VSOMEIP_WARNING << "server_endpoint::send: request [" - << std::hex << std::setw(4) << std::setfill('0') - << its_service << "." - << std::hex << std::setw(4) << std::setfill('0') - << its_method << "/" - << std::hex << std::setw(4) << std::setfill('0') - << its_client << "." - << std::hex << std::setw(4) << std::setfill('0') - << its_session - << "] could not be found."; + VSOMEIP_WARNING << "server_endpoint::send: session_id 0x" + << std::hex << its_session + << " not found for client 0x" << its_client; + const method_t its_method = + VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], + _data[VSOMEIP_METHOD_POS_MAX]); if (its_service == VSOMEIP_SD_SERVICE && its_method == VSOMEIP_SD_METHOD) { VSOMEIP_ERROR << "Clearing clients map as a request was " "received on SD port"; - requests_.clear(); + clients_.clear(); is_valid_target = get_default_target(its_service, its_target); } } } else { is_valid_target = get_default_target(its_service, its_target); } - requests_mutex_.unlock(); + clients_mutex_.unlock(); if (is_valid_target) { is_valid_target = send_intern(its_target, _data, _size); @@ -276,9 +278,11 @@ bool server_endpoint_impl<Protocol>::send_intern( } } - const queue_iterator_type target_queue_iterator = find_or_create_queue_unlocked(_target); + const auto its_target_iterator = find_or_create_target_unlocked(_target); + auto &its_data(its_target_iterator->second); bool must_depart(false); + auto its_now(std::chrono::steady_clock::now()); #if 0 std::stringstream msg; @@ -287,15 +291,12 @@ bool server_endpoint_impl<Protocol>::send_intern( msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; VSOMEIP_DEBUG << msg.str(); #endif - // STEP 1: determine the correct train - std::shared_ptr<train> target_train = find_or_create_train_unlocked(_target); - - const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty()); - if (!check_queue_limit(_data, _size, target_queue_iterator->second.first)) { + // STEP 1: Check queue limit + if (!check_queue_limit(_data, _size, its_data.queue_size_)) { return false; } - // STEP 2: Determine elapsed time and update the departure time and cancel the timer - target_train->update_departure_time_and_stop_departure(); + // STEP 2: Cancel the dispatch timer + cancel_dispatch_timer(its_target_iterator); // STEP 3: Get configured timings const service_t its_service = VSOMEIP_BYTES_TO_WORD( @@ -310,38 +311,38 @@ bool server_endpoint_impl<Protocol>::send_intern( } // STEP 4: Check if the passenger enters an empty train - const std::pair<service_t, method_t> its_identifier = std::make_pair( - its_service, its_method); - if (target_train->passengers_.empty()) { - target_train->departure_ = its_retention; + const std::pair<service_t, method_t> its_identifier + = std::make_pair(its_service, its_method); + if (its_data.train_->passengers_.empty()) { + its_data.train_->departure_ = its_now + its_retention; } else { - if (target_train->passengers_.end() - != target_train->passengers_.find(its_identifier)) { + if (its_data.train_->passengers_.end() + != its_data.train_->passengers_.find(its_identifier)) { must_depart = true; } else { // STEP 5: Check whether the current message fits into the current train - if (target_train->buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) { + if (its_data.train_->buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) { must_depart = true; } else { // STEP 6: Check debouncing time - if (its_debouncing > target_train->minimal_max_retention_time_) { + if (its_debouncing > its_data.train_->minimal_max_retention_time_) { // train's latest departure would already undershot new // passenger's debounce time must_depart = true; } else { - if (its_debouncing > target_train->departure_) { + if (its_now + its_debouncing > its_data.train_->departure_) { // train departs earlier as the new passenger's debounce // time allows must_depart = true; } else { // STEP 7: Check maximum retention time - if (its_retention < target_train->minimal_debounce_time_) { + if (its_retention < its_data.train_->minimal_debounce_time_) { // train's earliest departure would already exceed // the new passenger's retention time. must_depart = true; } else { - if (its_retention < target_train->departure_) { - target_train->departure_ = its_retention; + if (its_now + its_retention < its_data.train_->departure_) { + its_data.train_->departure_ = its_now + its_retention; } } } @@ -354,52 +355,42 @@ bool server_endpoint_impl<Protocol>::send_intern( if (must_depart) { // STEP 8.1: check if debounce time would be undershot here if the train // departs. Block sending until train is allowed to depart. - wait_until_debounce_time_reached(target_train); - queue_train(target_queue_iterator, target_train, - queue_size_zero_on_entry); - target_train->departure_ = its_retention; + schedule_train(its_data); + + its_data.train_ = std::make_shared<train>(); + its_data.train_->departure_ = its_now + its_retention; } // STEP 9: insert current message buffer - target_train->buffer_->insert(target_train->buffer_->end(), _data, _data + _size); - target_train->passengers_.insert(its_identifier); + its_data.train_->buffer_->insert(its_data.train_->buffer_->end(), _data, _data + _size); + its_data.train_->passengers_.insert(its_identifier); // STEP 9.1: update the trains minimal debounce time if necessary - if (its_debouncing < target_train->minimal_debounce_time_) { - target_train->minimal_debounce_time_ = its_debouncing; + if (its_debouncing < its_data.train_->minimal_debounce_time_) { + its_data.train_->minimal_debounce_time_ = its_debouncing; } // STEP 9.2: update the trains minimal maximum retention time if necessary - if (its_retention < target_train->minimal_max_retention_time_) { - target_train->minimal_max_retention_time_ = its_retention; + if (its_retention < its_data.train_->minimal_max_retention_time_) { + its_data.train_->minimal_max_retention_time_ = its_retention; } // STEP 10: restart timer with current departure time -#ifndef _WIN32 - target_train->departure_timer_->expires_from_now(target_train->departure_); -#else - target_train->departure_timer_->expires_from_now( - std::chrono::duration_cast< - std::chrono::steady_clock::duration>(target_train->departure_)); -#endif - target_train->departure_timer_->async_wait( - std::bind(&server_endpoint_impl<Protocol>::flush_cbk, - this->shared_from_this(), _target, - target_train, std::placeholders::_1)); + start_dispatch_timer(its_target_iterator, its_now); return (true); } template<typename Protocol> void server_endpoint_impl<Protocol>::send_segments( - const tp::tp_split_messages_t &_segments, const endpoint_type &_target) { + const tp::tp_split_messages_t &_segments, std::uint32_t _separation_time, + const endpoint_type &_target) { if (_segments.size() == 0) return; - const queue_iterator_type target_queue_iterator = find_or_create_queue_unlocked(_target); - const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty()); + const auto its_target_iterator = find_or_create_target_unlocked(_target); + auto &its_data = its_target_iterator->second; - std::shared_ptr<train> target_train = find_or_create_train_unlocked(_target); - target_train->update_departure_time_and_stop_departure(); + auto its_now(std::chrono::steady_clock::now()); const service_t its_service = VSOMEIP_BYTES_TO_WORD( (*(_segments[0]))[VSOMEIP_SERVICE_POS_MIN], (*(_segments[0]))[VSOMEIP_SERVICE_POS_MAX]); @@ -412,47 +403,63 @@ void server_endpoint_impl<Protocol>::send_segments( &its_debouncing, &its_retention); } // update the trains minimal debounce time if necessary - if (its_debouncing < target_train->minimal_debounce_time_) { - target_train->minimal_debounce_time_ = its_debouncing; + if (its_debouncing < its_data.train_->minimal_debounce_time_) { + its_data.train_->minimal_debounce_time_ = its_debouncing; } // update the trains minimal maximum retention time if necessary - if (its_retention < target_train->minimal_max_retention_time_) { - target_train->minimal_max_retention_time_ = its_retention; + if (its_retention < its_data.train_->minimal_max_retention_time_) { + its_data.train_->minimal_max_retention_time_ = its_retention; } // We only need to respect the debouncing. There is no need to wait for further // messages as we will send several now anyway. - if (!target_train->passengers_.empty()) { - wait_until_debounce_time_reached(target_train); - queue_train(target_queue_iterator, target_train, queue_size_zero_on_entry); + if (!its_data.train_->passengers_.empty()) { + schedule_train(its_data); + its_data.train_->departure_ = its_now + its_retention; } - const bool queue_size_still_zero(target_queue_iterator->second.second.empty()); + const bool queue_size_still_zero(its_data.queue_.empty()); for (const auto &s : _segments) { - target_queue_iterator->second.second.emplace_back(s); - target_queue_iterator->second.first += s->size(); + its_data.queue_.emplace_back(std::make_pair(s, _separation_time)); + its_data.queue_size_ += s->size(); } - if (queue_size_still_zero && !target_queue_iterator->second.second.empty()) { // no writing in progress + + if (queue_size_still_zero && !its_data.queue_.empty()) { // no writing in progress // respect minimal debounce time - wait_until_debounce_time_reached(target_train); + schedule_train(its_data); // ignore retention time and send immediately as the train is full anyway - send_queued(target_queue_iterator); + (void)send_queued(its_target_iterator); } - target_train->last_departure_ = std::chrono::steady_clock::now(); } template<typename Protocol> -void server_endpoint_impl<Protocol>::wait_until_debounce_time_reached( - const std::shared_ptr<train>& _train) const { - const std::chrono::nanoseconds time_since_last_departure = - std::chrono::duration_cast<std::chrono::nanoseconds>( - std::chrono::steady_clock::now() - _train->last_departure_); - - if (time_since_last_departure < _train->minimal_debounce_time_) { - std::this_thread::sleep_for( - _train->minimal_debounce_time_ - time_since_last_departure); +typename server_endpoint_impl<Protocol>::target_data_iterator_type +server_endpoint_impl<Protocol>::find_or_create_target_unlocked(endpoint_type _target) { + + auto its_iterator = targets_.find(_target); + if (its_iterator == targets_.end()) { + + auto its_result = targets_.emplace( + std::make_pair(_target, endpoint_data_type(this->io_))); + its_iterator = its_result.first; } + + return (its_iterator); } +template<typename Protocol> +void server_endpoint_impl<Protocol>::schedule_train(endpoint_data_type &_data) { + + if (_data.has_last_departure_) { + if (_data.last_departure_ + _data.train_->minimal_debounce_time_ + > _data.train_->departure_) { + _data.train_->departure_ = _data.last_departure_ + + _data.train_->minimal_debounce_time_; + } + } + + _data.dispatched_trains_[_data.train_->departure_] + .push_back(_data.train_); +} template<typename Protocol> typename endpoint_impl<Protocol>::cms_ret_e server_endpoint_impl<Protocol>::check_message_size( @@ -469,11 +476,21 @@ typename endpoint_impl<Protocol>::cms_ret_e server_endpoint_impl<Protocol>::chec _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); if (tp_segmentation_enabled(its_service, its_method)) { - send_segments(tp::tp::tp_split_message(_data, _size), _target); - return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT; + instance_t its_instance = this->get_instance(its_service); + if (its_instance != 0xFFFF) { + std::uint16_t its_max_segment_length; + std::uint32_t its_separation_time; + + this->configuration_->get_tp_configuration( + its_service, its_instance, its_method, false, + its_max_segment_length, its_separation_time); + send_segments(tp::tp::tp_split_message(_data, _size, + its_max_segment_length), its_separation_time, _target); + return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT; + } } } - VSOMEIP_ERROR << "sei::send_intern: Dropping too big message (" << _size + VSOMEIP_ERROR << "sei::send_intern: Dropping to big message (" << _size << " Bytes). Maximum allowed message size is: " << endpoint_impl<Protocol>::max_message_size_ << " Bytes."; ret = endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG; @@ -523,72 +540,67 @@ bool server_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std } template<typename Protocol> -void server_endpoint_impl<Protocol>::queue_train( - const queue_iterator_type _queue_iterator, - const std::shared_ptr<train>& _train, - bool _queue_size_zero_on_entry) { - _queue_iterator->second.second.emplace_back(_train->buffer_); - _queue_iterator->second.first += _train->buffer_->size(); - _train->last_departure_ = std::chrono::steady_clock::now(); - _train->passengers_.clear(); - _train->buffer_ = std::make_shared<message_buffer_t>(); - _train->minimal_debounce_time_ = std::chrono::nanoseconds::max(); - _train->minimal_max_retention_time_ = std::chrono::nanoseconds::max(); - if (_queue_size_zero_on_entry && !_queue_iterator->second.second.empty()) { // no writing in progress - send_queued(_queue_iterator); - } -} +bool server_endpoint_impl<Protocol>::queue_train( + target_data_iterator_type _it, const std::shared_ptr<train> &_train, + bool _queue_size_zero_on_entry) { -template<typename Protocol> -typename server_endpoint_impl<Protocol>::queue_iterator_type -server_endpoint_impl<Protocol>::find_or_create_queue_unlocked(const endpoint_type& _target) { - queue_iterator_type target_queue_iterator = queues_.find(_target); - if (target_queue_iterator == queues_.end()) { - target_queue_iterator = queues_.insert(queues_.begin(), - std::make_pair( - _target, - std::make_pair(std::size_t(0), - std::deque<message_buffer_ptr_t>()) - )); - } - return target_queue_iterator; -} + bool must_erase(false); -template<typename Protocol> -std::shared_ptr<train> server_endpoint_impl<Protocol>::find_or_create_train_unlocked( - const endpoint_type& _target) { - auto train_iter = trains_.find(_target); - if (train_iter == trains_.end()) { - train_iter = trains_.insert(trains_.begin(), - std::make_pair(_target, std::make_shared<train>(this->service_))); + auto &its_data = _it->second; + its_data.queue_.push_back(std::make_pair(_train->buffer_, 0)); + its_data.queue_size_ += _train->buffer_->size(); + + if (_queue_size_zero_on_entry && !its_data.queue_.empty()) { // no writing in progress + must_erase = send_queued(_it); } - return train_iter->second; + + return must_erase; } template<typename Protocol> -bool server_endpoint_impl<Protocol>::flush( - endpoint_type _target, - const std::shared_ptr<train>& _train) { +bool server_endpoint_impl<Protocol>::flush(target_data_iterator_type _it) { + + bool has_queued(true); + bool is_current_train(true); + auto &its_data = _it->second; + std::lock_guard<std::mutex> its_lock(mutex_); - bool is_flushed = false; - if (!_train->buffer_->empty()) { - const queue_iterator_type target_queue_iterator = queues_.find(_target); - if (target_queue_iterator != queues_.end()) { - const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty()); - queue_train(target_queue_iterator, _train, queue_size_zero_on_entry); - is_flushed = true; - } else { - std::stringstream ss; - ss << "sei::flush couldn't find target queue, won't queue train to: " - << get_remote_information(_target) << " passengers: "; - for (const auto& p : _train->passengers_) { - ss << "[" << std::hex << std::setw(4) << std::setfill('0') - << p.first << ":" << p.second << "] "; + + auto its_train(its_data.train_); + if (!its_data.dispatched_trains_.empty()) { + + auto its_dispatched = its_data.dispatched_trains_.begin(); + if (its_dispatched->first <= its_train->departure_) { + + is_current_train = false; + its_train = its_dispatched->second.front(); + its_dispatched->second.pop_front(); + if (its_dispatched->second.empty()) { + + its_data.dispatched_trains_.erase(its_dispatched); } - VSOMEIP_WARNING << ss.str(); } } - return is_flushed; + + if (!its_train->buffer_->empty()) { + + queue_train(_it, its_train, its_data.queue_.empty()); + + // Reset current train if necessary + if (is_current_train) { + its_train->reset(); + } + } else { + has_queued = false; + } + + if (!is_current_train || !its_data.dispatched_trains_.empty()) { + + auto its_now(std::chrono::steady_clock::now()); + start_dispatch_timer(_it, its_now); + } + + return (has_queued); } template<typename Protocol> @@ -599,7 +611,7 @@ void server_endpoint_impl<Protocol>::connect_cbk( template<typename Protocol> void server_endpoint_impl<Protocol>::send_cbk( - const queue_iterator_type _queue_iterator, + const target_data_iterator_type _it, boost::system::error_code const &_error, std::size_t _bytes) { (void)_bytes; @@ -623,11 +635,11 @@ void server_endpoint_impl<Protocol>::send_cbk( ++stp_hndlr_iter; continue; } - for (const auto& q : queues_) { - for (const auto& msg : q.second.second ) { + for (const auto& t : targets_) { + for (const auto& e : t.second.queue_ ) { const service_t its_service = VSOMEIP_BYTES_TO_WORD( - (*msg)[VSOMEIP_SERVICE_POS_MIN], - (*msg)[VSOMEIP_SERVICE_POS_MAX]); + (*e.first)[VSOMEIP_SERVICE_POS_MIN], + (*e.first)[VSOMEIP_SERVICE_POS_MAX]); if (its_service == its_stopped_service) { found_service_msg = true; break; @@ -639,14 +651,13 @@ void server_endpoint_impl<Protocol>::send_cbk( } if (found_service_msg) { ++stp_hndlr_iter; - found_service_msg = false; } else { // all messages of the to be stopped service have been sent auto handler = stp_hndlr_iter->second; auto ptr = this->shared_from_this(); - #ifndef _WIN32 +#if defined(__linux__) || defined(ANDROID) endpoint_impl<Protocol>:: - #endif - service_.post([ptr, handler, its_stopped_service](){ +#endif + io_.post([ptr, handler, its_stopped_service](){ handler(ptr, its_stopped_service); }); stp_hndlr_iter = prepare_stop_handlers_.erase(stp_hndlr_iter); @@ -660,22 +671,18 @@ void server_endpoint_impl<Protocol>::send_cbk( // prepare_stop_handlers have been queued ensure to call them as well check_if_all_msgs_for_stopped_service_are_sent(); } - if (std::all_of(queues_.begin(), queues_.end(), [&] - #ifndef _WIN32 - (const typename queue_type::value_type& q) - #else - (const std::pair<endpoint_type,std::pair<size_t, std::deque<message_buffer_ptr_t>>>& q) - #endif - { return q.second.second.empty(); })) { + if (std::all_of(targets_.begin(), targets_.end(), + [&](const typename target_data_type::value_type &_t) + { return _t.second.queue_.empty(); })) { // all outstanding response have been sent. auto found_cbk = prepare_stop_handlers_.find(ANY_SERVICE); if (found_cbk != prepare_stop_handlers_.end()) { auto handler = found_cbk->second; auto ptr = this->shared_from_this(); - #ifndef _WIN32 +#if defined(__linux__) || defined(ANDROID) endpoint_impl<Protocol>:: - #endif - service_.post([ptr, handler](){ +#endif + io_.post([ptr, handler](){ handler(ptr, ANY_SERVICE); }); prepare_stop_handlers_.erase(found_cbk); @@ -683,27 +690,30 @@ void server_endpoint_impl<Protocol>::send_cbk( } }; - auto& its_qpair = _queue_iterator->second; + auto& its_data = _it->second; if (!_error) { - its_qpair.first -= its_qpair.second.front()->size(); - its_qpair.second.pop_front(); + its_data.queue_size_ -= its_data.queue_.front().first->size(); + its_data.queue_.pop_front(); + + update_last_departure(its_data); if (!prepare_stop_handlers_.empty() && !endpoint_impl<Protocol>::sending_blocked_) { // only one service instance is stopped check_if_all_msgs_for_stopped_service_are_sent(); } - if (its_qpair.second.size() > 0) { - send_queued(_queue_iterator); + if (!its_data.queue_.empty()) { + (void)send_queued(_it); } else if (!prepare_stop_handlers_.empty() && endpoint_impl<Protocol>::sending_blocked_) { // endpoint is shutting down completely - queues_.erase(_queue_iterator); + cancel_dispatch_timer(_it); + targets_.erase(_it); check_if_all_queues_are_empty(); } } else { message_buffer_ptr_t its_buffer; - if (_queue_iterator->second.second.size()) { - its_buffer = _queue_iterator->second.second.front(); + if (its_data.queue_.size()) { + its_buffer = its_data.queue_.front().first; } service_t its_service(0); method_t its_method(0); @@ -727,14 +737,15 @@ void server_endpoint_impl<Protocol>::send_cbk( // delete remaining outstanding responses VSOMEIP_WARNING << "sei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ") " - << get_remote_information(_queue_iterator) << " " - << std::dec << _queue_iterator->second.second.size() << " " - << std::dec << _queue_iterator->second.first << " (" + << get_remote_information(_it) << " " + << std::dec << its_data.queue_.size() << " " + << std::dec << its_data.queue_size_ << " (" << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_method << "." << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; - queues_.erase(_queue_iterator); + cancel_dispatch_timer(_it); + targets_.erase(_it); if (!prepare_stop_handlers_.empty()) { if (endpoint_impl<Protocol>::sending_blocked_) { // endpoint is shutting down completely, ensure to call @@ -751,32 +762,92 @@ void server_endpoint_impl<Protocol>::send_cbk( template<typename Protocol> void server_endpoint_impl<Protocol>::flush_cbk( - endpoint_type _target, - const std::shared_ptr<train>& _train, const boost::system::error_code &_error_code) { + target_data_iterator_type _it, + const boost::system::error_code &_error_code) { + if (!_error_code) { - (void) flush(_target, _train); + + (void) flush(_it); } } template<typename Protocol> size_t server_endpoint_impl<Protocol>::get_queue_size() const { size_t its_queue_size(0); - { std::lock_guard<std::mutex> its_lock(mutex_); - for (const auto &q : queues_) { - its_queue_size += q.second.second.size(); + for (const auto &t : targets_) { + its_queue_size += t.second.queue_size_; } } + return (its_queue_size); +} + +template<typename Protocol> +void server_endpoint_impl<Protocol>::start_dispatch_timer( + target_data_iterator_type _it, + const std::chrono::steady_clock::time_point &_now) { + + auto &its_data = _it->second; + std::shared_ptr<train> its_train(its_data.train_); + + if (!its_data.dispatched_trains_.empty()) { + + auto its_dispatched = its_data.dispatched_trains_.begin(); + if (its_dispatched->first < its_train->departure_) { - return its_queue_size; + its_train = its_dispatched->second.front(); + } + } + + std::chrono::nanoseconds its_offset; + if (its_train->departure_ > _now) { + + its_offset = std::chrono::duration_cast<std::chrono::nanoseconds>( + its_train->departure_ - _now); + } else { // already departure time + + its_offset = std::chrono::nanoseconds::zero(); + } + +#if defined(__linux__) || defined(ANDROID) + its_data.dispatch_timer_->expires_from_now(its_offset); +#else + its_data.dispatch_timer_->expires_from_now( + std::chrono::duration_cast< + std::chrono::steady_clock::duration>(its_offset)); +#endif + its_data.dispatch_timer_->async_wait( + std::bind(&server_endpoint_impl<Protocol>::flush_cbk, + this->shared_from_this(), _it, std::placeholders::_1)); +} + +template<typename Protocol> +void server_endpoint_impl<Protocol>::cancel_dispatch_timer( + target_data_iterator_type _it) { + + boost::system::error_code ec; + _it->second.dispatch_timer_->cancel(ec); +} + +template<typename Protocol> +void server_endpoint_impl<Protocol>::update_last_departure( + endpoint_data_type &_data) { + + _data.last_departure_ = std::chrono::steady_clock::now(); + _data.has_last_departure_ = true; } // Instantiate template -#ifndef _WIN32 +#if defined(__linux__) || defined(ANDROID) +#if VSOMEIP_BOOST_VERSION < 106600 template class server_endpoint_impl<boost::asio::local::stream_protocol_ext>; +template class server_endpoint_impl<boost::asio::ip::udp_ext>; +#else +template class server_endpoint_impl<boost::asio::local::stream_protocol>; +template class server_endpoint_impl<boost::asio::ip::udp>; +#endif #endif template class server_endpoint_impl<boost::asio::ip::tcp>; -template class server_endpoint_impl<boost::asio::ip::udp_ext>; } // namespace vsomeip_v3 |