summaryrefslogtreecommitdiff
path: root/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc')
-rw-r--r--src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc234
1 files changed, 110 insertions, 124 deletions
diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
index 1e189ed10..67e41cec2 100644
--- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
+++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
*
@@ -36,7 +36,9 @@
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
+
#include "utils/logger.h"
+#include "utils/threads/thread.h"
#include "transport_manager/transport_adapter/threaded_socket_connection.h"
#include "transport_manager/transport_adapter/transport_adapter_controller.h"
@@ -46,197 +48,173 @@ namespace transport_adapter {
CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager")
ThreadedSocketConnection::ThreadedSocketConnection(
- const DeviceUID& device_id, const ApplicationHandle& app_handle,
- TransportAdapterController* controller)
- : read_fd_(-1), write_fd_(-1), controller_(controller),
- frames_to_send_(),
- frames_to_send_mutex_(),
- socket_(-1),
- terminate_flag_(false),
- unexpected_disconnect_(false),
- device_uid_(device_id),
- app_handle_(app_handle)
- {
- pthread_mutex_init(&frames_to_send_mutex_, 0);
+ const DeviceUID& device_id, const ApplicationHandle& app_handle,
+ TransportAdapterController* controller)
+ : read_fd_(-1),
+ write_fd_(-1),
+ controller_(controller),
+ frames_to_send_(),
+ frames_to_send_mutex_(),
+ socket_(-1),
+ terminate_flag_(false),
+ unexpected_disconnect_(false),
+ device_uid_(device_id),
+ app_handle_(app_handle),
+ thread_(NULL) {
+ const std::string thread_name = std::string("Socket ") + device_handle();
+ thread_ = threads::CreateThread(thread_name.c_str(),
+ new SocketConnectionDelegate(this));
}
ThreadedSocketConnection::~ThreadedSocketConnection() {
- terminate_flag_ = true;
- Notify();
- pthread_mutex_destroy(&frames_to_send_mutex_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ Disconnect();
+ thread_->join();
+ delete thread_->delegate();
+ threads::DeleteThread(thread_);
+ if (-1 != read_fd_) {
+ close(read_fd_);
+ }
+ if (-1 != write_fd_) {
+ close(write_fd_);
+ }
}
void ThreadedSocketConnection::Abort() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
unexpected_disconnect_ = true;
terminate_flag_ = true;
- LOG4CXX_TRACE(logger_, "exit");
}
TransportAdapter::Error ThreadedSocketConnection::Start() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
int fds[2];
const int pipe_ret = pipe(fds);
if (0 == pipe_ret) {
- LOG4CXX_DEBUG(logger_, "pipe created(#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "pipe created");
read_fd_ = fds[0];
write_fd_ = fds[1];
} else {
- LOG4CXX_WARN(logger_, "pipe creation failed (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ LOG4CXX_ERROR(logger_, "pipe creation failed");
return TransportAdapter::FAIL;
}
const int fcntl_ret = fcntl(read_fd_, F_SETFL,
fcntl(read_fd_, F_GETFL) | O_NONBLOCK);
if (0 != fcntl_ret) {
- LOG4CXX_WARN(logger_, "fcntl failed (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ LOG4CXX_ERROR(logger_, "fcntl failed");
return TransportAdapter::FAIL;
}
- const std::string thread_name = std::string("Socket ") + device_handle();
- thread_ = threads::CreateThread(thread_name.c_str(), this);
-
if (!thread_->start()) {
- LOG4CXX_WARN(logger_, "thread creation failed (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ LOG4CXX_ERROR(logger_, "thread creation failed");
return TransportAdapter::FAIL;
}
- LOG4CXX_DEBUG(logger_, "thread created (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
+ LOG4CXX_INFO(logger_, "thread created");
return TransportAdapter::OK;
}
void ThreadedSocketConnection::Finalize() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
if (unexpected_disconnect_) {
- LOG4CXX_DEBUG(logger_, "unexpected_disconnect (#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "unexpected_disconnect");
controller_->ConnectionAborted(device_handle(), application_handle(),
CommunicationError());
} else {
- LOG4CXX_DEBUG(logger_, "not unexpected_disconnect (#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "not unexpected_disconnect");
controller_->ConnectionFinished(device_handle(), application_handle());
}
close(socket_);
- LOG4CXX_TRACE(logger_, "exit");
}
TransportAdapter::Error ThreadedSocketConnection::Notify() const {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
if (-1 == write_fd_) {
- LOG4CXX_ERROR_WITH_ERRNO(logger_,
- "Failed to wake up connection thread for connection " << this);
+ LOG4CXX_ERROR_WITH_ERRNO(
+ logger_, "Failed to wake up connection thread for connection " << this);
LOG4CXX_TRACE(logger_, "exit with TransportAdapter::BAD_STATE");
return TransportAdapter::BAD_STATE;
}
uint8_t c = 0;
- if (1 == write(write_fd_, &c, 1)) {
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
- return TransportAdapter::OK;
- } else {
+ if (1 != write(write_fd_, &c, 1)) {
LOG4CXX_ERROR_WITH_ERRNO(
- logger_, "Failed to wake up connection thread for connection " << this);
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ logger_, "Failed to wake up connection thread for connection " << this);
return TransportAdapter::FAIL;
}
+ return TransportAdapter::OK;
}
TransportAdapter::Error ThreadedSocketConnection::SendData(
- ::protocol_handler::RawMessagePtr message) {
- LOG4CXX_TRACE(logger_, "enter");
- pthread_mutex_lock(&frames_to_send_mutex_);
+ ::protocol_handler::RawMessagePtr message) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock auto_lock(frames_to_send_mutex_);
frames_to_send_.push(message);
- pthread_mutex_unlock(&frames_to_send_mutex_);
- TransportAdapter::Error error = Notify();
- LOG4CXX_TRACE(logger_, "exit with error" << error);
- return error;
+ return Notify();
}
TransportAdapter::Error ThreadedSocketConnection::Disconnect() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
terminate_flag_ = true;
- TransportAdapter::Error error = Notify();
- LOG4CXX_TRACE(logger_, "exit with error" << error);
- return error;
-}
-
-bool ThreadedSocketConnection::exitThreadMain() {
- terminate_flag_ = true;
- Notify();
- return true;
+ return Notify();
}
void ThreadedSocketConnection::threadMain() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
controller_->ConnectionCreated(this, device_uid_, app_handle_);
ConnectError* connect_error = NULL;
- if (Establish(&connect_error)) {
- LOG4CXX_DEBUG(logger_, "Connection established (#" << pthread_self() << ")");
- controller_->ConnectDone(device_handle(), application_handle());
- while (!terminate_flag_) {
- Transmit();
- }
- LOG4CXX_DEBUG(logger_, "Connection is to finalize (#" << pthread_self() << ")");
- Finalize();
- while (!frames_to_send_.empty()) {
- LOG4CXX_INFO(logger_, "removing message (#" << pthread_self() << ")");
- ::protocol_handler::RawMessagePtr message = frames_to_send_.front();
- frames_to_send_.pop();
- controller_->DataSendFailed(device_handle(), application_handle(),
- message, DataSendError());
- }
- controller_->DisconnectDone(device_handle(), application_handle());
- } else {
- LOG4CXX_ERROR(logger_, "Connection Establish failed (#" << pthread_self() << ")");
- controller_->ConnectFailed(device_handle(), application_handle(),
- *connect_error);
+ if (!Establish(&connect_error)) {
+ LOG4CXX_ERROR(logger_, "Connection Establish failed");
delete connect_error;
}
- if (-1 != read_fd_) {
- close(read_fd_);
+ LOG4CXX_DEBUG(logger_, "Connection established");
+ controller_->ConnectDone(device_handle(), application_handle());
+ while (!terminate_flag_) {
+ Transmit();
}
- if (-1 != write_fd_) {
- close(write_fd_);
+ LOG4CXX_DEBUG(logger_, "Connection is to finalize");
+ Finalize();
+ sync_primitives::AutoLock auto_lock(frames_to_send_mutex_);
+ while (!frames_to_send_.empty()) {
+ LOG4CXX_INFO(logger_, "removing message");
+ ::protocol_handler::RawMessagePtr message = frames_to_send_.front();
+ frames_to_send_.pop();
+ controller_->DataSendFailed(device_handle(), application_handle(),
+ message, DataSendError());
}
- LOG4CXX_TRACE(logger_, "exit");
}
void ThreadedSocketConnection::Transmit() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
- const nfds_t poll_fds_size = 2;
- pollfd poll_fds[poll_fds_size];
+ const nfds_t kPollFdsSize = 2;
+ pollfd poll_fds[kPollFdsSize];
poll_fds[0].fd = socket_;
- poll_fds[0].events = POLLIN | POLLPRI | (frames_to_send_.empty() ? 0 : POLLOUT);
+ poll_fds[0].events = POLLIN | POLLPRI
+ | (frames_to_send_.empty() ? 0 : POLLOUT);
poll_fds[1].fd = read_fd_;
poll_fds[1].events = POLLIN | POLLPRI;
- LOG4CXX_DEBUG(logger_, "poll (#" << pthread_self() << ") " << this);
- if (-1 == poll(poll_fds, poll_fds_size, -1)) {
+ LOG4CXX_DEBUG(logger_, "poll " << this);
+ if (-1 == poll(poll_fds, kPollFdsSize, -1)) {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "poll failed for connection " << this);
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: -1 == poll(poll_fds, poll_fds_size, -1)");
return;
}
- LOG4CXX_DEBUG(logger_, "poll is ok (#" << pthread_self() << ") " << this << " revents0:"
- <<
- std::hex << poll_fds[0].revents << " revents1:" << std::hex << poll_fds[1].revents);
+ LOG4CXX_DEBUG(
+ logger_,
+ "poll is ok " << this << " revents0: " << std::hex << poll_fds[0].revents <<
+ " revents1:" << std::hex << poll_fds[1].revents);
// error check
if (0 != (poll_fds[1].revents & (POLLERR | POLLHUP | POLLNVAL))) {
LOG4CXX_ERROR(logger_,
"Notification pipe for connection " << this << " terminated");
Abort();
- LOG4CXX_TRACE(logger_,
- "exit. Condition: 0 != (poll_fds[1].revents & (POLLERR | POLLHUP | POLLNVAL))");
return;
}
if (poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
LOG4CXX_WARN(logger_, "Connection " << this << " terminated");
Abort();
- LOG4CXX_TRACE(logger_,
- "exit. Condition: poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL");
return;
}
@@ -250,20 +228,18 @@ void ThreadedSocketConnection::Transmit() {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to clear notification pipe");
LOG4CXX_ERROR_WITH_ERRNO(logger_, "poll failed for connection " << this);
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: (bytes_read < 0) && (EAGAIN != errno)");
return;
}
// send data if possible
if (!frames_to_send_.empty() && (poll_fds[0].revents | POLLOUT)) {
- LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() (#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() ");
// send data
const bool send_ok = Send();
if (!send_ok) {
- LOG4CXX_ERROR(logger_, "Send() failed (#" << pthread_self() << ")");
+ LOG4CXX_ERROR(logger_, "Send() failed ");
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: !send_ok");
return;
}
}
@@ -272,17 +248,15 @@ void ThreadedSocketConnection::Transmit() {
if (poll_fds[0].revents & (POLLIN | POLLPRI)) {
const bool receive_ok = Receive();
if (!receive_ok) {
- LOG4CXX_ERROR(logger_, "Receive() failed (#" << pthread_self() << ")");
+ LOG4CXX_ERROR(logger_, "Receive() failed ");
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: !receive_ok");
return;
}
}
- LOG4CXX_TRACE(logger_, "exit");
}
bool ThreadedSocketConnection::Receive() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
uint8_t buffer[4096];
ssize_t bytes_read = -1;
@@ -291,8 +265,8 @@ bool ThreadedSocketConnection::Receive() {
if (bytes_read > 0) {
LOG4CXX_DEBUG(
- logger_,
- "Received " << bytes_read << " bytes for connection " << this);
+ logger_,
+ "Received " << bytes_read << " bytes for connection " << this);
::protocol_handler::RawMessagePtr frame(
new protocol_handler::RawMessage(0, 0, buffer, bytes_read));
controller_->DataReceiveDone(device_handle(), application_handle(),
@@ -301,36 +275,33 @@ bool ThreadedSocketConnection::Receive() {
if (EAGAIN != errno && EWOULDBLOCK != errno) {
LOG4CXX_ERROR_WITH_ERRNO(logger_,
"recv() failed for connection " << this);
- LOG4CXX_TRACE(logger_,
- "exit with FALSE. Condition: EAGAIN != errno && EWOULDBLOCK != errno");
return false;
}
} else {
LOG4CXX_WARN(logger_, "Connection " << this << " closed by remote peer");
- LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: bytes_read >= 0");
return false;
}
} while (bytes_read > 0);
- LOG4CXX_TRACE(logger_, "exit with TRUE");
+
return true;
}
bool ThreadedSocketConnection::Send() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
FrameQueue frames_to_send;
- pthread_mutex_lock(&frames_to_send_mutex_);
+ frames_to_send_mutex_.Acquire();
std::swap(frames_to_send, frames_to_send_);
- pthread_mutex_unlock(&frames_to_send_mutex_);
+ frames_to_send_mutex_.Release();
size_t offset = 0;
while (!frames_to_send.empty()) {
- LOG4CXX_INFO(logger_, "frames_to_send is not empty" << pthread_self() << ")");
+ LOG4CXX_INFO(logger_, "frames_to_send is not empty");
::protocol_handler::RawMessagePtr frame = frames_to_send.front();
const ssize_t bytes_sent = ::send(socket_, frame->data() + offset,
frame->data_size() - offset, 0);
if (bytes_sent >= 0) {
- LOG4CXX_DEBUG(logger_, "bytes_sent >= 0" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "bytes_sent >= 0");
offset += bytes_sent;
if (offset == frame->data_size()) {
frames_to_send.pop();
@@ -338,7 +309,7 @@ bool ThreadedSocketConnection::Send() {
controller_->DataSendDone(device_handle(), application_handle(), frame);
}
} else {
- LOG4CXX_DEBUG(logger_, "bytes_sent < 0" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "bytes_sent < 0");
LOG4CXX_ERROR_WITH_ERRNO(logger_, "Send failed for connection " << this);
frames_to_send.pop();
offset = 0;
@@ -346,9 +317,24 @@ bool ThreadedSocketConnection::Send() {
DataSendError());
}
}
- LOG4CXX_TRACE(logger_, "exit with TRUE");
+
return true;
}
-} // namespace
-} // namespace
+ThreadedSocketConnection::SocketConnectionDelegate::SocketConnectionDelegate(
+ ThreadedSocketConnection* connection)
+ : connection_(connection) {
+}
+
+void ThreadedSocketConnection::SocketConnectionDelegate::threadMain() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ DCHECK(connection_);
+ connection_->threadMain();
+}
+
+void ThreadedSocketConnection::SocketConnectionDelegate::exitThreadMain() {
+ LOG4CXX_AUTO_TRACE(logger_);
+}
+
+} // namespace transport_adapter
+} // namespace transport_manager