summaryrefslogtreecommitdiff
path: root/src/components/transport_manager/src/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/transport_manager/src/tcp')
-rw-r--r--src/components/transport_manager/src/tcp/dnssd_service_browser.cc44
-rw-r--r--src/components/transport_manager/src/tcp/tcp_client_listener.cc117
-rw-r--r--src/components/transport_manager/src/tcp/tcp_connection_factory.cc2
-rw-r--r--src/components/transport_manager/src/tcp/tcp_device.cc34
-rw-r--r--src/components/transport_manager/src/tcp/tcp_socket_connection.cc7
-rw-r--r--src/components/transport_manager/src/tcp/tcp_transport_adapter.cc6
6 files changed, 114 insertions, 96 deletions
diff --git a/src/components/transport_manager/src/tcp/dnssd_service_browser.cc b/src/components/transport_manager/src/tcp/dnssd_service_browser.cc
index 5580585766..e8ba2ea874 100644
--- a/src/components/transport_manager/src/tcp/dnssd_service_browser.cc
+++ b/src/components/transport_manager/src/tcp/dnssd_service_browser.cc
@@ -1,4 +1,4 @@
-/**
+/*
*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
@@ -56,12 +56,15 @@ void DnssdServiceBrowser::Terminate() {
}
if (0 != avahi_service_browser_) {
avahi_service_browser_free(avahi_service_browser_);
+ avahi_service_browser_ = NULL;
}
if (0 != avahi_client_) {
avahi_client_free(avahi_client_);
+ avahi_client_ = NULL;
}
if (0 != avahi_threaded_poll_) {
avahi_threaded_poll_free(avahi_threaded_poll_);
+ avahi_threaded_poll_ = NULL;
}
LOG4CXX_TRACE(logger_, "exit");
}
@@ -78,11 +81,9 @@ DnssdServiceBrowser::DnssdServiceBrowser(TransportAdapterController* controller)
service_records_(),
mutex_(),
initialised_(false) {
- pthread_mutex_init(&mutex_, 0);
}
DnssdServiceBrowser::~DnssdServiceBrowser() {
- pthread_mutex_destroy(&mutex_);
}
void DnssdServiceBrowser::OnClientConnected() {
@@ -172,8 +173,8 @@ void AvahiServiceBrowserCallback(AvahiServiceBrowser* avahi_service_browser,
void DnssdServiceBrowser::ServiceResolved(
const DnssdServiceRecord& service_record) {
- LOG4CXX_TRACE(logger_, "enter");
- pthread_mutex_lock(&mutex_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock locker(mutex_);
ServiceRecords::iterator service_record_it = std::find(
service_records_.begin(), service_records_.end(), service_record);
if (service_record_it != service_records_.end()) {
@@ -181,23 +182,19 @@ void DnssdServiceBrowser::ServiceResolved(
}
DeviceVector device_vector = PrepareDeviceVector();
controller_->SearchDeviceDone(device_vector);
- pthread_mutex_unlock(&mutex_);
- LOG4CXX_TRACE(logger_, "exit");
}
void DnssdServiceBrowser::ServiceResolveFailed(
const DnssdServiceRecord& service_record) {
- LOG4CXX_TRACE(logger_, "enter");
- LOG4CXX_ERROR(logger_,
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_,
"AvahiServiceResolver failure for: " << service_record.name);
- pthread_mutex_lock(&mutex_);
+ sync_primitives::AutoLock locker(mutex_);
ServiceRecords::iterator service_record_it = std::find(
service_records_.begin(), service_records_.end(), service_record);
if (service_record_it != service_records_.end()) {
service_records_.erase(service_record_it);
}
- pthread_mutex_unlock(&mutex_);
- LOG4CXX_TRACE(logger_, "exit");
}
void AvahiServiceResolverCallback(AvahiServiceResolver* avahi_service_resolver,
@@ -246,9 +243,11 @@ TransportAdapter::Error DnssdServiceBrowser::CreateAvahiClientAndBrowser() {
LOG4CXX_TRACE(logger_, "enter");
if (0 != avahi_service_browser_) {
avahi_service_browser_free(avahi_service_browser_);
+ avahi_service_browser_ = NULL;
}
if (0 != avahi_client_) {
avahi_client_free(avahi_client_);
+ avahi_client_ = NULL;
}
int avahi_error;
@@ -261,9 +260,9 @@ TransportAdapter::Error DnssdServiceBrowser::CreateAvahiClientAndBrowser() {
return TransportAdapter::FAIL;
}
- pthread_mutex_lock(&mutex_);
+ mutex_.Acquire();
service_records_.clear();
- pthread_mutex_unlock(&mutex_);
+ mutex_.Release();
avahi_service_browser_ = avahi_service_browser_new(
avahi_client_, AVAHI_IF_UNSPEC, /* TODO use only required iface */
@@ -305,7 +304,8 @@ TransportAdapter::Error DnssdServiceBrowser::Scan() {
void DnssdServiceBrowser::AddService(AvahiIfIndex interface,
AvahiProtocol protocol, const char* name,
const char* type, const char* domain) {
- LOG4CXX_TRACE(logger_, "enter: interface " << interface << " protocol " << protocol <<
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "interface " << interface << " protocol " << protocol <<
" name " << name << " type " << type << " domain " << domain);
DnssdServiceRecord record;
record.interface = interface;
@@ -314,7 +314,7 @@ void DnssdServiceBrowser::AddService(AvahiIfIndex interface,
record.name = name;
record.type = type;
- pthread_mutex_lock(&mutex_);
+ sync_primitives::AutoLock locker(mutex_);
if (service_records_.end()
== std::find(service_records_.begin(), service_records_.end(), record)) {
service_records_.push_back(record);
@@ -323,15 +323,14 @@ void DnssdServiceBrowser::AddService(AvahiIfIndex interface,
AVAHI_PROTO_INET, static_cast<AvahiLookupFlags>(0),
AvahiServiceResolverCallback, this);
}
- pthread_mutex_unlock(&mutex_);
- LOG4CXX_TRACE(logger_, "exit");
}
void DnssdServiceBrowser::RemoveService(AvahiIfIndex interface,
AvahiProtocol protocol,
const char* name, const char* type,
const char* domain) {
- LOG4CXX_TRACE(logger_, "enter: interface " << interface << " protocol " << protocol <<
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "interface " << interface << " protocol " << protocol <<
" name " << name << " type " << type << " domain " << domain);
DnssdServiceRecord record;
record.interface = interface;
@@ -340,16 +339,14 @@ void DnssdServiceBrowser::RemoveService(AvahiIfIndex interface,
record.type = type;
record.domain_name = domain;
- pthread_mutex_lock(&mutex_);
+ sync_primitives::AutoLock locker(mutex_);
service_records_.erase(
std::remove(service_records_.begin(), service_records_.end(), record),
service_records_.end());
- pthread_mutex_unlock(&mutex_);
- LOG4CXX_TRACE(logger_, "exit");
}
DeviceVector DnssdServiceBrowser::PrepareDeviceVector() const {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
std::map<uint32_t, TcpDevice*> devices;
for (ServiceRecords::const_iterator it = service_records_.begin();
it != service_records_.end(); ++it) {
@@ -372,7 +369,6 @@ DeviceVector DnssdServiceBrowser::PrepareDeviceVector() const {
it != devices.end(); ++it) {
device_vector.push_back(DeviceSptr(it->second));
}
- LOG4CXX_TRACE(logger_, "exit");
return device_vector;
}
diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc
index c0f39cc490..954c734d68 100644
--- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc
+++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc
@@ -53,7 +53,7 @@
#include <sstream>
#include "utils/logger.h"
-
+#include "utils/threads/thread.h"
#include "transport_manager/transport_adapter/transport_adapter_controller.h"
#include "transport_manager/tcp/tcp_device.h"
#include "transport_manager/tcp/tcp_socket_connection.h"
@@ -66,31 +66,35 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager")
TcpClientListener::TcpClientListener(TransportAdapterController* controller,
const uint16_t port,
const bool enable_keepalive)
- : port_(port),
- enable_keepalive_(enable_keepalive),
- controller_(controller),
- thread_(threads::CreateThread("TcpClientListener", this)),
- socket_(-1),
- thread_stop_requested_(false) { }
+ : port_(port),
+ enable_keepalive_(enable_keepalive),
+ controller_(controller),
+ thread_(0),
+ socket_(-1),
+ thread_stop_requested_(false) {
+ thread_ = threads::CreateThread("TcpClientListener",
+ new ListeningThreadDelegate(this));
+}
TransportAdapter::Error TcpClientListener::Init() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_stop_requested_ = false;
return TransportAdapter::OK;
}
void TcpClientListener::Terminate() {
- LOG4CXX_TRACE(logger_, "enter");
- if (TransportAdapter::OK != StopListening()) {
- LOG4CXX_ERROR(logger_, "Cannot stop listening TCP");
- }
- LOG4CXX_TRACE(logger_, "exit");
+ thread_->stop();
}
bool TcpClientListener::IsInitialised() const {
- return true;
+ return thread_;
}
TcpClientListener::~TcpClientListener() {
- LOG4CXX_INFO(logger_, "destructor");
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_->join();
+ delete thread_->delegate();
+ threads::DeleteThread(thread_);
}
void SetKeepaliveOptions(const int fd) {
@@ -130,7 +134,7 @@ void SetKeepaliveOptions(const int fd) {
mib[3] = TCPCTL_KEEPINTVL;
sysctl(mib, kMidLength, NULL, NULL, &keepintvl, sizeof(keepintvl));
- struct timeval tval = { 0 };
+ struct timeval tval = {0};
tval.tv_sec = keepidle;
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes));
setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &tval, sizeof(tval));
@@ -138,12 +142,13 @@ void SetKeepaliveOptions(const int fd) {
LOG4CXX_TRACE(logger_, "exit");
}
-void TcpClientListener::threadMain() {
+void TcpClientListener::Loop() {
LOG4CXX_TRACE(logger_, "enter");
while (!thread_stop_requested_) {
sockaddr_in client_address;
socklen_t client_address_size = sizeof(client_address);
- const int connection_fd = accept(socket_, (struct sockaddr*)&client_address,
+ const int connection_fd = accept(socket_,
+ (struct sockaddr*) &client_address,
&client_address_size);
if (thread_stop_requested_) {
LOG4CXX_DEBUG(logger_, "thread_stop_requested_");
@@ -157,6 +162,7 @@ void TcpClientListener::threadMain() {
if (AF_INET != client_address.sin_family) {
LOG4CXX_DEBUG(logger_, "Address of connected client is invalid");
+ close(connection_fd);
continue;
}
@@ -169,15 +175,16 @@ void TcpClientListener::threadMain() {
SetKeepaliveOptions(connection_fd);
}
- TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, device_name);
+ TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr,
+ device_name);
DeviceSptr device = controller_->AddDevice(tcp_device);
tcp_device = static_cast<TcpDevice*>(device.get());
const ApplicationHandle app_handle = tcp_device->AddIncomingApplication(
- connection_fd);
+ connection_fd);
TcpSocketConnection* connection(
- new TcpSocketConnection(device->unique_device_id(), app_handle,
- controller_));
+ new TcpSocketConnection(device->unique_device_id(), app_handle,
+ controller_));
connection->set_socket(connection_fd);
const TransportAdapter::Error error = connection->Start();
if (error != TransportAdapter::OK) {
@@ -187,11 +194,26 @@ void TcpClientListener::threadMain() {
LOG4CXX_TRACE(logger_, "exit");
}
+void TcpClientListener::StopLoop() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_stop_requested_ = true;
+ // We need to connect to the listening socket to unblock accept() call
+ int byesocket = socket(AF_INET, SOCK_STREAM, 0);
+ sockaddr_in server_address = { 0 };
+ server_address.sin_family = AF_INET;
+ server_address.sin_port = htons(port_);
+ server_address.sin_addr.s_addr = INADDR_ANY;
+ connect(byesocket, (sockaddr*) &server_address, sizeof(server_address));
+ shutdown(byesocket, SHUT_RDWR);
+ close(byesocket);
+}
+
TransportAdapter::Error TcpClientListener::StartListening() {
LOG4CXX_TRACE(logger_, "enter");
- if (thread_->is_running()) {
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::BAD_STATE. Condition: thread_started_");
+ if (!thread_ || thread_->is_running()) {
+ LOG4CXX_TRACE(
+ logger_,
+ "exit with TransportAdapter::BAD_STATE. Condition: thread_started_");
return TransportAdapter::BAD_STATE;
}
@@ -199,7 +221,8 @@ TransportAdapter::Error TcpClientListener::StartListening() {
if (-1 == socket_) {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL. Condition: -1 == socket_");
+ LOG4CXX_TRACE(logger_,
+ "exit with TransportAdapter::FAIL. Condition: -1 == socket_");
return TransportAdapter::FAIL;
}
@@ -213,15 +236,17 @@ TransportAdapter::Error TcpClientListener::StartListening() {
if (0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))) {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed");
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))");
+ LOG4CXX_TRACE(
+ logger_,
+ "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))");
return TransportAdapter::FAIL;
}
if (0 != listen(socket_, 128)) {
LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed");
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)");
+ LOG4CXX_TRACE(
+ logger_,
+ "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)");
return TransportAdapter::FAIL;
}
@@ -235,33 +260,29 @@ TransportAdapter::Error TcpClientListener::StartListening() {
return TransportAdapter::OK;
}
-bool TcpClientListener::exitThreadMain() {
- StopListening();
- return true;
+void TcpClientListener::ListeningThreadDelegate::exitThreadMain() {
+ parent_->StopLoop();
+}
+
+void TcpClientListener::ListeningThreadDelegate::threadMain() {
+ parent_->Loop();
+}
+
+TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate(
+ TcpClientListener* parent)
+ : parent_(parent) {
}
TransportAdapter::Error TcpClientListener::StopListening() {
- LOG4CXX_TRACE(logger_, "enter");
- if (!thread_->is_running()) {
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::BAD_STATE. Condition !thread_started_");
+ LOG4CXX_AUTO_TRACE(logger_);
+ if (!thread_ || !thread_->is_running()) {
+ LOG4CXX_TRACE(logger_, "TcpClientListener is not running now");
return TransportAdapter::BAD_STATE;
}
- thread_stop_requested_ = true;
- // We need to connect to the listening socket to unblock accept() call
- int byesocket = socket(AF_INET, SOCK_STREAM, 0);
- sockaddr_in server_address = { 0 };
- server_address.sin_family = AF_INET;
- server_address.sin_port = htons(port_);
- server_address.sin_addr.s_addr = INADDR_ANY;
- connect(byesocket, (sockaddr*)&server_address, sizeof(server_address));
- shutdown(byesocket, SHUT_RDWR);
- close(byesocket);
- LOG4CXX_DEBUG(logger_, "Tcp client listener thread terminated");
+ thread_->stop();
close(socket_);
socket_ = -1;
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
return TransportAdapter::OK;
}
diff --git a/src/components/transport_manager/src/tcp/tcp_connection_factory.cc b/src/components/transport_manager/src/tcp/tcp_connection_factory.cc
index 69173a0e06..a8c2bda600 100644
--- a/src/components/transport_manager/src/tcp/tcp_connection_factory.cc
+++ b/src/components/transport_manager/src/tcp/tcp_connection_factory.cc
@@ -1,4 +1,4 @@
-/**
+/*
*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
diff --git a/src/components/transport_manager/src/tcp/tcp_device.cc b/src/components/transport_manager/src/tcp/tcp_device.cc
index 2540c26ed0..130187c384 100644
--- a/src/components/transport_manager/src/tcp/tcp_device.cc
+++ b/src/components/transport_manager/src/tcp/tcp_device.cc
@@ -1,4 +1,4 @@
-/**
+/*
*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
@@ -43,9 +43,9 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager")
TcpDevice::TcpDevice(const in_addr_t& in_addr, const std::string& name)
:
Device(name, name),
+ applications_mutex_(),
in_addr_(in_addr),
last_handle_(0) {
- pthread_mutex_init(&applications_mutex_, 0);
}
bool TcpDevice::IsSameAs(const Device* other) const {
@@ -63,57 +63,53 @@ bool TcpDevice::IsSameAs(const Device* other) const {
}
ApplicationList TcpDevice::GetApplicationList() const {
- LOG4CXX_TRACE(logger_, "enter");
- pthread_mutex_lock(&applications_mutex_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock locker(applications_mutex_);
ApplicationList app_list;
for (std::map<ApplicationHandle, Application>::const_iterator it =
applications_.begin(); it != applications_.end(); ++it) {
app_list.push_back(it->first);
}
- pthread_mutex_unlock(&applications_mutex_);
- LOG4CXX_TRACE(logger_, "exit with app_list. It's size = " << app_list.size());
return app_list;
}
ApplicationHandle TcpDevice::AddIncomingApplication(int socket_fd) {
- LOG4CXX_TRACE(logger_, "enter. Socket_fd: " << socket_fd);
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Socket_fd: " << socket_fd);
Application app;
app.incoming = true;
app.socket = socket_fd;
app.port = 0; // this line removes compiler warning
- pthread_mutex_lock(&applications_mutex_);
+ sync_primitives::AutoLock locker(applications_mutex_);
const ApplicationHandle app_handle = ++last_handle_;
applications_[app_handle] = app;
- pthread_mutex_unlock(&applications_mutex_);
- LOG4CXX_TRACE(logger_, "exit with app_handle " << app_handle);
+ LOG4CXX_DEBUG(logger_, "App_handle " << app_handle);
return app_handle;
}
ApplicationHandle TcpDevice::AddDiscoveredApplication(int port) {
- LOG4CXX_TRACE(logger_, "enter. port " << port);
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "Port " << port);
Application app;
app.incoming = false;
app.socket = 0; // this line removes compiler warning
app.port = port;
- pthread_mutex_lock(&applications_mutex_);
+ sync_primitives::AutoLock locker(applications_mutex_);
const ApplicationHandle app_handle = ++last_handle_;
applications_[app_handle] = app;
- pthread_mutex_unlock(&applications_mutex_);
- LOG4CXX_TRACE(logger_, "exit with app_handle " << app_handle);
+ LOG4CXX_DEBUG(logger_, "App_handle " << app_handle);
return app_handle;
}
void TcpDevice::RemoveApplication(const ApplicationHandle app_handle) {
- LOG4CXX_TRACE(logger_, "enter. ApplicationHandle: " << app_handle);
- pthread_mutex_lock(&applications_mutex_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "ApplicationHandle: " << app_handle);
+ sync_primitives::AutoLock locker(applications_mutex_);
applications_.erase(app_handle);
- pthread_mutex_unlock(&applications_mutex_);
- LOG4CXX_TRACE(logger_, "exit");
}
TcpDevice::~TcpDevice() {
- pthread_mutex_destroy(&applications_mutex_);
}
int TcpDevice::GetApplicationSocket(const ApplicationHandle app_handle) const {
diff --git a/src/components/transport_manager/src/tcp/tcp_socket_connection.cc b/src/components/transport_manager/src/tcp/tcp_socket_connection.cc
index 3b208d8a07..a1160a84a8 100644
--- a/src/components/transport_manager/src/tcp/tcp_socket_connection.cc
+++ b/src/components/transport_manager/src/tcp/tcp_socket_connection.cc
@@ -1,4 +1,4 @@
-/**
+/*
*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
@@ -35,10 +35,12 @@
#include "transport_manager/tcp/tcp_socket_connection.h"
#include "transport_manager/tcp/tcp_device.h"
#include "utils/logger.h"
+#include "utils/threads/thread.h"
#include <memory.h>
#include <signal.h>
#include <errno.h>
+#include <unistd.h>
namespace transport_manager {
namespace transport_adapter {
@@ -107,6 +109,9 @@ bool TcpServerOiginatedSocketConnection::Establish(ConnectError** error) {
<< application_handle() << ", error " << errno);
*error = new ConnectError();
LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: failed to connect to application");
+
+ ::close(socket);
+
return false;
}
diff --git a/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc b/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc
index 3747225a89..6c4ee0cd89 100644
--- a/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc
+++ b/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
*
@@ -36,6 +35,7 @@
#include <errno.h>
#include <sstream>
#include <cstdlib>
+#include <stdio.h>
#include "resumption/last_state.h"
@@ -101,7 +101,7 @@ void TcpTransportAdapter::Store() const {
if (port != -1) { // don't want to store incoming applications
Json::Value application_dictionary;
char port_record[12];
- sprintf(port_record, "%d", port);
+ snprintf(port_record, sizeof(port_record), "%d", port);
application_dictionary["port"] = std::string(port_record);
applications_dictionary.append(application_dictionary);
}