diff options
Diffstat (limited to 'src/components/transport_manager/src/tcp/tcp_client_listener.cc')
-rw-r--r-- | src/components/transport_manager/src/tcp/tcp_client_listener.cc | 117 |
1 files changed, 69 insertions, 48 deletions
diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc index c0f39cc490..954c734d68 100644 --- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc +++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc @@ -53,7 +53,7 @@ #include <sstream> #include "utils/logger.h" - +#include "utils/threads/thread.h" #include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "transport_manager/tcp/tcp_device.h" #include "transport_manager/tcp/tcp_socket_connection.h" @@ -66,31 +66,35 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") TcpClientListener::TcpClientListener(TransportAdapterController* controller, const uint16_t port, const bool enable_keepalive) - : port_(port), - enable_keepalive_(enable_keepalive), - controller_(controller), - thread_(threads::CreateThread("TcpClientListener", this)), - socket_(-1), - thread_stop_requested_(false) { } + : port_(port), + enable_keepalive_(enable_keepalive), + controller_(controller), + thread_(0), + socket_(-1), + thread_stop_requested_(false) { + thread_ = threads::CreateThread("TcpClientListener", + new ListeningThreadDelegate(this)); +} TransportAdapter::Error TcpClientListener::Init() { + LOG4CXX_AUTO_TRACE(logger_); + thread_stop_requested_ = false; return TransportAdapter::OK; } void TcpClientListener::Terminate() { - LOG4CXX_TRACE(logger_, "enter"); - if (TransportAdapter::OK != StopListening()) { - LOG4CXX_ERROR(logger_, "Cannot stop listening TCP"); - } - LOG4CXX_TRACE(logger_, "exit"); + thread_->stop(); } bool TcpClientListener::IsInitialised() const { - return true; + return thread_; } TcpClientListener::~TcpClientListener() { - LOG4CXX_INFO(logger_, "destructor"); + LOG4CXX_AUTO_TRACE(logger_); + thread_->join(); + delete thread_->delegate(); + threads::DeleteThread(thread_); } void SetKeepaliveOptions(const int fd) { @@ -130,7 +134,7 @@ void SetKeepaliveOptions(const int fd) { mib[3] = TCPCTL_KEEPINTVL; sysctl(mib, kMidLength, NULL, NULL, &keepintvl, sizeof(keepintvl)); - struct timeval tval = { 0 }; + struct timeval tval = {0}; tval.tv_sec = keepidle; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)); setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &tval, sizeof(tval)); @@ -138,12 +142,13 @@ void SetKeepaliveOptions(const int fd) { LOG4CXX_TRACE(logger_, "exit"); } -void TcpClientListener::threadMain() { +void TcpClientListener::Loop() { LOG4CXX_TRACE(logger_, "enter"); while (!thread_stop_requested_) { sockaddr_in client_address; socklen_t client_address_size = sizeof(client_address); - const int connection_fd = accept(socket_, (struct sockaddr*)&client_address, + const int connection_fd = accept(socket_, + (struct sockaddr*) &client_address, &client_address_size); if (thread_stop_requested_) { LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); @@ -157,6 +162,7 @@ void TcpClientListener::threadMain() { if (AF_INET != client_address.sin_family) { LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); + close(connection_fd); continue; } @@ -169,15 +175,16 @@ void TcpClientListener::threadMain() { SetKeepaliveOptions(connection_fd); } - TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, device_name); + TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, + device_name); DeviceSptr device = controller_->AddDevice(tcp_device); tcp_device = static_cast<TcpDevice*>(device.get()); const ApplicationHandle app_handle = tcp_device->AddIncomingApplication( - connection_fd); + connection_fd); TcpSocketConnection* connection( - new TcpSocketConnection(device->unique_device_id(), app_handle, - controller_)); + new TcpSocketConnection(device->unique_device_id(), app_handle, + controller_)); connection->set_socket(connection_fd); const TransportAdapter::Error error = connection->Start(); if (error != TransportAdapter::OK) { @@ -187,11 +194,26 @@ void TcpClientListener::threadMain() { LOG4CXX_TRACE(logger_, "exit"); } +void TcpClientListener::StopLoop() { + LOG4CXX_AUTO_TRACE(logger_); + thread_stop_requested_ = true; + // We need to connect to the listening socket to unblock accept() call + int byesocket = socket(AF_INET, SOCK_STREAM, 0); + sockaddr_in server_address = { 0 }; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port_); + server_address.sin_addr.s_addr = INADDR_ANY; + connect(byesocket, (sockaddr*) &server_address, sizeof(server_address)); + shutdown(byesocket, SHUT_RDWR); + close(byesocket); +} + TransportAdapter::Error TcpClientListener::StartListening() { LOG4CXX_TRACE(logger_, "enter"); - if (thread_->is_running()) { - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::BAD_STATE. Condition: thread_started_"); + if (!thread_ || thread_->is_running()) { + LOG4CXX_TRACE( + logger_, + "exit with TransportAdapter::BAD_STATE. Condition: thread_started_"); return TransportAdapter::BAD_STATE; } @@ -199,7 +221,8 @@ TransportAdapter::Error TcpClientListener::StartListening() { if (-1 == socket_) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL. Condition: -1 == socket_"); + LOG4CXX_TRACE(logger_, + "exit with TransportAdapter::FAIL. Condition: -1 == socket_"); return TransportAdapter::FAIL; } @@ -213,15 +236,17 @@ TransportAdapter::Error TcpClientListener::StartListening() { if (0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))"); + LOG4CXX_TRACE( + logger_, + "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))"); return TransportAdapter::FAIL; } if (0 != listen(socket_, 128)) { LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)"); + LOG4CXX_TRACE( + logger_, + "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)"); return TransportAdapter::FAIL; } @@ -235,33 +260,29 @@ TransportAdapter::Error TcpClientListener::StartListening() { return TransportAdapter::OK; } -bool TcpClientListener::exitThreadMain() { - StopListening(); - return true; +void TcpClientListener::ListeningThreadDelegate::exitThreadMain() { + parent_->StopLoop(); +} + +void TcpClientListener::ListeningThreadDelegate::threadMain() { + parent_->Loop(); +} + +TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate( + TcpClientListener* parent) + : parent_(parent) { } TransportAdapter::Error TcpClientListener::StopListening() { - LOG4CXX_TRACE(logger_, "enter"); - if (!thread_->is_running()) { - LOG4CXX_TRACE(logger_, - "exit with TransportAdapter::BAD_STATE. Condition !thread_started_"); + LOG4CXX_AUTO_TRACE(logger_); + if (!thread_ || !thread_->is_running()) { + LOG4CXX_TRACE(logger_, "TcpClientListener is not running now"); return TransportAdapter::BAD_STATE; } - thread_stop_requested_ = true; - // We need to connect to the listening socket to unblock accept() call - int byesocket = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in server_address = { 0 }; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - connect(byesocket, (sockaddr*)&server_address, sizeof(server_address)); - shutdown(byesocket, SHUT_RDWR); - close(byesocket); - LOG4CXX_DEBUG(logger_, "Tcp client listener thread terminated"); + thread_->stop(); close(socket_); socket_ = -1; - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); return TransportAdapter::OK; } |