diff options
Diffstat (limited to 'implementation/endpoints/src/client_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/client_endpoint_impl.cpp | 54 |
1 files changed, 45 insertions, 9 deletions
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 9b31cc1..66b3138 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -120,9 +120,24 @@ void client_endpoint_impl<Protocol>::stop() { connect_timer_.cancel(ec); } connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; - shutdown_and_close_socket(false); + + // bind to strand as stop() might be called from different thread + strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket, + this->shared_from_this(), + false) + ); +} + +template<typename Protocol> +message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() { + message_buffer_ptr_t its_buffer; + if (queue_.size()) + its_buffer = queue_.front(); + + return (its_buffer); } + template<typename Protocol> bool client_endpoint_impl<Protocol>::send_to( const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, @@ -317,7 +332,11 @@ void client_endpoint_impl<Protocol>::send_segments( // respect minimal debounce time wait_until_debounce_time_reached(); // ignore retention time and send immediately as the train is full anyway - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, + this->shared_from_this(), its_buffer)); + } } train_.last_departure_ = std::chrono::steady_clock::now(); } @@ -397,8 +416,10 @@ void client_endpoint_impl<Protocol>::connect_cbk( if (was_not_connected_) { was_not_connected_ = false; std::lock_guard<std::mutex> its_lock(mutex_); - if (queue_.size() > 0) { - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, + this->shared_from_this(), its_buffer)); VSOMEIP_WARNING << __func__ << ": resume sending to: " << get_remote_information(); } @@ -415,7 +436,9 @@ template<typename Protocol> void client_endpoint_impl<Protocol>::wait_connect_cbk( boost::system::error_code const &_error) { if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) { - connect(); + auto self = this->shared_from_this(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } } @@ -429,7 +452,9 @@ void client_endpoint_impl<Protocol>::send_cbk( if (queue_.size() > 0) { queue_size_ -= queue_.front()->size(); queue_.pop_front(); - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) + send_queued(its_buffer); } } else if (_error == boost::asio::error::broken_pipe) { state_ = cei_state_e::CLOSED; @@ -475,7 +500,8 @@ void client_endpoint_impl<Protocol>::send_cbk( } was_not_connected_ = true; shutdown_and_close_socket(true); - connect(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } else if (_error == boost::asio::error::not_connected || _error == boost::asio::error::bad_descriptor || _error == boost::asio::error::no_permission) { @@ -490,7 +516,8 @@ void client_endpoint_impl<Protocol>::send_cbk( } was_not_connected_ = true; shutdown_and_close_socket(true); - connect(); + strand_.dispatch(std::bind(&client_endpoint_impl::connect, + this->shared_from_this())); } else if (_error == boost::asio::error::operation_aborted) { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message(); // endpoint was stopped @@ -585,6 +612,11 @@ std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const { } template<typename Protocol> +void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) { + local_port_ = _port; +} + +template<typename Protocol> void client_endpoint_impl<Protocol>::start_connect_timer() { std::lock_guard<std::mutex> its_lock(connect_timer_mutex_); connect_timer_.expires_from_now( @@ -665,7 +697,11 @@ void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry) queue_size_ += train_.buffer_->size(); train_.buffer_ = std::make_shared<message_buffer_t>(); if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress - send_queued(); + auto its_buffer = get_front(); + if (its_buffer) { + strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, + this->shared_from_this(), its_buffer)); + } } } |