diff options
Diffstat (limited to 'src/components/transport_manager/src/tcp')
6 files changed, 114 insertions, 96 deletions
diff --git a/src/components/transport_manager/src/tcp/dnssd_service_browser.cc b/src/components/transport_manager/src/tcp/dnssd_service_browser.cc index 5580585766..e8ba2ea874 100644 --- a/src/components/transport_manager/src/tcp/dnssd_service_browser.cc +++ b/src/components/transport_manager/src/tcp/dnssd_service_browser.cc @@ -1,4 +1,4 @@ -/** +/* * * Copyright (c) 2013, Ford Motor Company * All rights reserved. @@ -56,12 +56,15 @@ void DnssdServiceBrowser::Terminate() { } if (0 != avahi_service_browser_) { avahi_service_browser_free(avahi_service_browser_); + avahi_service_browser_ = NULL; } if (0 != avahi_client_) { avahi_client_free(avahi_client_); + avahi_client_ = NULL; } if (0 != avahi_threaded_poll_) { avahi_threaded_poll_free(avahi_threaded_poll_); + avahi_threaded_poll_ = NULL; } LOG4CXX_TRACE(logger_, "exit"); } @@ -78,11 +81,9 @@ DnssdServiceBrowser::DnssdServiceBrowser(TransportAdapterController* controller) service_records_(), mutex_(), initialised_(false) { - pthread_mutex_init(&mutex_, 0); } DnssdServiceBrowser::~DnssdServiceBrowser() { - pthread_mutex_destroy(&mutex_); } void DnssdServiceBrowser::OnClientConnected() { @@ -172,8 +173,8 @@ void AvahiServiceBrowserCallback(AvahiServiceBrowser* avahi_service_browser, void DnssdServiceBrowser::ServiceResolved( const DnssdServiceRecord& service_record) { - LOG4CXX_TRACE(logger_, "enter"); - pthread_mutex_lock(&mutex_); + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock locker(mutex_); ServiceRecords::iterator service_record_it = std::find( service_records_.begin(), service_records_.end(), service_record); if (service_record_it != service_records_.end()) { @@ -181,23 +182,19 @@ void DnssdServiceBrowser::ServiceResolved( } DeviceVector device_vector = PrepareDeviceVector(); controller_->SearchDeviceDone(device_vector); - pthread_mutex_unlock(&mutex_); - LOG4CXX_TRACE(logger_, "exit"); } void DnssdServiceBrowser::ServiceResolveFailed( const DnssdServiceRecord& service_record) { - LOG4CXX_TRACE(logger_, "enter"); - LOG4CXX_ERROR(logger_, + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "AvahiServiceResolver failure for: " << service_record.name); - pthread_mutex_lock(&mutex_); + sync_primitives::AutoLock locker(mutex_); ServiceRecords::iterator service_record_it = std::find( service_records_.begin(), service_records_.end(), service_record); if (service_record_it != service_records_.end()) { service_records_.erase(service_record_it); } - pthread_mutex_unlock(&mutex_); - LOG4CXX_TRACE(logger_, "exit"); } void AvahiServiceResolverCallback(AvahiServiceResolver* avahi_service_resolver, @@ -246,9 +243,11 @@ TransportAdapter::Error DnssdServiceBrowser::CreateAvahiClientAndBrowser() { LOG4CXX_TRACE(logger_, "enter"); if (0 != avahi_service_browser_) { avahi_service_browser_free(avahi_service_browser_); + avahi_service_browser_ = NULL; } if (0 != avahi_client_) { avahi_client_free(avahi_client_); + avahi_client_ = NULL; } int avahi_error; @@ -261,9 +260,9 @@ TransportAdapter::Error DnssdServiceBrowser::CreateAvahiClientAndBrowser() { return TransportAdapter::FAIL; } - pthread_mutex_lock(&mutex_); + mutex_.Acquire(); service_records_.clear(); - pthread_mutex_unlock(&mutex_); + mutex_.Release(); avahi_service_browser_ = avahi_service_browser_new( avahi_client_, AVAHI_IF_UNSPEC, /* TODO use only required iface */ @@ -305,7 +304,8 @@ TransportAdapter::Error DnssdServiceBrowser::Scan() { void DnssdServiceBrowser::AddService(AvahiIfIndex interface, AvahiProtocol protocol, const char* name, const char* type, const char* domain) { - LOG4CXX_TRACE(logger_, "enter: interface " << interface << " protocol " << protocol << + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "interface " << interface << " protocol " << protocol << " name " << name << " type " << type << " domain " << domain); DnssdServiceRecord record; record.interface = interface; @@ -314,7 +314,7 @@ void DnssdServiceBrowser::AddService(AvahiIfIndex interface, record.name = name; record.type = type; - pthread_mutex_lock(&mutex_); + sync_primitives::AutoLock locker(mutex_); if (service_records_.end() == std::find(service_records_.begin(), service_records_.end(), record)) { service_records_.push_back(record); @@ -323,15 +323,14 @@ void DnssdServiceBrowser::AddService(AvahiIfIndex interface, AVAHI_PROTO_INET, static_cast<AvahiLookupFlags>(0), AvahiServiceResolverCallback, this); } - pthread_mutex_unlock(&mutex_); - LOG4CXX_TRACE(logger_, "exit"); } void DnssdServiceBrowser::RemoveService(AvahiIfIndex interface, AvahiProtocol protocol, const char* name, const char* type, const char* domain) { - LOG4CXX_TRACE(logger_, "enter: interface " << interface << " protocol " << protocol << + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "interface " << interface << " protocol " << protocol << " name " << name << " type " << type << " domain " << domain); DnssdServiceRecord record; record.interface = interface; @@ -340,16 +339,14 @@ void DnssdServiceBrowser::RemoveService(AvahiIfIndex interface, record.type = type; record.domain_name = domain; - pthread_mutex_lock(&mutex_); + sync_primitives::AutoLock locker(mutex_); service_records_.erase( std::remove(service_records_.begin(), service_records_.end(), record), service_records_.end()); - pthread_mutex_unlock(&mutex_); - LOG4CXX_TRACE(logger_, "exit"); } DeviceVector DnssdServiceBrowser::PrepareDeviceVector() const { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); std::map<uint32_t, TcpDevice*> devices; for (ServiceRecords::const_iterator it = service_records_.begin(); it != service_records_.end(); ++it) { @@ -372,7 +369,6 @@ DeviceVector DnssdServiceBrowser::PrepareDeviceVector() const { it != devices.end(); ++it) { device_vector.push_back(DeviceSptr(it->second)); } - LOG4CXX_TRACE(logger_, "exit"); return device_vector; } diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc index c0f39cc490..954c734d68 100644 --- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc +++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc @@ -53,7 +53,7 @@ #include <sstream> #include "utils/logger.h" - +#include "utils/threads/thread.h" #include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "transport_manager/tcp/tcp_device.h" #include "transport_manager/tcp/tcp_socket_connection.h" @@ -66,31 +66,35 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") TcpClientListener::TcpClientListener(TransportAdapterController* controller, const uint16_t port, const bool enable_keepalive) - : port_(port), - enable_keepalive_(enable_keepalive), - controller_(controller), - thread_(threads::CreateThread("TcpClientListener", this)), - socket_(-1), - thread_stop_requested_(false) { } + : port_(port), + enable_keepalive_(enable_keepalive), + controller_(controller), + thread_(0), + socket_(-1), + thread_stop_requested_(false) { + thread_ = threads::CreateThread("TcpClientListener", + new ListeningThreadDelegate(this)); +} TransportAdapter::Error TcpClientListener::Init() { + LOG4CXX_AUTO_TRACE(logger_); + thread_stop_requested_ = false; return TransportAdapter::OK; } void TcpClientListener::Terminate() { - LOG4CXX_TRACE(logger_, "enter"); - if (TransportAdapter::OK != StopListening()) { - LOG4CXX_ERROR(logger_, "Cannot stop listening TCP"); - } - LOG4CXX_TRACE(logger_, "exit"); + thread_->stop(); } bool TcpClientListener::IsInitialised() const { - return true; + return thread_; } TcpClientListener::~TcpClientListener() { - LOG4CXX_INFO(logger_, "destructor"); + LOG4CXX_AUTO_TRACE(logger_); + thread_->join(); + delete thread_->delegate(); + threads::DeleteThread(thread_); } void SetKeepaliveOptions(const int fd) { @@ -130,7 +134,7 @@ void SetKeepaliveOptions(const int fd) { mib[3] = TCPCTL_KEEPINTVL; sysctl(mib, kMidLength, NULL, NULL, &keepintvl, sizeof(keepintvl)); - struct timeval tval = { 0 }; + struct timeval tval = {0}; tval.tv_sec = keepidle; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)); setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &tval, sizeof(tval)); @@ -138,12 +142,13 @@ void SetKeepaliveOptions(const int fd) { LOG4CXX_TRACE(logger_, "exit"); } -void TcpClientListener::threadMain() { +void TcpClientListener::Loop() { LOG4CXX_TRACE(logger_, "enter"); while (!thread_stop_requested_) { sockaddr_in client_address; socklen_t client_address_size = sizeof(client_address); - const int connection_fd = accept(socket_, (struct sockaddr*)&client_address, + const int connection_fd = accept(socket_, + (struct sockaddr*) &client_address, &client_address_size); if (thread_stop_requested_) { LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); @@ -157,6 +162,7 @@ void TcpClientListener::threadMain() { if (AF_INET != client_address.sin_family) { LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); + close(connection_fd); continue; } @@ -169,15 +175,16 @@ void TcpClientListener::threadMain() { SetKeepaliveOptions(connection_fd); } - TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, device_name); + TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, + device_name); DeviceSptr device = controller_->AddDevice(tcp_device); tcp_device = static_cast<TcpDevice*>(device.get()); const ApplicationHandle app_handle = tcp_device->AddIncomingApplication( - connection_fd); + connection_fd); TcpSocketConnection* connection( - new TcpSocketConnection(device->unique_device_id(), app_handle, - controller_)); + new TcpSocketConnection(device->unique_device_id(), app_handle, + controller_)); connection->set_socket(connection_fd); const TransportAdapter::Error error = connection->Start(); if (error != TransportAdapter::OK) { @@ -187,11 +194,26 @@ void TcpClientListener::threadMain() { LOG4CXX_TRACE(logger_, "exit"); } +void TcpClientListener::StopLoop() { + LOG4CXX_AUTO_TRACE(logger_); + thread_stop_requested_ = true; + // We need to connect to the listening socket to unblock accept() call + int byesocket = socket(AF_INET, SOCK_STREAM, 0); + sockaddr_in server_address = { 0 }; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port_); + server_address.sin_addr.s_addr = INADDR_ANY; + connect(byesocket, (sockaddr*) &server_address, sizeof(server_address)); + shutdown(byesocket, SHUT_RDWR); + close(byesocket); +} + TransportAdapter::Error TcpClientListener::StartListening() { LOG4CXX_TRACE(logger_, "enter"); - if (thread_->is_running()) { - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::BAD_STATE. Condition: thread_started_"); + if (!thread_ || thread_->is_running()) { + LOG4CXX_TRACE( + logger_, + "exit with TransportAdapter::BAD_STATE. Condition: thread_started_"); return TransportAdapter::BAD_STATE; } @@ -199,7 +221,8 @@ TransportAdapter::Error TcpClientListener::StartListening() { if (-1 == socket_) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL. Condition: -1 == socket_"); + LOG4CXX_TRACE(logger_, + "exit with TransportAdapter::FAIL. Condition: -1 == socket_"); return TransportAdapter::FAIL; } @@ -213,15 +236,17 @@ TransportAdapter::Error TcpClientListener::StartListening() { if (0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))"); + LOG4CXX_TRACE( + logger_, + "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))"); return TransportAdapter::FAIL; } if (0 != listen(socket_, 128)) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)"); + LOG4CXX_TRACE( + logger_, + "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)"); return TransportAdapter::FAIL; } @@ -235,33 +260,29 @@ TransportAdapter::Error TcpClientListener::StartListening() { return TransportAdapter::OK; } -bool TcpClientListener::exitThreadMain() { - StopListening(); - return true; +void TcpClientListener::ListeningThreadDelegate::exitThreadMain() { + parent_->StopLoop(); +} + +void TcpClientListener::ListeningThreadDelegate::threadMain() { + parent_->Loop(); +} + +TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate( + TcpClientListener* parent) + : parent_(parent) { } TransportAdapter::Error TcpClientListener::StopListening() { - LOG4CXX_TRACE(logger_, "enter"); - if (!thread_->is_running()) { - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::BAD_STATE. Condition !thread_started_"); + LOG4CXX_AUTO_TRACE(logger_); + if (!thread_ || !thread_->is_running()) { + LOG4CXX_TRACE(logger_, "TcpClientListener is not running now"); return TransportAdapter::BAD_STATE; } - thread_stop_requested_ = true; - // We need to connect to the listening socket to unblock accept() call - int byesocket = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in server_address = { 0 }; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - connect(byesocket, (sockaddr*)&server_address, sizeof(server_address)); - shutdown(byesocket, SHUT_RDWR); - close(byesocket); - LOG4CXX_DEBUG(logger_, "Tcp client listener thread terminated"); + thread_->stop(); close(socket_); socket_ = -1; - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); return TransportAdapter::OK; } diff --git a/src/components/transport_manager/src/tcp/tcp_connection_factory.cc b/src/components/transport_manager/src/tcp/tcp_connection_factory.cc index 69173a0e06..a8c2bda600 100644 --- a/src/components/transport_manager/src/tcp/tcp_connection_factory.cc +++ b/src/components/transport_manager/src/tcp/tcp_connection_factory.cc @@ -1,4 +1,4 @@ -/** +/* * * Copyright (c) 2013, Ford Motor Company * All rights reserved. diff --git a/src/components/transport_manager/src/tcp/tcp_device.cc b/src/components/transport_manager/src/tcp/tcp_device.cc index 2540c26ed0..130187c384 100644 --- a/src/components/transport_manager/src/tcp/tcp_device.cc +++ b/src/components/transport_manager/src/tcp/tcp_device.cc @@ -1,4 +1,4 @@ -/** +/* * * Copyright (c) 2013, Ford Motor Company * All rights reserved. @@ -43,9 +43,9 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") TcpDevice::TcpDevice(const in_addr_t& in_addr, const std::string& name) : Device(name, name), + applications_mutex_(), in_addr_(in_addr), last_handle_(0) { - pthread_mutex_init(&applications_mutex_, 0); } bool TcpDevice::IsSameAs(const Device* other) const { @@ -63,57 +63,53 @@ bool TcpDevice::IsSameAs(const Device* other) const { } ApplicationList TcpDevice::GetApplicationList() const { - LOG4CXX_TRACE(logger_, "enter"); - pthread_mutex_lock(&applications_mutex_); + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock locker(applications_mutex_); ApplicationList app_list; for (std::map<ApplicationHandle, Application>::const_iterator it = applications_.begin(); it != applications_.end(); ++it) { app_list.push_back(it->first); } - pthread_mutex_unlock(&applications_mutex_); - LOG4CXX_TRACE(logger_, "exit with app_list. It's size = " << app_list.size()); return app_list; } ApplicationHandle TcpDevice::AddIncomingApplication(int socket_fd) { - LOG4CXX_TRACE(logger_, "enter. Socket_fd: " << socket_fd); + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "Socket_fd: " << socket_fd); Application app; app.incoming = true; app.socket = socket_fd; app.port = 0; // this line removes compiler warning - pthread_mutex_lock(&applications_mutex_); + sync_primitives::AutoLock locker(applications_mutex_); const ApplicationHandle app_handle = ++last_handle_; applications_[app_handle] = app; - pthread_mutex_unlock(&applications_mutex_); - LOG4CXX_TRACE(logger_, "exit with app_handle " << app_handle); + LOG4CXX_DEBUG(logger_, "App_handle " << app_handle); return app_handle; } ApplicationHandle TcpDevice::AddDiscoveredApplication(int port) { - LOG4CXX_TRACE(logger_, "enter. port " << port); + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "Port " << port); Application app; app.incoming = false; app.socket = 0; // this line removes compiler warning app.port = port; - pthread_mutex_lock(&applications_mutex_); + sync_primitives::AutoLock locker(applications_mutex_); const ApplicationHandle app_handle = ++last_handle_; applications_[app_handle] = app; - pthread_mutex_unlock(&applications_mutex_); - LOG4CXX_TRACE(logger_, "exit with app_handle " << app_handle); + LOG4CXX_DEBUG(logger_, "App_handle " << app_handle); return app_handle; } void TcpDevice::RemoveApplication(const ApplicationHandle app_handle) { - LOG4CXX_TRACE(logger_, "enter. ApplicationHandle: " << app_handle); - pthread_mutex_lock(&applications_mutex_); + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "ApplicationHandle: " << app_handle); + sync_primitives::AutoLock locker(applications_mutex_); applications_.erase(app_handle); - pthread_mutex_unlock(&applications_mutex_); - LOG4CXX_TRACE(logger_, "exit"); } TcpDevice::~TcpDevice() { - pthread_mutex_destroy(&applications_mutex_); } int TcpDevice::GetApplicationSocket(const ApplicationHandle app_handle) const { diff --git a/src/components/transport_manager/src/tcp/tcp_socket_connection.cc b/src/components/transport_manager/src/tcp/tcp_socket_connection.cc index 3b208d8a07..a1160a84a8 100644 --- a/src/components/transport_manager/src/tcp/tcp_socket_connection.cc +++ b/src/components/transport_manager/src/tcp/tcp_socket_connection.cc @@ -1,4 +1,4 @@ -/** +/* * * Copyright (c) 2013, Ford Motor Company * All rights reserved. @@ -35,10 +35,12 @@ #include "transport_manager/tcp/tcp_socket_connection.h" #include "transport_manager/tcp/tcp_device.h" #include "utils/logger.h" +#include "utils/threads/thread.h" #include <memory.h> #include <signal.h> #include <errno.h> +#include <unistd.h> namespace transport_manager { namespace transport_adapter { @@ -107,6 +109,9 @@ bool TcpServerOiginatedSocketConnection::Establish(ConnectError** error) { << application_handle() << ", error " << errno); *error = new ConnectError(); LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: failed to connect to application"); + + ::close(socket); + return false; } diff --git a/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc b/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc index 3747225a89..6c4ee0cd89 100644 --- a/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc +++ b/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc @@ -1,5 +1,4 @@ -/** - * +/* * Copyright (c) 2013, Ford Motor Company * All rights reserved. * @@ -36,6 +35,7 @@ #include <errno.h> #include <sstream> #include <cstdlib> +#include <stdio.h> #include "resumption/last_state.h" @@ -101,7 +101,7 @@ void TcpTransportAdapter::Store() const { if (port != -1) { // don't want to store incoming applications Json::Value application_dictionary; char port_record[12]; - sprintf(port_record, "%d", port); + snprintf(port_record, sizeof(port_record), "%d", port); application_dictionary["port"] = std::string(port_record); applications_dictionary.append(application_dictionary); } |