summaryrefslogtreecommitdiff
path: root/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
diff options
context:
space:
mode:
authorJustin Dickow <jjdickow@gmail.com>2015-02-20 09:11:18 -0500
committerJustin Dickow <jjdickow@gmail.com>2015-02-20 09:11:18 -0500
commita7c5d752cb75485baa0ded5226335d0f8eb10321 (patch)
treefbfd9251ada2cdcd5cf6a03a79887d08f6b496d7 /src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
parentb2b2233d866f102d3de339afa8ccaf37d3cf2570 (diff)
downloadsdl_core-a7c5d752cb75485baa0ded5226335d0f8eb10321.tar.gz
Bug Fixes and ImprovementsSynchronizationCommit
Fix Empty perform iteration request Fix type of name from string to enum SendLocation implemented on HTML5 HMI Fixed PI response on VR rejection due to high priority. Fix Apps not responsive/not able to start app/apps remain listed on SYNC even after USB disconnect Mobile API change and processing capabilities Change perform interaction request conditions. Fix SDL must always start 3sec timer before resuming the HMILevel of the app Remove redundant StartSavePersistentDataTimer() call. Change wrong predicate name to right. Added stream request handling feature Made streaming timeout in media manager configurable Put navi app in LIMITED in case of phone call Handling of audio state for applications Add stop streaming timeout into ini file Implement HMILevel resumption for job-1 Fix result code ABORTED when interrupts it by Voice recognition activation Fix incorrect value parameter unexpectedDisconnect in BCOnAppUnregistered Fix SDL send BC.OnAppUnregistered with "unexpectedDisconnect" set to "true" in case received from HMI OnExitAllApplications {"reason":"MASTER_RESET"} Fix Update ini file for iAP1 support Current working directory added to image path Fix helpers to make it workable with more then 2 parameters DCHECK() for ManageMobileCommand() replaced with log message because the latter returns false in some regular situations (e.g. TOO_MANY_PENDING_REQUESTS, see SDLAQ-CRS-10) Remove connection after closing. Signed-off-by: Justin Dickow <jjdickow@gmail.com>
Diffstat (limited to 'src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc')
-rw-r--r--src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc234
1 files changed, 110 insertions, 124 deletions
diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
index 1e189ed101..67e41cec2b 100644
--- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
+++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
*
@@ -36,7 +36,9 @@
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
+
#include "utils/logger.h"
+#include "utils/threads/thread.h"
#include "transport_manager/transport_adapter/threaded_socket_connection.h"
#include "transport_manager/transport_adapter/transport_adapter_controller.h"
@@ -46,197 +48,173 @@ namespace transport_adapter {
CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager")
ThreadedSocketConnection::ThreadedSocketConnection(
- const DeviceUID& device_id, const ApplicationHandle& app_handle,
- TransportAdapterController* controller)
- : read_fd_(-1), write_fd_(-1), controller_(controller),
- frames_to_send_(),
- frames_to_send_mutex_(),
- socket_(-1),
- terminate_flag_(false),
- unexpected_disconnect_(false),
- device_uid_(device_id),
- app_handle_(app_handle)
- {
- pthread_mutex_init(&frames_to_send_mutex_, 0);
+ const DeviceUID& device_id, const ApplicationHandle& app_handle,
+ TransportAdapterController* controller)
+ : read_fd_(-1),
+ write_fd_(-1),
+ controller_(controller),
+ frames_to_send_(),
+ frames_to_send_mutex_(),
+ socket_(-1),
+ terminate_flag_(false),
+ unexpected_disconnect_(false),
+ device_uid_(device_id),
+ app_handle_(app_handle),
+ thread_(NULL) {
+ const std::string thread_name = std::string("Socket ") + device_handle();
+ thread_ = threads::CreateThread(thread_name.c_str(),
+ new SocketConnectionDelegate(this));
}
ThreadedSocketConnection::~ThreadedSocketConnection() {
- terminate_flag_ = true;
- Notify();
- pthread_mutex_destroy(&frames_to_send_mutex_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ Disconnect();
+ thread_->join();
+ delete thread_->delegate();
+ threads::DeleteThread(thread_);
+ if (-1 != read_fd_) {
+ close(read_fd_);
+ }
+ if (-1 != write_fd_) {
+ close(write_fd_);
+ }
}
void ThreadedSocketConnection::Abort() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
unexpected_disconnect_ = true;
terminate_flag_ = true;
- LOG4CXX_TRACE(logger_, "exit");
}
TransportAdapter::Error ThreadedSocketConnection::Start() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
int fds[2];
const int pipe_ret = pipe(fds);
if (0 == pipe_ret) {
- LOG4CXX_DEBUG(logger_, "pipe created(#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "pipe created");
read_fd_ = fds[0];
write_fd_ = fds[1];
} else {
- LOG4CXX_WARN(logger_, "pipe creation failed (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ LOG4CXX_ERROR(logger_, "pipe creation failed");
return TransportAdapter::FAIL;
}
const int fcntl_ret = fcntl(read_fd_, F_SETFL,
fcntl(read_fd_, F_GETFL) | O_NONBLOCK);
if (0 != fcntl_ret) {
- LOG4CXX_WARN(logger_, "fcntl failed (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ LOG4CXX_ERROR(logger_, "fcntl failed");
return TransportAdapter::FAIL;
}
- const std::string thread_name = std::string("Socket ") + device_handle();
- thread_ = threads::CreateThread(thread_name.c_str(), this);
-
if (!thread_->start()) {
- LOG4CXX_WARN(logger_, "thread creation failed (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ LOG4CXX_ERROR(logger_, "thread creation failed");
return TransportAdapter::FAIL;
}
- LOG4CXX_DEBUG(logger_, "thread created (#" << pthread_self() << ")");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
+ LOG4CXX_INFO(logger_, "thread created");
return TransportAdapter::OK;
}
void ThreadedSocketConnection::Finalize() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
if (unexpected_disconnect_) {
- LOG4CXX_DEBUG(logger_, "unexpected_disconnect (#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "unexpected_disconnect");
controller_->ConnectionAborted(device_handle(), application_handle(),
CommunicationError());
} else {
- LOG4CXX_DEBUG(logger_, "not unexpected_disconnect (#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "not unexpected_disconnect");
controller_->ConnectionFinished(device_handle(), application_handle());
}
close(socket_);
- LOG4CXX_TRACE(logger_, "exit");
}
TransportAdapter::Error ThreadedSocketConnection::Notify() const {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
if (-1 == write_fd_) {
- LOG4CXX_ERROR_WITH_ERRNO(logger_,
- "Failed to wake up connection thread for connection " << this);
+ LOG4CXX_ERROR_WITH_ERRNO(
+ logger_, "Failed to wake up connection thread for connection " << this);
LOG4CXX_TRACE(logger_, "exit with TransportAdapter::BAD_STATE");
return TransportAdapter::BAD_STATE;
}
uint8_t c = 0;
- if (1 == write(write_fd_, &c, 1)) {
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
- return TransportAdapter::OK;
- } else {
+ if (1 != write(write_fd_, &c, 1)) {
LOG4CXX_ERROR_WITH_ERRNO(
- logger_, "Failed to wake up connection thread for connection " << this);
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL");
+ logger_, "Failed to wake up connection thread for connection " << this);
return TransportAdapter::FAIL;
}
+ return TransportAdapter::OK;
}
TransportAdapter::Error ThreadedSocketConnection::SendData(
- ::protocol_handler::RawMessagePtr message) {
- LOG4CXX_TRACE(logger_, "enter");
- pthread_mutex_lock(&frames_to_send_mutex_);
+ ::protocol_handler::RawMessagePtr message) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock auto_lock(frames_to_send_mutex_);
frames_to_send_.push(message);
- pthread_mutex_unlock(&frames_to_send_mutex_);
- TransportAdapter::Error error = Notify();
- LOG4CXX_TRACE(logger_, "exit with error" << error);
- return error;
+ return Notify();
}
TransportAdapter::Error ThreadedSocketConnection::Disconnect() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
terminate_flag_ = true;
- TransportAdapter::Error error = Notify();
- LOG4CXX_TRACE(logger_, "exit with error" << error);
- return error;
-}
-
-bool ThreadedSocketConnection::exitThreadMain() {
- terminate_flag_ = true;
- Notify();
- return true;
+ return Notify();
}
void ThreadedSocketConnection::threadMain() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
controller_->ConnectionCreated(this, device_uid_, app_handle_);
ConnectError* connect_error = NULL;
- if (Establish(&connect_error)) {
- LOG4CXX_DEBUG(logger_, "Connection established (#" << pthread_self() << ")");
- controller_->ConnectDone(device_handle(), application_handle());
- while (!terminate_flag_) {
- Transmit();
- }
- LOG4CXX_DEBUG(logger_, "Connection is to finalize (#" << pthread_self() << ")");
- Finalize();
- while (!frames_to_send_.empty()) {
- LOG4CXX_INFO(logger_, "removing message (#" << pthread_self() << ")");
- ::protocol_handler::RawMessagePtr message = frames_to_send_.front();
- frames_to_send_.pop();
- controller_->DataSendFailed(device_handle(), application_handle(),
- message, DataSendError());
- }
- controller_->DisconnectDone(device_handle(), application_handle());
- } else {
- LOG4CXX_ERROR(logger_, "Connection Establish failed (#" << pthread_self() << ")");
- controller_->ConnectFailed(device_handle(), application_handle(),
- *connect_error);
+ if (!Establish(&connect_error)) {
+ LOG4CXX_ERROR(logger_, "Connection Establish failed");
delete connect_error;
}
- if (-1 != read_fd_) {
- close(read_fd_);
+ LOG4CXX_DEBUG(logger_, "Connection established");
+ controller_->ConnectDone(device_handle(), application_handle());
+ while (!terminate_flag_) {
+ Transmit();
}
- if (-1 != write_fd_) {
- close(write_fd_);
+ LOG4CXX_DEBUG(logger_, "Connection is to finalize");
+ Finalize();
+ sync_primitives::AutoLock auto_lock(frames_to_send_mutex_);
+ while (!frames_to_send_.empty()) {
+ LOG4CXX_INFO(logger_, "removing message");
+ ::protocol_handler::RawMessagePtr message = frames_to_send_.front();
+ frames_to_send_.pop();
+ controller_->DataSendFailed(device_handle(), application_handle(),
+ message, DataSendError());
}
- LOG4CXX_TRACE(logger_, "exit");
}
void ThreadedSocketConnection::Transmit() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
- const nfds_t poll_fds_size = 2;
- pollfd poll_fds[poll_fds_size];
+ const nfds_t kPollFdsSize = 2;
+ pollfd poll_fds[kPollFdsSize];
poll_fds[0].fd = socket_;
- poll_fds[0].events = POLLIN | POLLPRI | (frames_to_send_.empty() ? 0 : POLLOUT);
+ poll_fds[0].events = POLLIN | POLLPRI
+ | (frames_to_send_.empty() ? 0 : POLLOUT);
poll_fds[1].fd = read_fd_;
poll_fds[1].events = POLLIN | POLLPRI;
- LOG4CXX_DEBUG(logger_, "poll (#" << pthread_self() << ") " << this);
- if (-1 == poll(poll_fds, poll_fds_size, -1)) {
+ LOG4CXX_DEBUG(logger_, "poll " << this);
+ if (-1 == poll(poll_fds, kPollFdsSize, -1)) {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "poll failed for connection " << this);
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: -1 == poll(poll_fds, poll_fds_size, -1)");
return;
}
- LOG4CXX_DEBUG(logger_, "poll is ok (#" << pthread_self() << ") " << this << " revents0:"
- <<
- std::hex << poll_fds[0].revents << " revents1:" << std::hex << poll_fds[1].revents);
+ LOG4CXX_DEBUG(
+ logger_,
+ "poll is ok " << this << " revents0: " << std::hex << poll_fds[0].revents <<
+ " revents1:" << std::hex << poll_fds[1].revents);
// error check
if (0 != (poll_fds[1].revents & (POLLERR | POLLHUP | POLLNVAL))) {
LOG4CXX_ERROR(logger_,
"Notification pipe for connection " << this << " terminated");
Abort();
- LOG4CXX_TRACE(logger_,
- "exit. Condition: 0 != (poll_fds[1].revents & (POLLERR | POLLHUP | POLLNVAL))");
return;
}
if (poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
LOG4CXX_WARN(logger_, "Connection " << this << " terminated");
Abort();
- LOG4CXX_TRACE(logger_,
- "exit. Condition: poll_fds[0].revents & (POLLERR | POLLHUP | POLLNVAL");
return;
}
@@ -250,20 +228,18 @@ void ThreadedSocketConnection::Transmit() {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to clear notification pipe");
LOG4CXX_ERROR_WITH_ERRNO(logger_, "poll failed for connection " << this);
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: (bytes_read < 0) && (EAGAIN != errno)");
return;
}
// send data if possible
if (!frames_to_send_.empty() && (poll_fds[0].revents | POLLOUT)) {
- LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() (#" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() ");
// send data
const bool send_ok = Send();
if (!send_ok) {
- LOG4CXX_ERROR(logger_, "Send() failed (#" << pthread_self() << ")");
+ LOG4CXX_ERROR(logger_, "Send() failed ");
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: !send_ok");
return;
}
}
@@ -272,17 +248,15 @@ void ThreadedSocketConnection::Transmit() {
if (poll_fds[0].revents & (POLLIN | POLLPRI)) {
const bool receive_ok = Receive();
if (!receive_ok) {
- LOG4CXX_ERROR(logger_, "Receive() failed (#" << pthread_self() << ")");
+ LOG4CXX_ERROR(logger_, "Receive() failed ");
Abort();
- LOG4CXX_TRACE(logger_, "exit. Condition: !receive_ok");
return;
}
}
- LOG4CXX_TRACE(logger_, "exit");
}
bool ThreadedSocketConnection::Receive() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
uint8_t buffer[4096];
ssize_t bytes_read = -1;
@@ -291,8 +265,8 @@ bool ThreadedSocketConnection::Receive() {
if (bytes_read > 0) {
LOG4CXX_DEBUG(
- logger_,
- "Received " << bytes_read << " bytes for connection " << this);
+ logger_,
+ "Received " << bytes_read << " bytes for connection " << this);
::protocol_handler::RawMessagePtr frame(
new protocol_handler::RawMessage(0, 0, buffer, bytes_read));
controller_->DataReceiveDone(device_handle(), application_handle(),
@@ -301,36 +275,33 @@ bool ThreadedSocketConnection::Receive() {
if (EAGAIN != errno && EWOULDBLOCK != errno) {
LOG4CXX_ERROR_WITH_ERRNO(logger_,
"recv() failed for connection " << this);
- LOG4CXX_TRACE(logger_,
- "exit with FALSE. Condition: EAGAIN != errno && EWOULDBLOCK != errno");
return false;
}
} else {
LOG4CXX_WARN(logger_, "Connection " << this << " closed by remote peer");
- LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: bytes_read >= 0");
return false;
}
} while (bytes_read > 0);
- LOG4CXX_TRACE(logger_, "exit with TRUE");
+
return true;
}
bool ThreadedSocketConnection::Send() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
FrameQueue frames_to_send;
- pthread_mutex_lock(&frames_to_send_mutex_);
+ frames_to_send_mutex_.Acquire();
std::swap(frames_to_send, frames_to_send_);
- pthread_mutex_unlock(&frames_to_send_mutex_);
+ frames_to_send_mutex_.Release();
size_t offset = 0;
while (!frames_to_send.empty()) {
- LOG4CXX_INFO(logger_, "frames_to_send is not empty" << pthread_self() << ")");
+ LOG4CXX_INFO(logger_, "frames_to_send is not empty");
::protocol_handler::RawMessagePtr frame = frames_to_send.front();
const ssize_t bytes_sent = ::send(socket_, frame->data() + offset,
frame->data_size() - offset, 0);
if (bytes_sent >= 0) {
- LOG4CXX_DEBUG(logger_, "bytes_sent >= 0" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "bytes_sent >= 0");
offset += bytes_sent;
if (offset == frame->data_size()) {
frames_to_send.pop();
@@ -338,7 +309,7 @@ bool ThreadedSocketConnection::Send() {
controller_->DataSendDone(device_handle(), application_handle(), frame);
}
} else {
- LOG4CXX_DEBUG(logger_, "bytes_sent < 0" << pthread_self() << ")");
+ LOG4CXX_DEBUG(logger_, "bytes_sent < 0");
LOG4CXX_ERROR_WITH_ERRNO(logger_, "Send failed for connection " << this);
frames_to_send.pop();
offset = 0;
@@ -346,9 +317,24 @@ bool ThreadedSocketConnection::Send() {
DataSendError());
}
}
- LOG4CXX_TRACE(logger_, "exit with TRUE");
+
return true;
}
-} // namespace
-} // namespace
+ThreadedSocketConnection::SocketConnectionDelegate::SocketConnectionDelegate(
+ ThreadedSocketConnection* connection)
+ : connection_(connection) {
+}
+
+void ThreadedSocketConnection::SocketConnectionDelegate::threadMain() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ DCHECK(connection_);
+ connection_->threadMain();
+}
+
+void ThreadedSocketConnection::SocketConnectionDelegate::exitThreadMain() {
+ LOG4CXX_AUTO_TRACE(logger_);
+}
+
+} // namespace transport_adapter
+} // namespace transport_manager