diff options
Diffstat (limited to 'src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc')
-rw-r--r-- | src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc | 234 |
1 files changed, 110 insertions, 124 deletions
diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc index 1e189ed10..67e41cec2 100644 --- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc +++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2013, Ford Motor Company * All rights reserved. * @@ -36,7 +36,9 @@ #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> + #include "utils/logger.h" +#include "utils/threads/thread.h" #include "transport_manager/transport_adapter/threaded_socket_connection.h" #include "transport_manager/transport_adapter/transport_adapter_controller.h" @@ -46,197 +48,173 @@ namespace transport_adapter { CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") ThreadedSocketConnection::ThreadedSocketConnection( - const DeviceUID& device_id, const ApplicationHandle& app_handle, - TransportAdapterController* controller) - : read_fd_(-1), write_fd_(-1), controller_(controller), - frames_to_send_(), - frames_to_send_mutex_(), - socket_(-1), - terminate_flag_(false), - unexpected_disconnect_(false), - device_uid_(device_id), - app_handle_(app_handle) - { - pthread_mutex_init(&frames_to_send_mutex_, 0); + const DeviceUID& device_id, const ApplicationHandle& app_handle, + TransportAdapterController* controller) + : read_fd_(-1), + write_fd_(-1), + controller_(controller), + frames_to_send_(), + frames_to_send_mutex_(), + socket_(-1), + terminate_flag_(false), + unexpected_disconnect_(false), + device_uid_(device_id), + app_handle_(app_handle), + thread_(NULL) { + const std::string thread_name = std::string("Socket ") + device_handle(); + thread_ = threads::CreateThread(thread_name.c_str(), + new SocketConnectionDelegate(this)); } ThreadedSocketConnection::~ThreadedSocketConnection() { - terminate_flag_ = true; - Notify(); - pthread_mutex_destroy(&frames_to_send_mutex_); + LOG4CXX_AUTO_TRACE(logger_); + Disconnect(); + thread_->join(); + delete thread_->delegate(); + threads::DeleteThread(thread_); + if (-1 != read_fd_) { + close(read_fd_); + } + if (-1 != write_fd_) { + close(write_fd_); + } } void ThreadedSocketConnection::Abort() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); unexpected_disconnect_ = true; terminate_flag_ = true; - LOG4CXX_TRACE(logger_, "exit"); } TransportAdapter::Error ThreadedSocketConnection::Start() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); int fds[2]; const int pipe_ret = pipe(fds); if (0 == pipe_ret) { - LOG4CXX_DEBUG(logger_, "pipe created(#" << pthread_self() << ")"); + LOG4CXX_DEBUG(logger_, "pipe created"); read_fd_ = fds[0]; write_fd_ = fds[1]; } else { - LOG4CXX_WARN(logger_, "pipe creation failed (#" << pthread_self() << ")"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL"); + LOG4CXX_ERROR(logger_, "pipe creation failed"); return TransportAdapter::FAIL; } const int fcntl_ret = fcntl(read_fd_, F_SETFL, fcntl(read_fd_, F_GETFL) | O_NONBLOCK); if (0 != fcntl_ret) { - LOG4CXX_WARN(logger_, "fcntl failed (#" << pthread_self() << ")"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL"); + LOG4CXX_ERROR(logger_, "fcntl failed"); return TransportAdapter::FAIL; } - const std::string thread_name = std::string("Socket ") + device_handle(); - thread_ = threads::CreateThread(thread_name.c_str(), this); - if (!thread_->start()) { - LOG4CXX_WARN(logger_, "thread creation failed (#" << pthread_self() << ")"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL"); + LOG4CXX_ERROR(logger_, "thread creation failed"); return TransportAdapter::FAIL; } - LOG4CXX_DEBUG(logger_, "thread created (#" << pthread_self() << ")"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); + LOG4CXX_INFO(logger_, "thread created"); return TransportAdapter::OK; } void ThreadedSocketConnection::Finalize() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); if (unexpected_disconnect_) { - LOG4CXX_DEBUG(logger_, "unexpected_disconnect (#" << pthread_self() << ")"); + LOG4CXX_DEBUG(logger_, "unexpected_disconnect"); controller_->ConnectionAborted(device_handle(), application_handle(), CommunicationError()); } else { - LOG4CXX_DEBUG(logger_, "not unexpected_disconnect (#" << pthread_self() << ")"); + LOG4CXX_DEBUG(logger_, "not unexpected_disconnect"); controller_->ConnectionFinished(device_handle(), application_handle()); } close(socket_); - LOG4CXX_TRACE(logger_, "exit"); } TransportAdapter::Error ThreadedSocketConnection::Notify() const { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); if (-1 == write_fd_) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, - "Failed to wake up connection thread for connection " << this); + LOG4CXX_ERROR_WITH_ERRNO( + logger_, "Failed to wake up connection thread for connection " << this); LOG4CXX_TRACE(logger_, "exit with TransportAdapter::BAD_STATE"); return TransportAdapter::BAD_STATE; } uint8_t c = 0; - if (1 == write(write_fd_, &c, 1)) { - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); - return TransportAdapter::OK; - } else { + if (1 != write(write_fd_, &c, 1)) { LOG4CXX_ERROR_WITH_ERRNO( - logger_, "Failed to wake up connection thread for connection " << this); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL"); + logger_, "Failed to wake up connection thread for connection " << this); return TransportAdapter::FAIL; } + return TransportAdapter::OK; } TransportAdapter::Error ThreadedSocketConnection::SendData( - ::protocol_handler::RawMessagePtr message) { - LOG4CXX_TRACE(logger_, "enter"); - pthread_mutex_lock(&frames_to_send_mutex_); + ::protocol_handler::RawMessagePtr message) { + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); frames_to_send_.push(message); - pthread_mutex_unlock(&frames_to_send_mutex_); - TransportAdapter::Error error = Notify(); - LOG4CXX_TRACE(logger_, "exit with error" << error); - return error; + return Notify(); } TransportAdapter::Error ThreadedSocketConnection::Disconnect() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); terminate_flag_ = true; - TransportAdapter::Error error = Notify(); - LOG4CXX_TRACE(logger_, "exit with error" << error); - return error; -} - -bool ThreadedSocketConnection::exitThreadMain() { - terminate_flag_ = true; - Notify(); - return true; + return Notify(); } void ThreadedSocketConnection::threadMain() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); controller_->ConnectionCreated(this, device_uid_, app_handle_); ConnectError* connect_error = NULL; - if (Establish(&connect_error)) { - LOG4CXX_DEBUG(logger_, "Connection established (#" << pthread_self() << ")"); - controller_->ConnectDone(device_handle(), application_handle()); - while (!terminate_flag_) { - Transmit(); - } - LOG4CXX_DEBUG(logger_, "Connection is to finalize (#" << pthread_self() << ")"); - Finalize(); - while (!frames_to_send_.empty()) { - LOG4CXX_INFO(logger_, "removing message (#" << pthread_self() << ")"); - ::protocol_handler::RawMessagePtr message = frames_to_send_.front(); - frames_to_send_.pop(); - controller_->DataSendFailed(device_handle(), application_handle(), - message, DataSendError()); - } - controller_->DisconnectDone(device_handle(), application_handle()); - } else { - LOG4CXX_ERROR(logger_, "Connection Establish failed (#" << pthread_self() << ")"); - controller_->ConnectFailed(device_handle(), application_handle(), - *connect_error); + if (!Establish(&connect_error)) { + LOG4CXX_ERROR(logger_, "Connection Establish failed"); delete connect_error; } - if (-1 != read_fd_) { - close(read_fd_); + LOG4CXX_DEBUG(logger_, "Connection established"); + controller_->ConnectDone(device_handle(), application_handle()); + while (!terminate_flag_) { + Transmit(); } - if (-1 != write_fd_) { - close(write_fd_); + LOG4CXX_DEBUG(logger_, "Connection is to finalize"); + Finalize(); + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); + while (!frames_to_send_.empty()) { + LOG4CXX_INFO(logger_, "removing message"); + ::protocol_handler::RawMessagePtr message = frames_to_send_.front(); + frames_to_send_.pop(); + controller_->DataSendFailed(device_handle(), application_handle(), + message, DataSendError()); } - LOG4CXX_TRACE(logger_, "exit"); } void ThreadedSocketConnection::Transmit() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); - const nfds_t poll_fds_size = 2; - pollfd poll_fds[poll_fds_size]; + const nfds_t kPollFdsSize = 2; + pollfd poll_fds[kPollFdsSize]; poll_fds[0].fd = socket_; - poll_fds[0].events = POLLIN | POLLPRI | (frames_to_send_.empty() ? 0 : POLLOUT); + poll_fds[0].events = POLLIN | POLLPRI + | (frames_to_send_.empty() ? 0 : POLLOUT); poll_fds[1].fd = read_fd_; poll_fds[1].events = POLLIN | POLLPRI; - LOG4CXX_DEBUG(logger_, "poll (#" << pthread_self() << ") " << this); - if (-1 == poll(poll_fds, poll_fds_size, -1)) { + LOG4CXX_DEBUG(logger_, "poll " << this); + if (-1 == poll(poll_fds, kPollFdsSize, -1)) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "poll failed for connection " << this); Abort(); - LOG4CXX_TRACE(logger_, "exit. Condition: -1 == poll(poll_fds, poll_fds_size, -1)"); return; } - LOG4CXX_DEBUG(logger_, "poll is ok (#" << pthread_self() << ") " << this << " revents0:" - << - std::hex << poll_fds[0].revents << " revents1:" << std::hex << poll_fds[1].revents); + LOG4CXX_DEBUG( + logger_, + "poll is ok " << this << " revents0: " << std::hex << poll_fds[0].revents << + " revents1:" << std::hex << poll_fds[1].revents); // error check if (0 != (poll_fds[1].revents & (POLLERR | POLLHUP | POLLNVAL))) { LOG4CXX_ERROR(logger_, "Notification pipe for connection " << this << " terminated"); Abort(); - LOG4CXX_TRACE(logger_, - "exit. Condition: 0 != (poll_fds[1].revents & (POLLERR | POLLHUP | POLLNVAL))"); return; } if (poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) { LOG4CXX_WARN(logger_, "Connection " << this << " terminated"); Abort(); - LOG4CXX_TRACE(logger_, - "exit. Condition: poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL"); return; } @@ -250,20 +228,18 @@ void ThreadedSocketConnection::Transmit() { LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to clear notification pipe"); LOG4CXX_ERROR_WITH_ERRNO(logger_, "poll failed for connection " << this); Abort(); - LOG4CXX_TRACE(logger_, "exit. Condition: (bytes_read < 0) && (EAGAIN != errno)"); return; } // send data if possible if (!frames_to_send_.empty() && (poll_fds[0].revents | POLLOUT)) { - LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() (#" << pthread_self() << ")"); + LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() "); // send data const bool send_ok = Send(); if (!send_ok) { - LOG4CXX_ERROR(logger_, "Send() failed (#" << pthread_self() << ")"); + LOG4CXX_ERROR(logger_, "Send() failed "); Abort(); - LOG4CXX_TRACE(logger_, "exit. Condition: !send_ok"); return; } } @@ -272,17 +248,15 @@ void ThreadedSocketConnection::Transmit() { if (poll_fds[0].revents & (POLLIN | POLLPRI)) { const bool receive_ok = Receive(); if (!receive_ok) { - LOG4CXX_ERROR(logger_, "Receive() failed (#" << pthread_self() << ")"); + LOG4CXX_ERROR(logger_, "Receive() failed "); Abort(); - LOG4CXX_TRACE(logger_, "exit. Condition: !receive_ok"); return; } } - LOG4CXX_TRACE(logger_, "exit"); } bool ThreadedSocketConnection::Receive() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); uint8_t buffer[4096]; ssize_t bytes_read = -1; @@ -291,8 +265,8 @@ bool ThreadedSocketConnection::Receive() { if (bytes_read > 0) { LOG4CXX_DEBUG( - logger_, - "Received " << bytes_read << " bytes for connection " << this); + logger_, + "Received " << bytes_read << " bytes for connection " << this); ::protocol_handler::RawMessagePtr frame( new protocol_handler::RawMessage(0, 0, buffer, bytes_read)); controller_->DataReceiveDone(device_handle(), application_handle(), @@ -301,36 +275,33 @@ bool ThreadedSocketConnection::Receive() { if (EAGAIN != errno && EWOULDBLOCK != errno) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "recv() failed for connection " << this); - LOG4CXX_TRACE(logger_, - "exit with FALSE. Condition: EAGAIN != errno && EWOULDBLOCK != errno"); return false; } } else { LOG4CXX_WARN(logger_, "Connection " << this << " closed by remote peer"); - LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: bytes_read >= 0"); return false; } } while (bytes_read > 0); - LOG4CXX_TRACE(logger_, "exit with TRUE"); + return true; } bool ThreadedSocketConnection::Send() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); FrameQueue frames_to_send; - pthread_mutex_lock(&frames_to_send_mutex_); + frames_to_send_mutex_.Acquire(); std::swap(frames_to_send, frames_to_send_); - pthread_mutex_unlock(&frames_to_send_mutex_); + frames_to_send_mutex_.Release(); size_t offset = 0; while (!frames_to_send.empty()) { - LOG4CXX_INFO(logger_, "frames_to_send is not empty" << pthread_self() << ")"); + LOG4CXX_INFO(logger_, "frames_to_send is not empty"); ::protocol_handler::RawMessagePtr frame = frames_to_send.front(); const ssize_t bytes_sent = ::send(socket_, frame->data() + offset, frame->data_size() - offset, 0); if (bytes_sent >= 0) { - LOG4CXX_DEBUG(logger_, "bytes_sent >= 0" << pthread_self() << ")"); + LOG4CXX_DEBUG(logger_, "bytes_sent >= 0"); offset += bytes_sent; if (offset == frame->data_size()) { frames_to_send.pop(); @@ -338,7 +309,7 @@ bool ThreadedSocketConnection::Send() { controller_->DataSendDone(device_handle(), application_handle(), frame); } } else { - LOG4CXX_DEBUG(logger_, "bytes_sent < 0" << pthread_self() << ")"); + LOG4CXX_DEBUG(logger_, "bytes_sent < 0"); LOG4CXX_ERROR_WITH_ERRNO(logger_, "Send failed for connection " << this); frames_to_send.pop(); offset = 0; @@ -346,9 +317,24 @@ bool ThreadedSocketConnection::Send() { DataSendError()); } } - LOG4CXX_TRACE(logger_, "exit with TRUE"); + return true; } -} // namespace -} // namespace +ThreadedSocketConnection::SocketConnectionDelegate::SocketConnectionDelegate( + ThreadedSocketConnection* connection) + : connection_(connection) { +} + +void ThreadedSocketConnection::SocketConnectionDelegate::threadMain() { + LOG4CXX_AUTO_TRACE(logger_); + DCHECK(connection_); + connection_->threadMain(); +} + +void ThreadedSocketConnection::SocketConnectionDelegate::exitThreadMain() { + LOG4CXX_AUTO_TRACE(logger_); +} + +} // namespace transport_adapter +} // namespace transport_manager |