summaryrefslogtreecommitdiff
path: root/src/components/transport_manager/src/tcp/tcp_client_listener.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/transport_manager/src/tcp/tcp_client_listener.cc')
-rw-r--r--src/components/transport_manager/src/tcp/tcp_client_listener.cc184
1 files changed, 104 insertions, 80 deletions
diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc
index c0f39cc49..28a3c389d 100644
--- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc
+++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc
@@ -53,7 +53,7 @@
#include <sstream>
#include "utils/logger.h"
-
+#include "utils/threads/thread.h"
#include "transport_manager/transport_adapter/transport_adapter_controller.h"
#include "transport_manager/tcp/tcp_device.h"
#include "transport_manager/tcp/tcp_socket_connection.h"
@@ -66,35 +66,78 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager")
TcpClientListener::TcpClientListener(TransportAdapterController* controller,
const uint16_t port,
const bool enable_keepalive)
- : port_(port),
- enable_keepalive_(enable_keepalive),
- controller_(controller),
- thread_(threads::CreateThread("TcpClientListener", this)),
- socket_(-1),
- thread_stop_requested_(false) { }
+ : port_(port),
+ enable_keepalive_(enable_keepalive),
+ controller_(controller),
+ thread_(0),
+ socket_(-1),
+ thread_stop_requested_(false) {
+ thread_ = threads::CreateThread("TcpClientListener",
+ new ListeningThreadDelegate(this));
+}
TransportAdapter::Error TcpClientListener::Init() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_stop_requested_ = false;
+
+ socket_ = socket(AF_INET, SOCK_STREAM, 0);
+ if (-1 == socket_) {
+ LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket");
+ return TransportAdapter::FAIL;
+ }
+
+ sockaddr_in server_address = { 0 };
+ server_address.sin_family = AF_INET;
+ server_address.sin_port = htons(port_);
+ server_address.sin_addr.s_addr = INADDR_ANY;
+
+ int optval = 1;
+ setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
+
+ if (bind(socket_, reinterpret_cast<sockaddr*>(&server_address),
+ sizeof(server_address)) != 0) {
+ LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed");
+ return TransportAdapter::FAIL;
+ }
+
+ const int kBacklog = 128;
+ if (0 != listen(socket_, kBacklog)) {
+ LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed");
+ return TransportAdapter::FAIL;
+ }
return TransportAdapter::OK;
}
void TcpClientListener::Terminate() {
- LOG4CXX_TRACE(logger_, "enter");
- if (TransportAdapter::OK != StopListening()) {
- LOG4CXX_ERROR(logger_, "Cannot stop listening TCP");
+ LOG4CXX_AUTO_TRACE(logger_);
+ if (socket_ == -1) {
+ LOG4CXX_WARN(logger_, "Socket has been closed");
+ return;
}
- LOG4CXX_TRACE(logger_, "exit");
+ if (shutdown(socket_, SHUT_RDWR) != 0) {
+ LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to shutdown socket");
+ }
+ if (close(socket_) != 0) {
+ LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket");
+ }
+ socket_ = -1;
}
bool TcpClientListener::IsInitialised() const {
- return true;
+ return thread_;
}
TcpClientListener::~TcpClientListener() {
- LOG4CXX_INFO(logger_, "destructor");
+ LOG4CXX_AUTO_TRACE(logger_);
+ StopListening();
+ delete thread_->delegate();
+ threads::DeleteThread(thread_);
+ Terminate();
}
void SetKeepaliveOptions(const int fd) {
- LOG4CXX_TRACE(logger_, "enter. fd: " << fd);
+ LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_DEBUG(logger_, "fd: " << fd);
int yes = 1;
int keepidle = 3; // 3 seconds to disconnection detecting
int keepcnt = 5;
@@ -108,7 +151,7 @@ void SetKeepaliveOptions(const int fd) {
setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &user_timeout,
sizeof(user_timeout));
#elif defined(__QNX__) // __linux__
- // TODO (KKolodiy): Out of order!
+ // TODO(KKolodiy): Out of order!
const int kMidLength = 4;
int mib[kMidLength];
@@ -130,23 +173,24 @@ void SetKeepaliveOptions(const int fd) {
mib[3] = TCPCTL_KEEPINTVL;
sysctl(mib, kMidLength, NULL, NULL, &keepintvl, sizeof(keepintvl));
- struct timeval tval = { 0 };
+ struct timeval tval = {0};
tval.tv_sec = keepidle;
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes));
setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &tval, sizeof(tval));
#endif // __QNX__
- LOG4CXX_TRACE(logger_, "exit");
}
-void TcpClientListener::threadMain() {
- LOG4CXX_TRACE(logger_, "enter");
+void TcpClientListener::Loop() {
+ LOG4CXX_AUTO_TRACE(logger_);
while (!thread_stop_requested_) {
sockaddr_in client_address;
socklen_t client_address_size = sizeof(client_address);
- const int connection_fd = accept(socket_, (struct sockaddr*)&client_address,
+ const int connection_fd = accept(socket_,
+ (struct sockaddr*) &client_address,
&client_address_size);
if (thread_stop_requested_) {
LOG4CXX_DEBUG(logger_, "thread_stop_requested_");
+ close(connection_fd);
break;
}
@@ -157,6 +201,7 @@ void TcpClientListener::threadMain() {
if (AF_INET != client_address.sin_family) {
LOG4CXX_DEBUG(logger_, "Address of connected client is invalid");
+ close(connection_fd);
continue;
}
@@ -169,99 +214,78 @@ void TcpClientListener::threadMain() {
SetKeepaliveOptions(connection_fd);
}
- TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr, device_name);
+ TcpDevice* tcp_device = new TcpDevice(client_address.sin_addr.s_addr,
+ device_name);
DeviceSptr device = controller_->AddDevice(tcp_device);
tcp_device = static_cast<TcpDevice*>(device.get());
const ApplicationHandle app_handle = tcp_device->AddIncomingApplication(
- connection_fd);
+ connection_fd);
TcpSocketConnection* connection(
- new TcpSocketConnection(device->unique_device_id(), app_handle,
- controller_));
+ new TcpSocketConnection(device->unique_device_id(), app_handle,
+ controller_));
connection->set_socket(connection_fd);
const TransportAdapter::Error error = connection->Start();
if (error != TransportAdapter::OK) {
delete connection;
}
}
- LOG4CXX_TRACE(logger_, "exit");
}
-TransportAdapter::Error TcpClientListener::StartListening() {
- LOG4CXX_TRACE(logger_, "enter");
- if (thread_->is_running()) {
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::BAD_STATE. Condition: thread_started_");
- return TransportAdapter::BAD_STATE;
- }
-
- socket_ = socket(AF_INET, SOCK_STREAM, 0);
-
- if (-1 == socket_) {
- LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket");
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL. Condition: -1 == socket_");
- return TransportAdapter::FAIL;
- }
-
+void TcpClientListener::StopLoop() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_stop_requested_ = true;
+ // We need to connect to the listening socket to unblock accept() call
+ int byesocket = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in server_address = { 0 };
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port_);
server_address.sin_addr.s_addr = INADDR_ANY;
+ connect(byesocket, reinterpret_cast<sockaddr*>(&server_address),
+ sizeof(server_address));
+ shutdown(byesocket, SHUT_RDWR);
+ close(byesocket);
+}
- int optval = 1;
- setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
-
- if (0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))) {
- LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed");
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::FAIL. Condition: 0 != bind(socket_, (sockaddr*) &server_address, sizeof(server_address))");
- return TransportAdapter::FAIL;
- }
-
- if (0 != listen(socket_, 128)) {
- LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed");
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::FAIL. Condition: 0 != listen(socket_, 128)");
- return TransportAdapter::FAIL;
+TransportAdapter::Error TcpClientListener::StartListening() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ if (thread_->is_running()) {
+ LOG4CXX_WARN(logger_,
+ "TransportAdapter::BAD_STATE. Listener has already been started");
+ return TransportAdapter::BAD_STATE;
}
- if (thread_->start()) {
- LOG4CXX_DEBUG(logger_, "Tcp client listener thread started");
- } else {
+ if (!thread_->start()) {
LOG4CXX_ERROR(logger_, "Tcp client listener thread start failed");
return TransportAdapter::FAIL;
}
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
+ LOG4CXX_INFO(logger_, "Tcp client listener has started successfully");
return TransportAdapter::OK;
}
-bool TcpClientListener::exitThreadMain() {
- StopListening();
- return true;
+void TcpClientListener::ListeningThreadDelegate::exitThreadMain() {
+ parent_->StopLoop();
+}
+
+void TcpClientListener::ListeningThreadDelegate::threadMain() {
+ parent_->Loop();
+}
+
+TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate(
+ TcpClientListener* parent)
+ : parent_(parent) {
}
TransportAdapter::Error TcpClientListener::StopListening() {
- LOG4CXX_TRACE(logger_, "enter");
+ LOG4CXX_AUTO_TRACE(logger_);
if (!thread_->is_running()) {
- LOG4CXX_TRACE(logger_,
- "exit with TransportAdapter::BAD_STATE. Condition !thread_started_");
+ LOG4CXX_DEBUG(logger_, "TcpClientListener is not running now");
return TransportAdapter::BAD_STATE;
}
- thread_stop_requested_ = true;
- // We need to connect to the listening socket to unblock accept() call
- int byesocket = socket(AF_INET, SOCK_STREAM, 0);
- sockaddr_in server_address = { 0 };
- server_address.sin_family = AF_INET;
- server_address.sin_port = htons(port_);
- server_address.sin_addr.s_addr = INADDR_ANY;
- connect(byesocket, (sockaddr*)&server_address, sizeof(server_address));
- shutdown(byesocket, SHUT_RDWR);
- close(byesocket);
- LOG4CXX_DEBUG(logger_, "Tcp client listener thread terminated");
- close(socket_);
- socket_ = -1;
- LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK");
+ thread_->join();
+
+ LOG4CXX_INFO(logger_, "Tcp client listener has stopped successfully");
return TransportAdapter::OK;
}