diff options
Diffstat (limited to 'src/components/transport_manager/src/tcp/tcp_client_listener.cc')
-rw-r--r-- | src/components/transport_manager/src/tcp/tcp_client_listener.cc | 184 |
1 files changed, 104 insertions, 80 deletions
diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc index c0f39cc490..28a3c389da 100644 --- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc +++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc @@ -53,7 +53,7 @@ #include <sstream> #include "utils/logger.h" - +#include "utils/threads/thread.h" #include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "transport_manager/tcp/tcp_device.h" #include "transport_manager/tcp/tcp_socket_connection.h" @@ -66,35 +66,78 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") TcpClientListener::TcpClientListener(TransportAdapterController* controller, const uint16_t port, const bool enable_keepalive) - : port_(port), - enable_keepalive_(enable_keepalive), - controller_(controller), - thread_(threads::CreateThread("TcpClientListener", this)), - socket_(-1), - thread_stop_requested_(false) { } + : port_(port), + enable_keepalive_(enable_keepalive), + controller_(controller), + thread_(0), + socket_(-1), + thread_stop_requested_(false) { + thread_ = threads::CreateThread("TcpClientListener", + new ListeningThreadDelegate(this)); +} TransportAdapter::Error TcpClientListener::Init() { + LOG4CXX_AUTO_TRACE(logger_); + thread_stop_requested_ = false; + + socket_ = socket(AF_INET, SOCK_STREAM, 0); + if (-1 == socket_) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); + return TransportAdapter::FAIL; + } + + sockaddr_in server_address = { 0 }; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port_); + server_address.sin_addr.s_addr = INADDR_ANY; + + int optval = 1; + setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); + + if (bind(socket_, reinterpret_cast<sockaddr*>(&server_address), + sizeof(server_address)) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); + return TransportAdapter::FAIL; + } + + const int kBacklog = 128; + if (0 != listen(socket_, kBacklog)) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); + return TransportAdapter::FAIL; + } return TransportAdapter::OK; } void TcpClientListener::Terminate() { - LOG4CXX_TRACE(logger_, "enter"); - if (TransportAdapter::OK != StopListening()) { - LOG4CXX_ERROR(logger_, "Cannot stop listening TCP"); + LOG4CXX_AUTO_TRACE(logger_); + if (socket_ == -1) { + LOG4CXX_WARN(logger_, "Socket has been closed"); + return; } - LOG4CXX_TRACE(logger_, "exit"); + if (shutdown(socket_, SHUT_RDWR) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to shutdown socket"); + } + if (close(socket_) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket"); + } + socket_ = -1; } bool TcpClientListener::IsInitialised() const { - return true; + return thread_; } TcpClientListener::~TcpClientListener() { - LOG4CXX_INFO(logger_, "destructor"); + LOG4CXX_AUTO_TRACE(logger_); + StopListening(); + delete thread_->delegate(); + threads::DeleteThread(thread_); + Terminate(); } void SetKeepaliveOptions(const int fd) { - LOG4CXX_TRACE(logger_, "enter. fd: " << fd); + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "fd: " << fd); int yes = 1; int keepidle = 3; // 3 seconds to disconnection detecting int keepcnt = 5; @@ -108,7 +151,7 @@ void SetKeepaliveOptions(const int fd) { setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &user_timeout, sizeof(user_timeout)); #elif defined(__QNX__) // __linux__ - // TODO (KKolodiy): Out of order! + // TODO(KKolodiy): Out of order! const int kMidLength = 4; int mib[kMidLength]; @@ -130,23 +173,24 @@ void SetKeepaliveOptions(const int fd) { mib[3] = TCPCTL_KEEPINTVL; sysctl(mib, kMidLength, NULL, NULL, &keepintvl, sizeof(keepintvl)); - struct timeval tval = { 0 }; + struct timeval tval = {0}; tval.tv_sec = keepidle; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)); setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &tval, sizeof(tval)); #endif // __QNX__ - LOG4CXX_TRACE(logger_, "exit"); } -void TcpClientListener::threadMain() { - LOG4CXX_TRACE(logger_, "enter"); +void TcpClientListener::Loop() { + LOG4CXX_AUTO_TRACE(logger_); while (!thread_stop_requested_) { sockaddr_in client_address; socklen_t client_address_size = sizeof(client_address); - const int connection_fd = accept(socket_, (struct sockaddr*)&client_address, + const int connection_fd = accept(socket_, + (struct sockaddr*) &client_address, &client_address_size); if (thread_stop_requested_) { LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); + close(connection_fd); break; } @@ -157,6 +201,7 @@ void TcpClientListener::threadMain() { if (AF_INET != client_address.sin_family) { LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); + close(connection_fd); continue; } @@ -169,99 +214,78 @@ void TcpClientListener::threadMain() { SetKeepaliveOptions(connection_fd); } - TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, device_name); + TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, + device_name); DeviceSptr device = controller_->AddDevice(tcp_device); tcp_device = static_cast<TcpDevice*>(device.get()); const ApplicationHandle app_handle = tcp_device->AddIncomingApplication( - connection_fd); + connection_fd); TcpSocketConnection* connection( - new TcpSocketConnection(device->unique_device_id(), app_handle, - controller_)); + new TcpSocketConnection(device->unique_device_id(), app_handle, + controller_)); connection->set_socket(connection_fd); const TransportAdapter::Error error = connection->Start(); if (error != TransportAdapter::OK) { delete connection; } } - LOG4CXX_TRACE(logger_, "exit"); } -TransportAdapter::Error TcpClientListener::StartListening() { - LOG4CXX_TRACE(logger_, "enter"); - if (thread_->is_running()) { - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::BAD_STATE. Condition: thread_started_"); - return TransportAdapter::BAD_STATE; - } - - socket_ = socket(AF_INET, SOCK_STREAM, 0); - - if (-1 == socket_) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL. Condition: -1 == socket_"); - return TransportAdapter::FAIL; - } - +void TcpClientListener::StopLoop() { + LOG4CXX_AUTO_TRACE(logger_); + thread_stop_requested_ = true; + // We need to connect to the listening socket to unblock accept() call + int byesocket = socket(AF_INET, SOCK_STREAM, 0); sockaddr_in server_address = { 0 }; server_address.sin_family = AF_INET; server_address.sin_port = htons(port_); server_address.sin_addr.s_addr = INADDR_ANY; + connect(byesocket, reinterpret_cast<sockaddr*>(&server_address), + sizeof(server_address)); + shutdown(byesocket, SHUT_RDWR); + close(byesocket); +} - int optval = 1; - setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); - - if (0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))"); - return TransportAdapter::FAIL; - } - - if (0 != listen(socket_, 128)) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)"); - return TransportAdapter::FAIL; +TransportAdapter::Error TcpClientListener::StartListening() { + LOG4CXX_AUTO_TRACE(logger_); + if (thread_->is_running()) { + LOG4CXX_WARN(logger_, + "TransportAdapter::BAD_STATE. Listener has already been started"); + return TransportAdapter::BAD_STATE; } - if (thread_->start()) { - LOG4CXX_DEBUG(logger_, "Tcp client listener thread started"); - } else { + if (!thread_->start()) { LOG4CXX_ERROR(logger_, "Tcp client listener thread start failed"); return TransportAdapter::FAIL; } - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); + LOG4CXX_INFO(logger_, "Tcp client listener has started successfully"); return TransportAdapter::OK; } -bool TcpClientListener::exitThreadMain() { - StopListening(); - return true; +void TcpClientListener::ListeningThreadDelegate::exitThreadMain() { + parent_->StopLoop(); +} + +void TcpClientListener::ListeningThreadDelegate::threadMain() { + parent_->Loop(); +} + +TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate( + TcpClientListener* parent) + : parent_(parent) { } TransportAdapter::Error TcpClientListener::StopListening() { - LOG4CXX_TRACE(logger_, "enter"); + LOG4CXX_AUTO_TRACE(logger_); if (!thread_->is_running()) { - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::BAD_STATE. Condition !thread_started_"); + LOG4CXX_DEBUG(logger_, "TcpClientListener is not running now"); return TransportAdapter::BAD_STATE; } - thread_stop_requested_ = true; - // We need to connect to the listening socket to unblock accept() call - int byesocket = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in server_address = { 0 }; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - connect(byesocket, (sockaddr*)&server_address, sizeof(server_address)); - shutdown(byesocket, SHUT_RDWR); - close(byesocket); - LOG4CXX_DEBUG(logger_, "Tcp client listener thread terminated"); - close(socket_); - socket_ = -1; - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); + thread_->join(); + + LOG4CXX_INFO(logger_, "Tcp client listener has stopped successfully"); return TransportAdapter::OK; } |