summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.md1
-rwxr-xr-xlib/cpp/Makefile.am1
-rw-r--r--lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp223
-rw-r--r--lib/cpp/src/thrift/transport/TNonblockingServerSocket.h5
-rw-r--r--lib/cpp/src/thrift/transport/TServerSocket.cpp306
-rw-r--r--lib/cpp/src/thrift/transport/TServerSocket.h19
-rw-r--r--lib/cpp/src/thrift/transport/TSocketUtils.h163
7 files changed, 452 insertions, 266 deletions
diff --git a/CHANGES.md b/CHANGES.md
index 175d07fca..fbaf35dff 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@
- [THRIFT-5116](https://issues.apache.org/jira/browse/THRIFT-5116) - Upgrade NodeJS to 10.x
- [THRIFT-5138](https://issues.apache.org/jira/browse/THRIFT-5138) - Swift generator does not escape keywords properly
- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - In Go library TProcessor interface now includes ProcessorMap and AddToProcessorMap functions.
+- [THRIFT-5186](https://issues.apache.org/jira/browse/THRIFT-5186) - cpp: use all getaddrinfo() results when retrying failed bind() in T{Nonblocking,}ServerSocket
### Java
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index c73a17d75..a536d1719 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -189,6 +189,7 @@ include_transport_HEADERS = \
src/thrift/transport/THttpClient.h \
src/thrift/transport/THttpServer.h \
src/thrift/transport/TSocket.h \
+ src/thrift/transport/TSocketUtils.h \
src/thrift/transport/TPipe.h \
src/thrift/transport/TPipeServer.h \
src/thrift/transport/TSSLSocket.h \
diff --git a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
index c50ce381c..7bac37eb2 100644
--- a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
@@ -44,9 +44,10 @@
#include <unistd.h>
#endif
-#include <thrift/transport/TSocket.h>
-#include <thrift/transport/TNonblockingServerSocket.h>
#include <thrift/transport/PlatformSocket.h>
+#include <thrift/transport/TNonblockingServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TSocketUtils.h>
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
@@ -74,8 +75,8 @@ namespace apache {
namespace thrift {
namespace transport {
-using std::string;
using std::shared_ptr;
+using std::string;
TNonblockingServerSocket::TNonblockingServerSocket(int port)
: port_(port),
@@ -171,60 +172,7 @@ void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
tcpRecvBuffer_ = tcpRecvBuffer;
}
-void TNonblockingServerSocket::listen() {
- listening_ = true;
-#ifdef _WIN32
- TWinsockSingleton::create();
-#endif // _WIN32
-
- // Validate port number
- if (port_ < 0 || port_ > 0xFFFF) {
- throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
- }
-
- const struct addrinfo *res;
- int error;
- char port[sizeof("65535")];
- THRIFT_SNPRINTF(port, sizeof(port), "%d", port_);
-
- struct addrinfo hints;
- std::memset(&hints, 0, sizeof(hints));
- hints.ai_family = PF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
-
- // If address is not specified use wildcard address (NULL)
- TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints);
-
- error = info.init();
- if (error) {
- GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not resolve host for server socket.");
- }
-
- // Pick the ipv6 address first since ipv4 addresses can be mapped
- // into ipv6 space.
- for (res = info.res(); res; res = res->ai_next) {
- if (res->ai_family == AF_INET6 || res->ai_next == nullptr)
- break;
- }
-
- if (!path_.empty()) {
- serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
- } else if (res != nullptr) {
- serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- }
-
- if (serverSocket_ == THRIFT_INVALID_SOCKET) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not create server socket.",
- errno_copy);
- }
+void TNonblockingServerSocket::_setup_sockopts() {
// Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
int one = 1;
@@ -278,19 +226,6 @@ void TNonblockingServerSocket::listen() {
}
}
-#ifdef IPV6_V6ONLY
- if (res->ai_family == AF_INET6 && path_.empty()) {
- int zero = 0;
- if (-1 == setsockopt(serverSocket_,
- IPPROTO_IPV6,
- IPV6_V6ONLY,
- cast_sockopt(&zero),
- sizeof(zero))) {
- GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
- }
- }
-#endif // #ifdef IPV6_V6ONLY
-
// Turn linger off, don't want to block on calls to close
struct linger ling = {0, 0};
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
@@ -310,24 +245,6 @@ void TNonblockingServerSocket::listen() {
errno_copy);
}
- // Set TCP nodelay if available, MAC OS X Hack
- // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
-#ifndef TCP_NOPUSH
- // Unix Sockets do not need that
- if (path_.empty()) {
- // TCP Nodelay, speed over bandwidth
- if (-1
- == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not set TCP_NODELAY",
- errno_copy);
- }
- }
-#endif
-
// Set NONBLOCK on the accept socket
int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
if (flags == -1) {
@@ -348,6 +265,26 @@ void TNonblockingServerSocket::listen() {
errno_copy);
}
+} // _setup_sockopts()
+
+void TNonblockingServerSocket::_setup_tcp_sockopts() {
+ int one = 1;
+
+ // Set TCP nodelay if available, MAC OS X Hack
+ // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+#ifndef TCP_NOPUSH
+ // TCP Nodelay, speed over bandwidth
+ if (-1
+ == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not set TCP_NODELAY",
+ errno_copy);
+ }
+#endif
+
#ifdef TCP_LOW_MIN_RTO
if (TSocket::getUseLowMinRto()) {
if (-1 == setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one))) {
@@ -361,17 +298,60 @@ void TNonblockingServerSocket::listen() {
}
#endif
- // prepare the port information
+} // _setup_tcp_sockopts()
+
+void TNonblockingServerSocket::listen() {
+ listening_ = true;
+#ifdef _WIN32
+ TWinsockSingleton::create();
+#endif // _WIN32
+
+ // tcp == false means Unix Domain socket
+ bool tcp = (path_.empty());
+
+ // Validate port number
+ if (port_ < 0 || port_ > 0xFFFF) {
+ throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
+ }
+
+ // Resolve host:port strings into an iterable of struct addrinfo*
+ AddressResolutionHelper resolved_addresses;
+ if (tcp) {
+ try {
+ resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
+ AI_PASSIVE | AI_V4MAPPED);
+ } catch (const std::system_error& e) {
+ GlobalOutput.printf("getaddrinfo() -> %d. %s", e.code().value(), e.what());
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not resolve host for server socket.");
+ }
+ }
+
// we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
// always seem to work. The client can configure the retry variables.
int retries = 0;
int errno_copy = 0;
- if (!path_.empty()) {
+ if (!tcp) {
+ // -- Unix Domain Socket -- //
+
+ serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+
+ if (serverSocket_ == THRIFT_INVALID_SOCKET) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not create server socket.",
+ errno_copy);
+ }
+
+ _setup_sockopts();
+ //_setup_unixdomain_sockopts();
#ifndef _WIN32
- // Unix Domain Socket
size_t len = path_.size() + 1;
if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) {
errno_copy = THRIFT_GET_SOCKET_ERROR;
@@ -411,11 +391,48 @@ void TNonblockingServerSocket::listen() {
" Unix Domain socket path not supported");
#endif
} else {
+
+ // -- TCP socket -- //
+
+ auto addr_iter = AddressResolutionHelper::Iter{};
+
+ // Via DNS or somehow else, single hostname can resolve into many addresses.
+ // Results may contain perhaps a mix of IPv4 and IPv6. Here, we iterate
+ // over what system gave us, picking the first address that works.
do {
- if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) {
+ if (!addr_iter) {
+ // init + recycle over many retries
+ addr_iter = resolved_addresses.iterate();
+ }
+ auto trybind = *addr_iter++;
+
+ serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol);
+ if (serverSocket_ == -1) {
+ errno_copy = THRIFT_GET_SOCKET_ERROR;
+ continue;
+ }
+
+ _setup_sockopts();
+ _setup_tcp_sockopts();
+
+#ifdef IPV6_V6ONLY
+ if (trybind->ai_family == AF_INET6) {
+ int zero = 0;
+ if (-1 == setsockopt(serverSocket_,
+ IPPROTO_IPV6,
+ IPV6_V6ONLY,
+ cast_sockopt(&zero),
+ sizeof(zero))) {
+ GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
+ }
+ }
+#endif // #ifdef IPV6_V6ONLY
+
+ if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) {
break;
}
errno_copy = THRIFT_GET_SOCKET_ERROR;
+
// use short circuit evaluation here to only sleep if we need to
} while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
@@ -437,12 +454,21 @@ void TNonblockingServerSocket::listen() {
}
}
}
+ } // TCP socket //
+
+ // throw error if socket still wasn't created successfully
+ if (serverSocket_ == THRIFT_INVALID_SOCKET) {
+ GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not create server socket.",
+ errno_copy);
}
// throw an error if we failed to bind properly
if (retries > retryLimit_) {
char errbuf[1024];
- if (!path_.empty()) {
+ if (!tcp) {
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
} else {
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
@@ -478,9 +504,10 @@ int TNonblockingServerSocket::getListenPort() {
shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
if (serverSocket_ == THRIFT_INVALID_SOCKET) {
- throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServerSocket not listening");
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "TNonblockingServerSocket not listening");
}
-
+
struct sockaddr_storage clientAddress;
int size = sizeof(clientAddress);
THRIFT_SOCKET clientSocket
@@ -544,6 +571,6 @@ void TNonblockingServerSocket::close() {
serverSocket_ = THRIFT_INVALID_SOCKET;
listening_ = false;
}
-}
-}
-} // apache::thrift::transport
+} // namespace transport
+} // namespace thrift
+} // namespace apache
diff --git a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h
index a68c28d22..1ed2b07f9 100644
--- a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h
@@ -100,7 +100,7 @@ public:
THRIFT_SOCKET getSocketFD() override { return serverSocket_; }
int getPort() override;
-
+
int getListenPort() override;
void listen() override;
@@ -128,6 +128,9 @@ private:
socket_func_t listenCallback_;
socket_func_t acceptCallback_;
+
+ void _setup_sockopts();
+ void _setup_tcp_sockopts();
};
}
}
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp
index 150e53096..6b7652560 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp
@@ -44,9 +44,10 @@
#include <unistd.h>
#endif
-#include <thrift/transport/TSocket.h>
-#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/PlatformSocket.h>
+#include <thrift/transport/TServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TSocketUtils.h>
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
@@ -83,26 +84,6 @@ namespace transport {
using std::shared_ptr;
-TGetAddrInfoWrapper::TGetAddrInfoWrapper(const char* node,
- const char* service,
- const struct addrinfo* hints)
- : node_(node), service_(service), hints_(hints), res_(nullptr) {}
-
-TGetAddrInfoWrapper::~TGetAddrInfoWrapper() {
- if (this->res_ != nullptr)
- freeaddrinfo(this->res_);
-}
-
-int TGetAddrInfoWrapper::init() {
- if (this->res_ == nullptr)
- return getaddrinfo(this->node_, this->service_, this->hints_, &(this->res_));
- return 0;
-}
-
-const struct addrinfo* TGetAddrInfoWrapper::res() {
- return this->res_;
-}
-
TServerSocket::TServerSocket(int port)
: interruptableChildren_(true),
port_(port),
@@ -228,84 +209,7 @@ void TServerSocket::setInterruptableChildren(bool enable) {
interruptableChildren_ = enable;
}
-void TServerSocket::listen() {
- listening_ = true;
-#ifdef _WIN32
- TWinsockSingleton::create();
-#endif // _WIN32
- THRIFT_SOCKET sv[2];
- // Create the socket pair used to interrupt
- if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt", THRIFT_GET_SOCKET_ERROR);
- interruptSockWriter_ = THRIFT_INVALID_SOCKET;
- interruptSockReader_ = THRIFT_INVALID_SOCKET;
- } else {
- interruptSockWriter_ = sv[1];
- interruptSockReader_ = sv[0];
- }
-
- // Create the socket pair used to interrupt all clients
- if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
- THRIFT_GET_SOCKET_ERROR);
- childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
- pChildInterruptSockReader_.reset();
- } else {
- childInterruptSockWriter_ = sv[1];
- pChildInterruptSockReader_
- = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
- }
-
- // Validate port number
- if (port_ < 0 || port_ > 0xFFFF) {
- throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
- }
-
- const struct addrinfo *res = nullptr;
- int error;
- char port[sizeof("65535")];
- THRIFT_SNPRINTF(port, sizeof(port), "%d", port_);
-
- struct addrinfo hints;
- std::memset(&hints, 0, sizeof(hints));
- hints.ai_family = PF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
-
- // If address is not specified use wildcard address (NULL)
- TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints);
-
- if (path_.empty()) {
- error = info.init();
- if (error) {
- GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not resolve host for server socket.");
- }
-
- // Pick the ipv6 address first since ipv4 addresses can be mapped
- // into ipv6 space.
- for (res = info.res(); res; res = res->ai_next) {
- if (res->ai_family == AF_INET6 || res->ai_next == nullptr)
- break;
- }
- }
-
- if (!path_.empty()) {
- serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
- } else if (res != nullptr) {
- serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- }
-
- if (serverSocket_ == THRIFT_INVALID_SOCKET) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not create server socket.",
- errno_copy);
- }
+void TServerSocket::_setup_sockopts() {
// Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
int one = 1;
@@ -359,33 +263,6 @@ void TServerSocket::listen() {
}
}
-// Defer accept
-#ifdef TCP_DEFER_ACCEPT
- if (path_.empty()) {
- if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not set TCP_DEFER_ACCEPT",
- errno_copy);
- }
- }
-#endif // #ifdef TCP_DEFER_ACCEPT
-
-#ifdef IPV6_V6ONLY
- if (path_.empty() && res->ai_family == AF_INET6) {
- int zero = 0;
- if (-1 == setsockopt(serverSocket_,
- IPPROTO_IPV6,
- IPV6_V6ONLY,
- cast_sockopt(&zero),
- sizeof(zero))) {
- GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
- }
- }
-#endif // #ifdef IPV6_V6ONLY
-
// Turn linger off, don't want to block on calls to close
struct linger ling = {0, 0};
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
@@ -395,20 +272,6 @@ void TServerSocket::listen() {
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
}
- // Unix Sockets do not need that
- if (path_.empty()) {
- // TCP Nodelay, speed over bandwidth
- if (-1
- == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
- GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
- close();
- throw TTransportException(TTransportException::NOT_OPEN,
- "Could not set TCP_NODELAY",
- errno_copy);
- }
- }
-
// Set NONBLOCK on the accept socket
int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
if (flags == -1) {
@@ -419,7 +282,6 @@ void TServerSocket::listen() {
"THRIFT_FCNTL() THRIFT_F_GETFL failed",
errno_copy);
}
-
if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
@@ -428,18 +290,114 @@ void TServerSocket::listen() {
"THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
errno_copy);
}
+}
+
+void TServerSocket::_setup_unixdomain_sockopts() {
+}
+
+void TServerSocket::_setup_tcp_sockopts() {
+ int one = 1;
+
+ // Defer accept
+#ifdef TCP_DEFER_ACCEPT
+ if (path_.empty()) {
+ if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT, &one, sizeof(one))) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT",
+ errno_copy);
+ }
+ }
+#endif // #ifdef TCP_DEFER_ACCEPT
+
+ // TCP Nodelay, speed over bandwidth
+ if (-1
+ == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not set TCP_NODELAY",
+ errno_copy);
+ }
+}
+
+void TServerSocket::listen() {
+ listening_ = true;
+#ifdef _WIN32
+ TWinsockSingleton::create();
+#endif // _WIN32
+
+ THRIFT_SOCKET sv[2];
+ // Create the socket pair used to interrupt
+ if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt",
+ THRIFT_GET_SOCKET_ERROR);
+ interruptSockWriter_ = THRIFT_INVALID_SOCKET;
+ interruptSockReader_ = THRIFT_INVALID_SOCKET;
+ } else {
+ interruptSockWriter_ = sv[1];
+ interruptSockReader_ = sv[0];
+ }
+
+ // Create the socket pair used to interrupt all clients
+ if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
+ THRIFT_GET_SOCKET_ERROR);
+ childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
+ pChildInterruptSockReader_.reset();
+ } else {
+ childInterruptSockWriter_ = sv[1];
+ pChildInterruptSockReader_
+ = std::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]), destroyer_of_fine_sockets);
+ }
+
+ // tcp == false means Unix Domain socket
+ bool tcp = (path_.empty());
+
+ // Validate port number
+ if (port_ < 0 || port_ > 0xFFFF) {
+ throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
+ }
+
+ // Resolve host:port strings into an iterable of struct addrinfo*
+ AddressResolutionHelper resolved_addresses;
+ if (tcp) {
+ try {
+ resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
+ AI_PASSIVE | AI_V4MAPPED);
+ } catch (const std::system_error& e) {
+ GlobalOutput.printf("getaddrinfo() -> %d; %s", e.code().value(), e.what());
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not resolve host for server socket.");
+ }
+ }
- // prepare the port information
// we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
// always seem to work. The client can configure the retry variables.
int retries = 0;
int errno_copy = 0;
- if (!path_.empty()) {
+ if (!tcp) {
+ // -- Unix Domain Socket -- //
-#ifndef _WIN32
+ serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+
+ if (serverSocket_ == THRIFT_INVALID_SOCKET) {
+ int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not create server socket.",
+ errno_copy);
+ }
- // Unix Domain Socket
+ _setup_sockopts();
+ _setup_unixdomain_sockopts();
+
+#ifndef _WIN32
size_t len = path_.size() + 1;
if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) {
errno_copy = THRIFT_GET_SOCKET_ERROR;
@@ -479,11 +437,48 @@ void TServerSocket::listen() {
" Unix Domain socket path not supported");
#endif
} else {
+
+ // -- TCP socket -- //
+
+ auto addr_iter = AddressResolutionHelper::Iter{};
+
+ // Via DNS or somehow else, single hostname can resolve into many addresses.
+ // Results may contain perhaps a mix of IPv4 and IPv6. Here, we iterate
+ // over what system gave us, picking the first address that works.
do {
- if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) {
+ if (!addr_iter) {
+ // init + recycle over many retries
+ addr_iter = resolved_addresses.iterate();
+ }
+ auto trybind = *addr_iter++;
+
+ serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol);
+ if (serverSocket_ == -1) {
+ errno_copy = THRIFT_GET_SOCKET_ERROR;
+ continue;
+ }
+
+ _setup_sockopts();
+ _setup_tcp_sockopts();
+
+#ifdef IPV6_V6ONLY
+ if (trybind->ai_family == AF_INET6) {
+ int zero = 0;
+ if (-1 == setsockopt(serverSocket_,
+ IPPROTO_IPV6,
+ IPV6_V6ONLY,
+ cast_sockopt(&zero),
+ sizeof(zero))) {
+ GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
+ }
+ }
+#endif // #ifdef IPV6_V6ONLY
+
+ if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) {
break;
}
errno_copy = THRIFT_GET_SOCKET_ERROR;
+
// use short circuit evaluation here to only sleep if we need to
} while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
@@ -505,12 +500,21 @@ void TServerSocket::listen() {
}
}
}
+ } // TCP socket //
+
+ // throw error if socket still wasn't created successfully
+ if (serverSocket_ == THRIFT_INVALID_SOCKET) {
+ GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN,
+ "Could not create server socket.",
+ errno_copy);
}
// throw an error if we failed to bind properly
if (retries > retryLimit_) {
char errbuf[1024];
- if (!path_.empty()) {
+ if (!tcp) {
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() PATH %s", path_.c_str());
} else {
THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() BIND %d", port_);
@@ -699,6 +703,6 @@ void TServerSocket::close() {
pChildInterruptSockReader_.reset();
listening_ = false;
}
-}
-}
-} // apache::thrift::transport
+} // namespace transport
+} // namespace thrift
+} // namespace apache
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h
index 4562341b0..e4659a036 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TServerSocket.h
@@ -38,22 +38,6 @@ namespace transport {
class TSocket;
-class TGetAddrInfoWrapper {
-public:
- TGetAddrInfoWrapper(const char* node, const char* service, const struct addrinfo* hints);
-
- virtual ~TGetAddrInfoWrapper();
-
- int init();
- const struct addrinfo* res();
-
-private:
- const char* node_;
- const char* service_;
- const struct addrinfo* hints_;
- struct addrinfo* res_;
-};
-
/**
* Server socket implementation of TServerTransport. Wrapper around a unix
* socket listen and accept calls.
@@ -156,6 +140,9 @@ protected:
private:
void notify(THRIFT_SOCKET notifySock);
+ void _setup_sockopts();
+ void _setup_unixdomain_sockopts();
+ void _setup_tcp_sockopts();
int port_;
std::string address_;
diff --git a/lib/cpp/src/thrift/transport/TSocketUtils.h b/lib/cpp/src/thrift/transport/TSocketUtils.h
new file mode 100644
index 000000000..c9e0e57b8
--- /dev/null
+++ b/lib/cpp/src/thrift/transport/TSocketUtils.h
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_SOCKETUTILS_H_
+#define _THRIFT_TRANSPORT_SOCKETUTILS_H_ 1
+
+#include <memory>
+#include <string>
+#include <system_error>
+#include <vector>
+
+#include <sys/types.h>
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
+#include <thrift/transport/PlatformSocket.h>
+
+namespace apache {
+namespace thrift {
+
+/**
+ * A helper to resolve hostnames to struct addrinfo's -- and not leak memory.
+ *
+ * Use like this:
+ *
+ * apache::thrift::AddressResolutionHelper addresses("localhost", "80");
+ *
+ * for (auto addr : addresses.iterate()) {
+ * connect(sock, addr->ai_addr, addr->ai_addrlen);
+ * // ...
+ * }
+ */
+struct AddressResolutionHelper {
+
+private:
+ struct addrinfo_deleter {
+ void operator()(addrinfo* addr) {
+ ::freeaddrinfo(addr); // frees the whole list
+ }
+ };
+
+public:
+ using PtrOwnedList = std::unique_ptr<addrinfo, addrinfo_deleter>;
+
+ struct Iter : std::iterator<std::forward_iterator_tag, const addrinfo*> {
+ value_type ptr = nullptr;
+
+ Iter() = default;
+ Iter(const addrinfo* head) : ptr(head) {}
+
+ value_type operator*() const { return ptr; }
+
+ bool operator==(const Iter& other) { return this->ptr == other.ptr; }
+ bool operator!=(const Iter& other) { return this->ptr != other.ptr; }
+
+ operator bool() { return this->ptr != nullptr; }
+ bool operator!() { return this->ptr == nullptr; }
+
+ Iter& operator++() {
+ if (ptr == nullptr) {
+ throw std::out_of_range("won't go pass end of linked list");
+ }
+ ptr = ptr->ai_next;
+ return *this;
+ }
+ Iter operator++(int) {
+ Iter tmp(*this);
+ ++(*this);
+ return tmp;
+ }
+ };
+
+ struct gai_error : std::error_category {
+ virtual const char* name() const noexcept override { return "getaddrinfo"; }
+ virtual std::string message(int code) const override { return THRIFT_GAI_STRERROR(code); }
+ };
+
+private:
+ PtrOwnedList gai_results;
+
+ addrinfo* query(const std::string& host, const std::string& port, int socktype, int flags) {
+ addrinfo hints{};
+ hints.ai_flags = flags;
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = socktype;
+
+ addrinfo* head;
+ int ret = ::getaddrinfo(host.empty() ? NULL : host.c_str(), port.c_str(), &hints, &head);
+ if (ret == 0) {
+ return head;
+#ifdef _WIN32
+ } else {
+ throw std::system_error{THRIFT_GET_SOCKET_ERROR, std::system_category()};
+#else
+ } else if (ret == EAI_SYSTEM) {
+ throw std::system_error{THRIFT_GET_SOCKET_ERROR, std::system_category()};
+ } else {
+ throw std::system_error{ret, gai_error()};
+#endif
+ }
+ }
+
+public:
+ /**
+ * Constructor. May block. Throws errors.
+ *
+ * @param port Port number, or service name, as a string.
+ * @param socktype Socket type, SOCK_STREAM or SOCK_DGRAM.
+ * @param flags Standard getaddrinfo() flags.
+ */
+ AddressResolutionHelper(const std::string& host,
+ const std::string& port, // pass "25" or "smtp" for port 25
+ int socktype = SOCK_STREAM,
+ int flags = AI_V4MAPPED | AI_ADDRCONFIG)
+ : gai_results(query(host, port, socktype, flags)) {}
+
+ AddressResolutionHelper() = default;
+
+ /**
+ * Manual query. May block. Throws errors.
+ *
+ * @param port Port number, or service name, as a string.
+ * @param socktype Socket type, SOCK_STREAM or SOCK_DGRAM.
+ * @param flags Standard getaddrinfo() flags.
+ */
+ AddressResolutionHelper& resolve(const std::string& host,
+ const std::string& port, // pass "25" or "smtp" for port 25
+ int socktype = SOCK_STREAM,
+ int flags = AI_V4MAPPED | AI_ADDRCONFIG) {
+ gai_results.reset(query(host, port, socktype, flags));
+ return *this;
+ }
+
+ /**
+ * Return ForwardIterator to struct addrinfo* results.
+ */
+ Iter iterate() const { return Iter{gai_results.get()}; }
+};
+
+} // namespace thrift
+} // namespace apache
+
+#endif