diff options
Diffstat (limited to 'src/components/transport_manager/src/tcp/tcp_client_listener.cc')
-rw-r--r-- | src/components/transport_manager/src/tcp/tcp_client_listener.cc | 555 |
1 files changed, 448 insertions, 107 deletions
diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc index 207149eb8c..abcca43509 100644 --- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc +++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc @@ -3,6 +3,9 @@ * Copyright (c) 2017, Ford Motor Company * All rights reserved. * + * Copyright (c) 2018 Xevo Inc. + * All rights reserved. + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * @@ -14,7 +17,7 @@ * disclaimer in the documentation and/or other materials provided with the * distribution. * - * Neither the name of the Ford Motor Company nor the names of its contributors + * Neither the name of the copyright holders nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * @@ -36,11 +39,14 @@ #include <memory.h> #include <signal.h> #include <errno.h> +#include <fcntl.h> #include <arpa/inet.h> #include <unistd.h> #include <sys/types.h> +#include <sys/select.h> #include <sys/sysctl.h> #include <sys/socket.h> +#include <ifaddrs.h> #ifdef __linux__ #include <linux/tcp.h> #else // __linux__ @@ -53,9 +59,10 @@ #include <sstream> #include "utils/logger.h" -#include "utils/make_shared.h" + #include "utils/threads/thread.h" #include "transport_manager/transport_adapter/transport_adapter_controller.h" +#include "transport_manager/tcp/network_interface_listener_impl.h" #include "transport_manager/tcp/tcp_device.h" #include "transport_manager/tcp/tcp_socket_connection.h" @@ -64,72 +71,88 @@ namespace transport_adapter { CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") +static bool SetNonblocking(int s); + +#ifdef BUILD_TESTS +bool TcpClientListener::testing_ = false; +#endif // BUILD_TESTS + TcpClientListener::TcpClientListener(TransportAdapterController* controller, const uint16_t port, - const bool enable_keepalive) + const bool enable_keepalive, + const std::string designated_interface) : port_(port) , enable_keepalive_(enable_keepalive) , controller_(controller) + , initialized_(false) + , started_(false) , thread_(0) , socket_(-1) - , thread_stop_requested_(false) { + , thread_stop_requested_(false) + , designated_interface_(designated_interface) { + pipe_fds_[0] = pipe_fds_[1] = -1; thread_ = threads::CreateThread("TcpClientListener", new ListeningThreadDelegate(this)); + interface_listener_ = + new NetworkInterfaceListenerImpl(this, designated_interface); } TransportAdapter::Error TcpClientListener::Init() { LOG4CXX_AUTO_TRACE(logger_); thread_stop_requested_ = false; - socket_ = socket(AF_INET, SOCK_STREAM, 0); - if (-1 == socket_) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); - return TransportAdapter::FAIL; - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - - int optval = 1; - if (0 != - setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval))) { - LOG4CXX_WARN_WITH_ERRNO(logger_, "setsockopt SO_REUSEADDR failed"); + if (!IsListeningOnSpecificInterface()) { + // Network interface is not specified. We will listen on all interfaces + // using INADDR_ANY. If socket creation fails, we will treat it an error. + socket_ = CreateIPv4ServerSocket(port_); + if (-1 == socket_) { + LOG4CXX_ERROR(logger_, "Failed to create TCP socket"); + return TransportAdapter::FAIL; + } + } else { + // Network interface is specified and we wiill listen only on the interface. + // In this case, the server socket will be created once + // NetworkInterfaceListener notifies the interface's IP address. + LOG4CXX_INFO(logger_, + "TCP server socket will listen on " + << designated_interface_ + << " once it has an IPv4 address."); } - if (bind(socket_, - reinterpret_cast<sockaddr*>(&server_address), - sizeof(server_address)) != 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); + if (!interface_listener_->Init()) { + if (socket_ >= 0) { + close(socket_); + socket_ = -1; + } return TransportAdapter::FAIL; } - const int kBacklog = 128; - if (0 != listen(socket_, kBacklog)) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); - return TransportAdapter::FAIL; - } + initialized_ = true; return TransportAdapter::OK; } void TcpClientListener::Terminate() { LOG4CXX_AUTO_TRACE(logger_); - if (socket_ == -1) { - LOG4CXX_WARN(logger_, "Socket has been closed"); + + if (!initialized_) { return; } - if (shutdown(socket_, SHUT_RDWR) != 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to shutdown socket"); - } - if (close(socket_) != 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket"); + + if (!IsListeningOnSpecificInterface()) { + DestroyServerSocket(socket_); + socket_ = -1; + } else { + sync_primitives::AutoLock auto_lock(start_stop_lock_); + DestroyServerSocket(socket_); + socket_ = -1; } - socket_ = -1; + + interface_listener_->Deinit(); + initialized_ = false; } bool TcpClientListener::IsInitialised() const { - return thread_; + return initialized_; } TcpClientListener::~TcpClientListener() { @@ -138,6 +161,7 @@ TcpClientListener::~TcpClientListener() { delete thread_->delegate(); threads::DeleteThread(thread_); Terminate(); + delete interface_listener_; } void SetKeepaliveOptions(const int fd) { @@ -203,104 +227,151 @@ void SetKeepaliveOptions(const int fd) { void TcpClientListener::Loop() { LOG4CXX_AUTO_TRACE(logger_); - while (!thread_stop_requested_) { - sockaddr_in client_address; - socklen_t client_address_size = sizeof(client_address); - const int connection_fd = accept( - socket_, (struct sockaddr*)&client_address, &client_address_size); - if (thread_stop_requested_) { - LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); - close(connection_fd); - break; - } - - if (connection_fd < 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "accept() failed"); - continue; - } + fd_set rfds; + char dummy[16]; - if (AF_INET != client_address.sin_family) { - LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); - close(connection_fd); - continue; + while (!thread_stop_requested_) { + FD_ZERO(&rfds); + FD_SET(socket_, &rfds); + FD_SET(pipe_fds_[0], &rfds); + int nfds = socket_ > pipe_fds_[0] ? socket_ : pipe_fds_[0]; + + int ret = select(nfds + 1, &rfds, NULL, NULL, NULL); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + LOG4CXX_WARN(logger_, "select failed for TCP server socket"); + break; + } } - char device_name[32]; - strncpy(device_name, - inet_ntoa(client_address.sin_addr), - sizeof(device_name) / sizeof(device_name[0])); - LOG4CXX_INFO(logger_, "Connected client " << device_name); - LOG4CXX_INFO(logger_, "Port is: " << port_); - - if (enable_keepalive_) { - SetKeepaliveOptions(connection_fd); + if (FD_ISSET(pipe_fds_[0], &rfds)) { + ret = read(pipe_fds_[0], dummy, sizeof(dummy)); + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + LOG4CXX_WARN( + logger_, + "Failed to read from pipe, aborting TCP server socket loop."); + break; + } + } else if (ret == 0) { + LOG4CXX_WARN(logger_, + "Pipe disconnected, aborting TCP server socket loop."); + break; + } else { + LOG4CXX_DEBUG(logger_, + "received stop command of TCP server socket loop"); + break; + } } - const auto device_uid = - device_name + std::string(":") + std::to_string(port_); + if (FD_ISSET(socket_, &rfds)) { + sockaddr_in client_address; + socklen_t client_address_size = sizeof(client_address); + const int connection_fd = accept( + socket_, (struct sockaddr*)&client_address, &client_address_size); + if (thread_stop_requested_) { + LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); + close(connection_fd); + break; + } + + if (connection_fd < 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "accept() failed"); + continue; + } + + if (AF_INET != client_address.sin_family) { + LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); + close(connection_fd); + continue; + } + + char device_name[32]; + strncpy(device_name, + inet_ntoa(client_address.sin_addr), + sizeof(device_name) / sizeof(device_name[0])); + LOG4CXX_INFO(logger_, "Connected client " << device_name); + LOG4CXX_INFO(logger_, "Port is: " << port_); + + if (enable_keepalive_) { + SetKeepaliveOptions(connection_fd); + } + + const auto device_uid = + device_name + std::string(":") + std::to_string(port_); #if defined(BUILD_TESTS) - TcpDevice* tcp_device = - new TcpDevice(client_address.sin_addr.s_addr, device_uid, device_name); + auto tcp_device = std::make_shared<TcpDevice>( + client_address.sin_addr.s_addr, device_uid, device_name); #else - TcpDevice* tcp_device = - new TcpDevice(client_address.sin_addr.s_addr, device_uid); + auto tcp_device = std::make_shared<TcpDevice>( + client_address.sin_addr.s_addr, device_uid); #endif // BUILD_TESTS - DeviceSptr device = controller_->AddDevice(tcp_device); - tcp_device = static_cast<TcpDevice*>(device.get()); - const ApplicationHandle app_handle = - tcp_device->AddIncomingApplication(connection_fd); - - utils::SharedPtr<TcpSocketConnection> connection = - utils::MakeShared<TcpSocketConnection>( - device->unique_device_id(), app_handle, controller_); - controller_->ConnectionCreated( - connection, device->unique_device_id(), app_handle); - connection->set_socket(connection_fd); - const TransportAdapter::Error error = connection->Start(); - if (TransportAdapter::OK != error) { - LOG4CXX_ERROR(logger_, - "TCP connection::Start() failed with error: " << error); + DeviceSptr device = controller_->AddDevice(tcp_device); + auto tcp_device_raw = static_cast<TcpDevice*>(device.get()); + const ApplicationHandle app_handle = + tcp_device_raw->AddIncomingApplication(connection_fd); + + std::shared_ptr<TcpSocketConnection> connection = + std::make_shared<TcpSocketConnection>( + device->unique_device_id(), app_handle, controller_); + controller_->ConnectionCreated( + connection, device->unique_device_id(), app_handle); + connection->set_socket(connection_fd); + const TransportAdapter::Error error = connection->Start(); + if (TransportAdapter::OK != error) { + LOG4CXX_ERROR(logger_, + "TCP connection::Start() failed with error: " << error); + } } } + + LOG4CXX_INFO(logger_, "TCP server socket loop is terminated."); } void TcpClientListener::StopLoop() { LOG4CXX_AUTO_TRACE(logger_); + if (pipe_fds_[1] < 0) { + LOG4CXX_WARN(logger_, "StopLoop called in invalid state"); + return; + } + thread_stop_requested_ = true; - // We need to connect to the listening socket to unblock accept() call - int byesocket = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - if (0 != connect(byesocket, - reinterpret_cast<sockaddr*>(&server_address), - sizeof(server_address))) { - LOG4CXX_WARN_WITH_ERRNO(logger_, "Failed to connect byesocket"); - } else { - // Can only shutdown socket if connected - if (0 != shutdown(byesocket, SHUT_RDWR)) { - LOG4CXX_WARN_WITH_ERRNO(logger_, "Failed to shutdown byesocket"); - } + + char dummy[1] = {0}; + int ret = write(pipe_fds_[1], dummy, sizeof(dummy)); + if (ret <= 0) { + LOG4CXX_WARN_WITH_ERRNO( + logger_, "Failed to send stop message to TCP server socket loop"); } - close(byesocket); } TransportAdapter::Error TcpClientListener::StartListening() { LOG4CXX_AUTO_TRACE(logger_); - if (thread_->is_running()) { + if (started_) { LOG4CXX_WARN( logger_, "TransportAdapter::BAD_STATE. Listener has already been started"); return TransportAdapter::BAD_STATE; } - if (!thread_->start()) { - LOG4CXX_ERROR(logger_, "Tcp client listener thread start failed"); + if (!interface_listener_->Start()) { return TransportAdapter::FAIL; } + + if (!IsListeningOnSpecificInterface()) { + TransportAdapter::Error ret = StartListeningThread(); + if (TransportAdapter::OK != ret) { + LOG4CXX_ERROR(logger_, "Tcp client listener thread start failed"); + interface_listener_->Stop(); + return ret; + } + } + + started_ = true; LOG4CXX_INFO(logger_, "Tcp client listener has started successfully"); return TransportAdapter::OK; } @@ -319,16 +390,286 @@ TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate( TransportAdapter::Error TcpClientListener::StopListening() { LOG4CXX_AUTO_TRACE(logger_); - if (!thread_->is_running()) { + if (!started_) { LOG4CXX_DEBUG(logger_, "TcpClientListener is not running now"); return TransportAdapter::BAD_STATE; } - thread_->join(); + interface_listener_->Stop(); + + StopListeningThread(); + started_ = false; LOG4CXX_INFO(logger_, "Tcp client listener has stopped successfully"); return TransportAdapter::OK; } +TransportAdapter::Error TcpClientListener::StartListeningThread() { + LOG4CXX_AUTO_TRACE(logger_); + + // StartListening() can be called from multiple threads + sync_primitives::AutoLock auto_lock(start_stop_lock_); + + if (pipe_fds_[0] < 0 || pipe_fds_[1] < 0) { + // recreate the pipe every time, so that the thread loop will not get + // leftover + // data inside pipe after it is started + if (pipe(pipe_fds_) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create internal pipe"); + return TransportAdapter::FAIL; + } + if (!SetNonblocking(pipe_fds_[0])) { + LOG4CXX_WARN(logger_, "Failed to configure pipe to non-blocking"); + } + } + + thread_stop_requested_ = false; + + if (!thread_->start()) { + return TransportAdapter::FAIL; + } + return TransportAdapter::OK; +} + +TransportAdapter::Error TcpClientListener::StopListeningThread() { + LOG4CXX_AUTO_TRACE(logger_); + + // StopListening() can be called from multiple threads + sync_primitives::AutoLock auto_lock(start_stop_lock_); + + thread_->join(); + + close(pipe_fds_[1]); + pipe_fds_[1] = -1; + close(pipe_fds_[0]); + pipe_fds_[0] = -1; + + return TransportAdapter::OK; +} + +void TcpClientListener::OnIPAddressUpdated(const std::string ipv4_addr, + const std::string ipv6_addr) { + LOG4CXX_AUTO_TRACE(logger_); + + // Since we only create a TCP socket with IPv4 option (AF_INET), currently we + // do not use IPv6 address. + if (ipv4_addr != current_ip_address_) { + if (IsListeningOnSpecificInterface()) { + if (!current_ip_address_.empty()) { + // the server socket is running, terminate it + LOG4CXX_DEBUG(logger_, + "Stopping current TCP server socket on " + << designated_interface_); + StopOnNetworkInterface(); + } + if (!ipv4_addr.empty()) { + // start (or restart) server socket with the new IP address + LOG4CXX_DEBUG( + logger_, "Starting TCP server socket on " << designated_interface_); + StartOnNetworkInterface(); + } + } + + current_ip_address_ = ipv4_addr; + + std::string enabled = !current_ip_address_.empty() ? "true" : "false"; + std::ostringstream oss; + oss << port_; + + TransportConfig config; + config.insert(std::make_pair(tc_enabled, enabled)); + config.insert(std::make_pair(tc_tcp_ip_address, current_ip_address_)); + config.insert(std::make_pair(tc_tcp_port, oss.str())); + + controller_->TransportConfigUpdated(config); + } +} + +bool TcpClientListener::StartOnNetworkInterface() { + LOG4CXX_AUTO_TRACE(logger_); + + // this method is only for the case that network interface is specified + if (IsListeningOnSpecificInterface()) { + { + // make sure that two threads will not update socket_ at the same time + sync_primitives::AutoLock auto_lock(start_stop_lock_); + if (socket_ < 0) { + socket_ = CreateIPv4ServerSocket(port_, designated_interface_); + if (-1 == socket_) { + LOG4CXX_WARN(logger_, "Failed to create TCP socket"); + return false; + } + } + } + + if (TransportAdapter::OK != StartListeningThread()) { + LOG4CXX_WARN(logger_, "Failed to start TCP client listener"); + return false; + } + LOG4CXX_INFO(logger_, + "TCP server socket started on " << designated_interface_); + } + return true; +} + +bool TcpClientListener::StopOnNetworkInterface() { + LOG4CXX_AUTO_TRACE(logger_); + + if (IsListeningOnSpecificInterface()) { + if (TransportAdapter::OK != StopListeningThread()) { + LOG4CXX_WARN(logger_, "Failed to stop TCP client listener"); + return false; + } + + { + sync_primitives::AutoLock auto_lock(start_stop_lock_); + DestroyServerSocket(socket_); + socket_ = -1; + } + + LOG4CXX_INFO(logger_, + "TCP server socket on " << designated_interface_ + << " stopped"); + } + return true; +} + +bool TcpClientListener::IsListeningOnSpecificInterface() const { + return !designated_interface_.empty(); +} + +int TcpClientListener::CreateIPv4ServerSocket( + uint16_t port, const std::string interface_name) { + LOG4CXX_AUTO_TRACE(logger_); + + struct in_addr ipv4_address; + memset(&ipv4_address, 0, sizeof(ipv4_address)); + if (interface_name.empty()) { + ipv4_address.s_addr = htonl(INADDR_ANY); + } else if (!GetIPv4Address(interface_name, &ipv4_address)) { + return -1; + } + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (-1 == sock) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); + return -1; + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port); + server_address.sin_addr = ipv4_address; + + int optval = 1; + if (0 != + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval))) { + LOG4CXX_WARN_WITH_ERRNO(logger_, "setsockopt SO_REUSEADDR failed"); + } + + if (bind(sock, + reinterpret_cast<sockaddr*>(&server_address), + sizeof(server_address)) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); + close(sock); + return -1; + } + + const int kBacklog = 128; + if (0 != listen(sock, kBacklog)) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); + close(sock); + return -1; + } + + return sock; +} + +void TcpClientListener::DestroyServerSocket(int sock) { + LOG4CXX_AUTO_TRACE(logger_); + if (sock >= 0) { + if (shutdown(sock, SHUT_RDWR) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to shutdown socket"); + } + if (close(sock) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket"); + } + } +} + +bool TcpClientListener::GetIPv4Address(const std::string interface_name, + struct in_addr* ip_address) { + LOG4CXX_AUTO_TRACE(logger_); + +#ifdef BUILD_TESTS + if (testing_) { + // don't actually call getifaddrs(), instead return a dummy address of + // INADDR_LOOPBACK + struct in_addr dummy_addr; + dummy_addr.s_addr = htonl(INADDR_LOOPBACK); + + if (ip_address != NULL) { + *ip_address = dummy_addr; + } + return true; + } +#endif // BUILD_TESTS + + struct ifaddrs* if_list; + if (getifaddrs(&if_list) != 0) { + LOG4CXX_WARN(logger_, "getifaddrs failed"); + return false; + } + + struct ifaddrs* interface; + bool found = false; + + for (interface = if_list; interface != NULL; + interface = interface->ifa_next) { + if (interface->ifa_name == NULL) { + continue; + } + if (interface_name == interface->ifa_name) { + if (interface->ifa_addr == NULL) { + continue; + } + switch (interface->ifa_addr->sa_family) { + case AF_INET: { + struct sockaddr_in* addr = + reinterpret_cast<struct sockaddr_in*>(interface->ifa_addr); + if (ip_address != NULL) { + *ip_address = addr->sin_addr; + } + found = true; + break; + } + default: + break; + } + } + } + + freeifaddrs(if_list); + + return found; +} + +static bool SetNonblocking(int s) { + int prev_flag = fcntl(s, F_GETFL, 0); + if (prev_flag == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to acquire socket flag"); + return false; + } + + int ret = fcntl(s, F_SETFL, prev_flag | O_NONBLOCK); + if (ret == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, + "Failed to configure socket to non-blocking"); + return false; + } + + return true; +} + } // namespace transport_adapter } // namespace transport_manager |