diff options
author | Bryan Duxbury <bryanduxbury@apache.org> | 2010-09-28 14:36:07 +0000 |
---|---|---|
committer | Bryan Duxbury <bryanduxbury@apache.org> | 2010-09-28 14:36:07 +0000 |
commit | a18364ac88b035e9433a0238b8fbe75285aa04f3 (patch) | |
tree | 8e0e14db71433dde1ebd6f34a4fab028431ab535 | |
parent | ca67b899594004966f002a82ff126e0916f133d4 (diff) | |
download | thrift-a18364ac88b035e9433a0238b8fbe75285aa04f3.tar.gz |
THRIFT-900. cpp: Unix domain socket
This patch adds a new Unix Socket transport.
Patch: Roger Meier
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1002179 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | lib/cpp/src/transport/TServerSocket.cpp | 83 | ||||
-rw-r--r-- | lib/cpp/src/transport/TServerSocket.h | 2 | ||||
-rw-r--r-- | lib/cpp/src/transport/TSocket.cpp | 64 | ||||
-rw-r--r-- | lib/cpp/src/transport/TSocket.h | 15 |
4 files changed, 146 insertions, 18 deletions
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp index 2f14fd58a..836f6ba06 100644 --- a/lib/cpp/src/transport/TServerSocket.cpp +++ b/lib/cpp/src/transport/TServerSocket.cpp @@ -20,6 +20,7 @@ #include <cstring> #include <sys/types.h> #include <sys/socket.h> +#include <sys/un.h> #include <sys/poll.h> #include <sys/types.h> #include <netinet/in.h> @@ -68,6 +69,20 @@ TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) : intSock1_(-1), intSock2_(-1) {} +TServerSocket::TServerSocket(string path) : + port_(0), + path_(path), + serverSocket_(-1), + acceptBacklog_(1024), + sendTimeout_(0), + recvTimeout_(0), + retryLimit_(0), + retryDelay_(0), + tcpSendBuffer_(0), + tcpRecvBuffer_(0), + intSock1_(-1), + intSock2_(-1) {} + TServerSocket::~TServerSocket() { close(); } @@ -131,7 +146,12 @@ void TServerSocket::listen() { break; } - serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (! path_.empty()) { + serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); + } else { + serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + } + if (serverSocket_ == -1) { int errno_copy = errno; GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); @@ -201,13 +221,16 @@ void TServerSocket::listen() { throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy); } - // TCP Nodelay, speed over bandwidth - if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, - &one, sizeof(one))) { - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); - close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy); + // Unix Sockets do not need that + if (path_.empty()) { + // TCP Nodelay, speed over bandwidth + if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, + &one, sizeof(one))) { + int errno_copy = errno; + GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); + close(); + throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy); + } } // Set NONBLOCK on the accept socket @@ -228,21 +251,49 @@ void TServerSocket::listen() { // we may want to try to bind more than once, since SO_REUSEADDR doesn't // always seem to work. The client can configure the retry variables. int retries = 0; - do { - if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) { - break; + + if (! path_.empty()) { + // Unix Domain Socket + struct sockaddr_un address; + socklen_t len; + + if (path_.length() > sizeof(address.sun_path)) { + int errno_copy = errno; + GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long"); } - // use short circuit evaluation here to only sleep if we need to - } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0)); + address.sun_family = AF_UNIX; + sprintf(address.sun_path, path_.c_str()); + len = sizeof(address); + + do { + if (0 == bind(serverSocket_, (struct sockaddr *) &address, len)) { + break; + } + // use short circuit evaluation here to only sleep if we need to + } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0)); + } else { + do { + if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) { + break; + } + // use short circuit evaluation here to only sleep if we need to + } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0)); - // free addrinfo - freeaddrinfo(res0); + // free addrinfo + freeaddrinfo(res0); + } // throw an error if we failed to bind properly if (retries > retryLimit_) { char errbuf[1024]; - sprintf(errbuf, "TServerSocket::listen() BIND %d", port_); + if (! path_.empty()) { + sprintf(errbuf, "TServerSocket::listen() PATH %s", path_.c_str()); + } + else { + sprintf(errbuf, "TServerSocket::listen() BIND %d", port_); + } GlobalOutput(errbuf); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not bind"); diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h index a6be01737..8cd521fbb 100644 --- a/lib/cpp/src/transport/TServerSocket.h +++ b/lib/cpp/src/transport/TServerSocket.h @@ -36,6 +36,7 @@ class TServerSocket : public TServerTransport { public: TServerSocket(int port); TServerSocket(int port, int sendTimeout, int recvTimeout); + TServerSocket(std::string path); ~TServerSocket(); @@ -58,6 +59,7 @@ class TServerSocket : public TServerTransport { private: int port_; + std::string path_; int serverSocket_; int acceptBacklog_; int sendTimeout_; diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index 5da33bb1d..951ddcf11 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -21,6 +21,7 @@ #include <cstring> #include <sstream> #include <sys/socket.h> +#include <sys/un.h> #include <sys/poll.h> #include <sys/types.h> #include <arpa/inet.h> @@ -50,6 +51,23 @@ uint32_t g_socket_syscalls = 0; TSocket::TSocket(string host, int port) : host_(host), port_(port), + path_(""), + socket_(-1), + connTimeout_(0), + sendTimeout_(0), + recvTimeout_(0), + lingerOn_(1), + lingerVal_(0), + noDelay_(1), + maxRecvRetries_(5) { + recvTimeval_.tv_sec = (int)(recvTimeout_/1000); + recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); +} + +TSocket::TSocket(string path) : + host_(""), + port_(0), + path_(path), socket_(-1), connTimeout_(0), sendTimeout_(0), @@ -65,6 +83,7 @@ TSocket::TSocket(string host, int port) : TSocket::TSocket() : host_(""), port_(0), + path_(""), socket_(-1), connTimeout_(0), sendTimeout_(0), @@ -80,6 +99,7 @@ TSocket::TSocket() : TSocket::TSocket(int socket) : host_(""), port_(0), + path_(""), socket_(socket), connTimeout_(0), sendTimeout_(0), @@ -130,7 +150,12 @@ void TSocket::openConnection(struct addrinfo *res) { return; } - socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (! path_.empty()) { + socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP); + } else { + socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + } + if (socket_ == -1) { int errno_copy = errno; GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy); @@ -179,7 +204,24 @@ void TSocket::openConnection(struct addrinfo *res) { } // Connect the socket - int ret = connect(socket_, res->ai_addr, res->ai_addrlen); + int ret; + if (! path_.empty()) { + struct sockaddr_un address; + socklen_t len; + + if (path_.length() > sizeof(address.sun_path)) { + int errno_copy = errno; + GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long"); + } + + address.sun_family = AF_UNIX; + sprintf(address.sun_path, path_.c_str()); + len = sizeof(address); + ret = connect(socket_, (struct sockaddr *) &address, len); + } else { + ret = connect(socket_, res->ai_addr, res->ai_addrlen); + } // success case if (ret == 0) { @@ -237,6 +279,24 @@ void TSocket::open() { if (isOpen()) { return; } + if (! path_.empty()) { + unix_open(); + } else { + local_open(); + } +} + +void TSocket::unix_open(){ + if (! path_.empty()) { + // Unix Domain SOcket does not need addrinfo struct, so we pass NULL + openConnection(NULL); + } +} + +void TSocket::local_open(){ + if (isOpen()) { + return; + } // Validate port number if (port_ < 0 || port_ > 0xFFFF) { diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index 0184362c1..f69a9a1ac 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -60,6 +60,14 @@ class TSocket : public TTransport { TSocket(std::string host, int port); /** + * Constructs a new Unix domain socket. + * Note that this does NOT actually connect the socket. + * + * @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift" + */ + TSocket(std::string path); + + /** * Destroyes the socket object, closing it if necessary. */ virtual ~TSocket(); @@ -217,6 +225,9 @@ class TSocket : public TTransport { /** Port number to connect on */ int port_; + /** UNIX domain socket path */ + std::string path_; + /** Underlying UNIX socket handle */ int socket_; @@ -246,6 +257,10 @@ class TSocket : public TTransport { /** Whether to use low minimum TCP retransmission timeout */ static bool useLowMinRto_; + + private: + void unix_open(); + void local_open(); }; }}} // apache::thrift::transport |