summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Meier <roger@apache.org>2011-07-08 12:23:31 +0000
committerRoger Meier <roger@apache.org>2011-07-08 12:23:31 +0000
commit30aae0ca877c9f5863ff881b29edc6a38df9d85a (patch)
tree1bbc59bc6947cebbd4baf942959c4be851a41976
parent0bb3db2eedc2ae91088b6caffc2b75fb3a2a5e27 (diff)
downloadthrift-30aae0ca877c9f5863ff881b29edc6a38df9d85a.tar.gz
THRIFT-1217 Use evutil_socketpair instead of pipe
Patch: alexandre parenteau git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1144286 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--lib/cpp/src/server/TNonblockingServer.cpp35
-rw-r--r--lib/cpp/src/server/TNonblockingServer.h46
2 files changed, 60 insertions, 21 deletions
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 69ae23515..4774b361c 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -22,10 +22,20 @@
#include <transport/TSocket.h>
#include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_NETDB_H
#include <netdb.h>
+#endif
+
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
@@ -708,7 +718,7 @@ void TNonblockingServer::listenSocket() {
#ifdef IPV6_V6ONLY
if (res->ai_family == AF_INET6) {
int zero = 0;
- if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
+ if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
}
}
@@ -718,9 +728,9 @@ void TNonblockingServer::listenSocket() {
int one = 1;
// Set reuseaddr to avoid 2MSL delay on server restart
- setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+ setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
- if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+ if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
close(s);
freeaddrinfo(res0);
throw TException("TNonblockingServer::serve() bind");
@@ -750,20 +760,20 @@ void TNonblockingServer::listenSocket(int s) {
struct linger ling = {0, 0};
// Keepalive to ensure full result flushing
- setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
+ setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
// Turn linger off to avoid hung sockets
- setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+ setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
// Set TCP nodelay if available, MAC OS X Hack
// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
- setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
#endif
#ifdef TCP_LOW_MIN_RTO
if (TSocket::getUseLowMinRto()) {
- setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
+ setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
}
#endif
@@ -777,13 +787,12 @@ void TNonblockingServer::listenSocket(int s) {
}
void TNonblockingServer::createNotificationPipe() {
- if (pipe(notificationPipeFDs_) != 0) {
- GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
- throw TException("can't create notification pipe");
+ if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+ throw TException("can't create notification pipe");
}
- int flags;
- if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
- fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+ if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+ evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
close(notificationPipeFDs_[0]);
close(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 0252f10d3..7b1cf4dd8 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -44,6 +44,36 @@ using apache::thrift::concurrency::ThreadManager;
// Forward declaration of class
class TConnection;
+#ifdef LIBEVENT_VERSION_NUMBER
+#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
+#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
+#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
+#else
+// assume latest version 1 series
+#define LIBEVENT_VERSION_MAJOR 1
+#define LIBEVENT_VERSION_MINOR 14
+#define LIBEVENT_VERSION_REL 13
+#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
+#endif
+
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ typedef int evutil_socket_t;
+#endif
+
+#ifndef SOCKOPT_CAST_T
+#define SOCKOPT_CAST_T void
+#endif
+
+template<class T>
+inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
+ return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
+}
+
+template<class T>
+inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
+ return reinterpret_cast<SOCKOPT_CAST_T*>(v);
+}
+
/**
* This is a non-blocking server in C++ for high performance that operates a
* single IO thread. It assumes that all incoming requests are framed with a
@@ -176,7 +206,7 @@ class TNonblockingServer : public TServer {
uint64_t nTotalConnectionsDropped_;
/// File descriptors for pipe used for task completion notification.
- int notificationPipeFDs_[2];
+ evutil_socket_t notificationPipeFDs_[2];
/**
* This is a stack of all the objects that have been created but that
@@ -634,7 +664,7 @@ class TNonblockingServer : public TServer {
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TNonblockingServer's "this".
*/
- static void eventHandler(int fd, short which, void* v) {
+ static void eventHandler(evutil_socket_t fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
@@ -874,7 +904,7 @@ class TConnection {
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TConnection's "this".
*/
- static void eventHandler(int fd, short /* which */, void* v) {
+ static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
((TConnection*)v)->workSocket();
}
@@ -887,17 +917,17 @@ class TConnection {
*
* @param fd the descriptor the event occured on.
*/
- static void taskHandler(int fd, short /* which */, void* /* v */) {
+ static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
TConnection* connection;
ssize_t nBytes;
- while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
+ while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
== sizeof(TConnection*)) {
connection->transition();
}
if (nBytes > 0) {
throw TException("TConnection::taskHandler unexpected partial read");
}
- if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
}
}
@@ -911,8 +941,8 @@ class TConnection {
*/
bool notifyServer() {
TConnection* connection = this;
- if (write(server_->getNotificationSendFD(), (const void*)&connection,
- sizeof(TConnection*)) != sizeof(TConnection*)) {
+ if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
+ sizeof(TConnection*), 0) != sizeof(TConnection*)) {
return false;
}