diff options
2 files changed, 22 insertions, 18 deletions
diff --git a/src/components/transport_manager/include/transport_manager/transport_adapter/threaded_socket_connection.h b/src/components/transport_manager/include/transport_manager/transport_adapter/threaded_socket_connection.h index 5e0caa22e..d764979fb 100644 --- a/src/components/transport_manager/include/transport_manager/transport_adapter/threaded_socket_connection.h +++ b/src/components/transport_manager/include/transport_manager/transport_adapter/threaded_socket_connection.h @@ -81,6 +81,13 @@ class ThreadedSocketConnection : public Connection { TransportAdapter::Error Start(); /** + * @brief Checks is queue with frames to send empty or not. + * + * @return Information about queue is empty or not. + */ + bool IsFramesToSendQueueEmpty() const; + + /** * @brief Set variable that hold socket No. */ void set_socket(int socket) { diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc index cc476c745..f520841a4 100644 --- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc +++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc @@ -184,6 +184,12 @@ void ThreadedSocketConnection::threadMain() { } } +bool ThreadedSocketConnection::IsFramesToSendQueueEmpty() const { + // Check Frames queue is empty or not + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); + return frames_to_send_.empty(); +} + void ThreadedSocketConnection::Transmit() { LOG4CXX_AUTO_TRACE(logger_); @@ -191,15 +197,10 @@ void ThreadedSocketConnection::Transmit() { pollfd poll_fds[kPollFdsSize]; poll_fds[0].fd = socket_; - bool is_queue_empty = true; - { - // Check Frames queue is empty or not - sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); - is_queue_empty = frames_to_send_.empty(); - } + const bool is_queue_empty_on_poll = IsFramesToSendQueueEmpty(); poll_fds[0].events = POLLIN | POLLPRI - | (is_queue_empty ? 0 : POLLOUT); + | (is_queue_empty_on_poll ? 0 : POLLOUT); poll_fds[1].fd = read_fd_; poll_fds[1].events = POLLIN | POLLPRI; @@ -240,11 +241,7 @@ void ThreadedSocketConnection::Transmit() { return; } - { - // Check Frames queue is empty or not - sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); - is_queue_empty = frames_to_send_.empty(); - } + const bool is_queue_empty = IsFramesToSendQueueEmpty(); // Send data if possible if (!is_queue_empty && (poll_fds[0].revents | POLLOUT)) { @@ -303,17 +300,17 @@ bool ThreadedSocketConnection::Receive() { bool ThreadedSocketConnection::Send() { LOG4CXX_AUTO_TRACE(logger_); - FrameQueue frames_to_send; + FrameQueue frames_to_send_local; { sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); - std::swap(frames_to_send, frames_to_send_); + std::swap(frames_to_send_local, frames_to_send_); } size_t offset = 0; - while (!frames_to_send.empty()) { + while (!frames_to_send_local.empty()) { LOG4CXX_INFO(logger_, "frames_to_send is not empty"); - ::protocol_handler::RawMessagePtr frame = frames_to_send.front(); + ::protocol_handler::RawMessagePtr frame = frames_to_send_local.front(); const ssize_t bytes_sent = ::send(socket_, frame->data() + offset, frame->data_size() - offset, 0); @@ -321,14 +318,14 @@ bool ThreadedSocketConnection::Send() { LOG4CXX_DEBUG(logger_, "bytes_sent >= 0"); offset += bytes_sent; if (offset == frame->data_size()) { - frames_to_send.pop(); + frames_to_send_local.pop(); offset = 0; controller_->DataSendDone(device_handle(), application_handle(), frame); } } else { LOG4CXX_DEBUG(logger_, "bytes_sent < 0"); LOG4CXX_ERROR_WITH_ERRNO(logger_, "Send failed for connection " << this); - frames_to_send.pop(); + frames_to_send_local.pop(); offset = 0; controller_->DataSendFailed(device_handle(), application_handle(), frame, DataSendError()); |