summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp54
1 files changed, 45 insertions, 9 deletions
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 9b31cc1..66b3138 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -120,9 +120,24 @@ void client_endpoint_impl<Protocol>::stop() {
connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
- shutdown_and_close_socket(false);
+
+ // bind to strand as stop() might be called from different thread
+ strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket,
+ this->shared_from_this(),
+ false)
+ );
+}
+
+template<typename Protocol>
+message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() {
+ message_buffer_ptr_t its_buffer;
+ if (queue_.size())
+ its_buffer = queue_.front();
+
+ return (its_buffer);
}
+
template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
@@ -317,7 +332,11 @@ void client_endpoint_impl<Protocol>::send_segments(
// respect minimal debounce time
wait_until_debounce_time_reached();
// ignore retention time and send immediately as the train is full anyway
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
+ }
}
train_.last_departure_ = std::chrono::steady_clock::now();
}
@@ -397,8 +416,10 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (was_not_connected_) {
was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
- if (queue_.size() > 0) {
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
VSOMEIP_WARNING << __func__ << ": resume sending to: "
<< get_remote_information();
}
@@ -415,7 +436,9 @@ template<typename Protocol>
void client_endpoint_impl<Protocol>::wait_connect_cbk(
boost::system::error_code const &_error) {
if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
- connect();
+ auto self = this->shared_from_this();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
}
}
@@ -429,7 +452,9 @@ void client_endpoint_impl<Protocol>::send_cbk(
if (queue_.size() > 0) {
queue_size_ -= queue_.front()->size();
queue_.pop_front();
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer)
+ send_queued(its_buffer);
}
} else if (_error == boost::asio::error::broken_pipe) {
state_ = cei_state_e::CLOSED;
@@ -475,7 +500,8 @@ void client_endpoint_impl<Protocol>::send_cbk(
}
was_not_connected_ = true;
shutdown_and_close_socket(true);
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
} else if (_error == boost::asio::error::not_connected
|| _error == boost::asio::error::bad_descriptor
|| _error == boost::asio::error::no_permission) {
@@ -490,7 +516,8 @@ void client_endpoint_impl<Protocol>::send_cbk(
}
was_not_connected_ = true;
shutdown_and_close_socket(true);
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
} else if (_error == boost::asio::error::operation_aborted) {
VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message();
// endpoint was stopped
@@ -585,6 +612,11 @@ std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const {
}
template<typename Protocol>
+void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) {
+ local_port_ = _port;
+}
+
+template<typename Protocol>
void client_endpoint_impl<Protocol>::start_connect_timer() {
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
@@ -665,7 +697,11 @@ void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry)
queue_size_ += train_.buffer_->size();
train_.buffer_ = std::make_shared<message_buffer_t>();
if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
+ }
}
}