summaryrefslogtreecommitdiff
path: root/lib/cpp/src/thrift/server/TNonblockingServer.cpp
diff options
context:
space:
mode:
authorKonrad Grochowski <hcorg@apache.org>2014-11-13 15:33:38 +0100
committerKonrad Grochowski <hcorg@apache.org>2014-11-18 11:39:10 +0100
commit16a23a6618754a5a87aeb8df99a72516b0272fb3 (patch)
tree3d3a5250cc06e9010c0e0bef5eed4454a3c6be18 /lib/cpp/src/thrift/server/TNonblockingServer.cpp
parent240120c8434b49d1f76d207aff4e3530d3ada14b (diff)
downloadthrift-16a23a6618754a5a87aeb8df99a72516b0272fb3.tar.gz
THRIFT-2729: C++ - .clang-format created and applied
Client: C++ Patch: Konrad Grochowski make style command added
Diffstat (limited to 'lib/cpp/src/thrift/server/TNonblockingServer.cpp')
-rw-r--r--lib/cpp/src/thrift/server/TNonblockingServer.cpp415
1 files changed, 188 insertions, 227 deletions
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 86a96c673..1cfdef8fa 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -65,7 +65,9 @@
#define PRIu64 "I64u"
#endif
-namespace apache { namespace thrift { namespace server {
+namespace apache {
+namespace thrift {
+namespace server {
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
@@ -76,11 +78,7 @@ using apache::thrift::transport::TTransportException;
using boost::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
- SOCKET_RECV_FRAMING,
- SOCKET_RECV,
- SOCKET_SEND
-};
+enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
/**
* Five states for the nonblocking server:
@@ -104,7 +102,7 @@ enum TAppState {
* essentially encapsulates a socket that has some associated libevent state.
*/
class TNonblockingServer::TConnection {
- private:
+private:
/// Server IO Thread handling this connection
TNonblockingIOThread* ioThread_;
@@ -176,22 +174,16 @@ class TNonblockingServer::TConnection {
boost::shared_ptr<TServerEventHandler> serverEventHandler_;
/// Thrift call context, if any
- void *connectionContext_;
+ void* connectionContext_;
/// Go into read mode
- void setRead() {
- setFlags(EV_READ | EV_PERSIST);
- }
+ void setRead() { setFlags(EV_READ | EV_PERSIST); }
/// Go into write mode
- void setWrite() {
- setFlags(EV_WRITE | EV_PERSIST);
- }
+ void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
/// Set socket idle
- void setIdle() {
- setFlags(0);
- }
+ void setIdle() { setFlags(0); }
/**
* Set event flags for this connection.
@@ -208,13 +200,14 @@ class TNonblockingServer::TConnection {
*/
void workSocket();
- public:
-
+public:
class Task;
/// Constructor
- TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
- const sockaddr* addr, socklen_t addrLen) {
+ TConnection(THRIFT_SOCKET socket,
+ TNonblockingIOThread* ioThread,
+ const sockaddr* addr,
+ socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
@@ -225,29 +218,29 @@ class TNonblockingServer::TConnection {
// once per TConnection (they don't need to be reallocated on init() call)
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_.reset(
- new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
+ new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
tSocket_.reset(new TSocket());
init(socket, ioThread, addr, addrLen);
}
- ~TConnection() {
- std::free(readBuffer_);
- }
+ ~TConnection() { std::free(readBuffer_); }
/// Close this connection and free or reset its resources.
void close();
- /**
- * Check buffers against any size limits and shrink it if exceeded.
- *
- * @param readLimit we reduce read buffer size to this (if nonzero).
- * @param writeLimit if nonzero and write buffer is larger, replace it.
- */
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
+ *
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
+ */
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread,
- const sockaddr* addr, socklen_t addrLen);
+ void init(THRIFT_SOCKET socket,
+ TNonblockingIOThread* ioThread,
+ const sockaddr* addr,
+ socklen_t addrLen);
/**
* This is called when the application transitions from one state into
@@ -278,17 +271,13 @@ class TNonblockingServer::TConnection {
*
* @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
*/
- bool notifyIOThread() {
- return ioThread_->notify(this);
- }
+ bool notifyIOThread() { return ioThread_->notify(this); }
/*
* Returns the number of this connection's currently assigned IO
* thread.
*/
- int getIOThreadNumber() const {
- return ioThread_->getThreadNumber();
- }
+ int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
/// Force connection shutdown for this connection.
void forceClose() {
@@ -299,44 +288,33 @@ class TNonblockingServer::TConnection {
}
/// return the server this connection was initialized for.
- TNonblockingServer* getServer() const {
- return server_;
- }
+ TNonblockingServer* getServer() const { return server_; }
/// get state of connection.
- TAppState getState() const {
- return appState_;
- }
+ TAppState getState() const { return appState_; }
/// return the TSocket transport wrapping this network connection
- boost::shared_ptr<TSocket> getTSocket() const {
- return tSocket_;
- }
+ boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
/// return the server event handler if any
- boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
- return serverEventHandler_;
- }
+ boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
/// return the Thrift connection context if any
- void* getConnectionContext() {
- return connectionContext_;
- }
-
+ void* getConnectionContext() { return connectionContext_; }
};
-class TNonblockingServer::TConnection::Task: public Runnable {
- public:
+class TNonblockingServer::TConnection::Task : public Runnable {
+public:
Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output,
- TConnection* connection) :
- processor_(processor),
- input_(input),
- output_(output),
- connection_(connection),
- serverEventHandler_(connection_->getServerEventHandler()),
- connectionContext_(connection_->getConnectionContext()) {}
+ TConnection* connection)
+ : processor_(processor),
+ input_(input),
+ output_(output),
+ connection_(connection),
+ serverEventHandler_(connection_->getServerEventHandler()),
+ connectionContext_(connection_->getConnectionContext()) {}
void run() {
try {
@@ -344,8 +322,8 @@ class TNonblockingServer::TConnection::Task: public Runnable {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
- if (!processor_->process(input_, output_, connectionContext_) ||
- !input_->getTransport()->peek()) {
+ if (!processor_->process(input_, output_, connectionContext_)
+ || !input_->getTransport()->peek()) {
break;
}
}
@@ -356,10 +334,10 @@ class TNonblockingServer::TConnection::Task: public Runnable {
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
- typeid(x).name(), x.what());
+ typeid(x).name(),
+ x.what());
} catch (...) {
- GlobalOutput.printf(
- "TNonblockingServer: unknown exception while processing.");
+ GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
@@ -368,11 +346,9 @@ class TNonblockingServer::TConnection::Task: public Runnable {
}
}
- TConnection* getTConnection() {
- return connection_;
- }
+ TConnection* getTConnection() { return connection_; }
- private:
+private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocol> input_;
boost::shared_ptr<TProtocol> output_;
@@ -405,22 +381,17 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
callsForResize_ = 0;
// get input/transports
- factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
- inputTransport_);
- factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
- outputTransport_);
+ factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
+ factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
// Create protocol
- inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
- factoryInputTransport_);
- outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
- factoryOutputTransport_);
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
if (serverEventHandler_) {
- connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
- outputProtocol_);
+ connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
} else {
connectionContext_ = NULL;
}
@@ -430,7 +401,7 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
}
void TNonblockingServer::TConnection::workSocket() {
- int got=0, left=0, sent=0;
+ int got = 0, left = 0, sent = 0;
uint32_t fetch = 0;
switch (socketState_) {
@@ -470,12 +441,14 @@ void TNonblockingServer::TConnection::workSocket() {
if (readWant_ > server_->getMaxFrameSize()) {
// Don't allow giant frame sizes. This prevents bad clients from
// causing us to try and allocate a giant buffer.
- GlobalOutput.printf("TNonblockingServer: frame size too large "
- "(%" PRIu32 " > %" PRIu64 ") from client %s. "
- "Remote side not using TFramedTransport?",
- readWant_,
- (uint64_t)server_->getMaxFrameSize(),
- tSocket_->getSocketInfo().c_str());
+ GlobalOutput.printf(
+ "TNonblockingServer: frame size too large "
+ "(%" PRIu32 " > %" PRIu64
+ ") from client %s. "
+ "Remote side not using TFramedTransport?",
+ readWant_,
+ (uint64_t)server_->getMaxFrameSize(),
+ tSocket_->getSocketInfo().c_str());
close();
return;
}
@@ -491,8 +464,7 @@ void TNonblockingServer::TConnection::workSocket() {
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
- }
- catch (TTransportException& te) {
+ } catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
@@ -532,8 +504,7 @@ void TNonblockingServer::TConnection::workSocket() {
try {
left = writeBufferSize_ - writeBufferPos_;
sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
- }
- catch (TTransportException& te) {
+ } catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
close();
return;
@@ -586,21 +557,18 @@ void TNonblockingServer::TConnection::transition() {
// We are setting up a Task to do this work and we will wait on it
// Create task and dispatch to the thread manager
- boost::shared_ptr<Runnable> task =
- boost::shared_ptr<Runnable>(new Task(processor_,
- inputProtocol_,
- outputProtocol_,
- this));
+ boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
+ new Task(processor_, inputProtocol_, outputProtocol_, this));
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
- try {
- server_->addTask(task);
- } catch (IllegalStateException & ise) {
- // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
- GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
- close();
- }
+ try {
+ server_->addTask(task);
+ } catch (IllegalStateException& ise) {
+ // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+ GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+ close();
+ }
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
@@ -610,21 +578,22 @@ void TNonblockingServer::TConnection::transition() {
} else {
try {
if (serverEventHandler_) {
- serverEventHandler_->processContext(connectionContext_,
- getTSocket());
+ serverEventHandler_->processContext(connectionContext_, getTSocket());
}
// Invoke the processor
- processor_->process(inputProtocol_, outputProtocol_,
- connectionContext_);
- } catch (const TTransportException &ttx) {
- GlobalOutput.printf("TNonblockingServer transport error in "
- "process(): %s", ttx.what());
+ processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
+ } catch (const TTransportException& ttx) {
+ GlobalOutput.printf(
+ "TNonblockingServer transport error in "
+ "process(): %s",
+ ttx.what());
server_->decrementActiveProcessors();
close();
return;
- } catch (const std::exception &x) {
+ } catch (const std::exception& x) {
GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
- typeid(x).name(), x.what());
+ typeid(x).name(),
+ x.what());
server_->decrementActiveProcessors();
close();
return;
@@ -636,8 +605,8 @@ void TNonblockingServer::TConnection::transition() {
}
}
- // Intentionally fall through here, the call to process has written into
- // the writeBuffer_
+ // Intentionally fall through here, the call to process has written into
+ // the writeBuffer_
case APP_WAIT_TASK:
// We have now finished processing a task and the result has been written
@@ -687,7 +656,7 @@ void TNonblockingServer::TConnection::transition() {
callsForResize_ = 0;
}
- // N.B.: We also intentionally fall through here into the INIT state!
+ // N.B.: We also intentionally fall through here into the INIT state!
LABEL_APP_INIT:
case APP_INIT:
@@ -732,7 +701,7 @@ void TNonblockingServer::TConnection::transition() {
readBufferSize_ = newSize;
}
- readBufferPos_= 0;
+ readBufferPos_ = 0;
// Move into read request state
socketState_ = SOCKET_RECV;
@@ -803,8 +772,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
* ev structure for multiple monitored descriptors; each descriptor needs
* its own ev.
*/
- event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
- TConnection::eventHandler, this);
+ event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
event_base_set(ioThread_->getEventBase(), &event_);
// Add the event
@@ -841,9 +809,7 @@ void TNonblockingServer::TConnection::close() {
server_->returnConnection(this);
}
-void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
- size_t readLimit,
- size_t writeLimit) {
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
free(readBuffer_);
readBuffer_ = NULL;
@@ -860,7 +826,7 @@ void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
TNonblockingServer::~TNonblockingServer() {
// Close any active connections (moves them to the idle connection stack)
while (activeConnections_.size()) {
- activeConnections_.front()->close();
+ activeConnections_.front()->close();
}
// Clean up unused TConnection objects in connectionStack_
while (!connectionStack_.empty()) {
@@ -872,9 +838,9 @@ TNonblockingServer::~TNonblockingServer() {
// objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
// objects (as runnable) so these objects will never deallocate without help.
while (!ioThreads_.empty()) {
- boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
- ioThreads_.pop_back();
- iot->setThread(boost::shared_ptr<Thread>());
+ boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
+ ioThreads_.pop_back();
+ iot->setThread(boost::shared_ptr<Thread>());
}
}
@@ -882,8 +848,9 @@ TNonblockingServer::~TNonblockingServer() {
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
-TNonblockingServer::TConnection* TNonblockingServer::createConnection(
- THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
+ const sockaddr* addr,
+ socklen_t addrLen) {
// Check the stack
Guard g(connMutex_);
@@ -914,10 +881,12 @@ TNonblockingServer::TConnection* TNonblockingServer::createConnection(
void TNonblockingServer::returnConnection(TConnection* connection) {
Guard g(connMutex_);
- activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end());
+ activeConnections_.erase(std::remove(activeConnections_.begin(),
+ activeConnections_.end(),
+ connection),
+ activeConnections_.end());
- if (connectionStackLimit_ &&
- (connectionStack_.size() >= connectionStackLimit_)) {
+ if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
--numTConnections_;
} else {
@@ -931,7 +900,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) {
* connections on fd and assign TConnection objects to handle those requests.
*/
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
- (void) which;
+ (void)which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
@@ -967,16 +936,16 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
// Explicitly set this socket to NONBLOCK mode
int flags;
- if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 ||
- THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
- GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR);
+ if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
+ || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
+ THRIFT_GET_SOCKET_ERROR);
::THRIFT_CLOSESOCKET(clientSocket);
return;
}
// Create a new TConnection for this client socket.
- TConnection* clientConnection =
- createConnection(clientSocket, addrp, addrLen);
+ TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
@@ -1007,7 +976,6 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
addrLen = sizeof(addrStorage);
}
-
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
@@ -1034,8 +1002,8 @@ void TNonblockingServer::createAndListenOnSocket() {
// Wildcard address
error = getaddrinfo(NULL, port, &hints, &res0);
if (error) {
- throw TException("TNonblockingServer::serve() getaddrinfo " +
- string(THRIFT_GAI_STRERROR(error)));
+ throw TException("TNonblockingServer::serve() getaddrinfo "
+ + string(THRIFT_GAI_STRERROR(error)));
}
// Pick the ipv6 address first since ipv4 addresses can be mapped
@@ -1052,15 +1020,14 @@ void TNonblockingServer::createAndListenOnSocket() {
throw TException("TNonblockingServer::serve() socket() -1");
}
- #ifdef IPV6_V6ONLY
+#ifdef IPV6_V6ONLY
if (res->ai_family == AF_INET6) {
int zero = 0;
if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
}
}
- #endif // #ifdef IPV6_V6ONLY
-
+#endif // #ifdef IPV6_V6ONLY
int one = 1;
@@ -1089,8 +1056,8 @@ void TNonblockingServer::createAndListenOnSocket() {
void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Set socket to nonblocking mode
int flags;
- if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 ||
- THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
+ if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
+ || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
::THRIFT_CLOSESOCKET(s);
throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
}
@@ -1104,17 +1071,17 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
// Turn linger off to avoid hung sockets
setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
- // Set TCP nodelay if available, MAC OS X Hack
- // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
- #ifndef TCP_NOPUSH
+// Set TCP nodelay if available, MAC OS X Hack
+// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+#ifndef TCP_NOPUSH
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
- #endif
+#endif
- #ifdef TCP_LOW_MIN_RTO
+#ifdef TCP_LOW_MIN_RTO
if (TSocket::getUseLowMinRto()) {
setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
}
- #endif
+#endif
if (listen(s, LISTEN_BACKLOG) == -1) {
::THRIFT_CLOSESOCKET(s);
@@ -1128,28 +1095,31 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager) {
- threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
+ threadManager->setExpireCallback(
+ apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
+ this,
+ apache::thrift::stdcxx::placeholders::_1));
threadPoolProcessing_ = true;
} else {
threadPoolProcessing_ = false;
}
}
-bool TNonblockingServer::serverOverloaded() {
+bool TNonblockingServer::serverOverloaded() {
size_t activeConnections = numTConnections_ - connectionStack_.size();
- if (numActiveProcessors_ > maxActiveProcessors_ ||
- activeConnections > maxConnections_) {
+ if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
if (!overloaded_) {
- GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+ GlobalOutput.printf("TNonblockingServer: overload condition begun.");
overloaded_ = true;
}
} else {
- if (overloaded_ &&
- (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
- (activeConnections <= overloadHysteresis_ * maxConnections_)) {
- GlobalOutput.printf("TNonblockingServer: overload ended; "
- "%u dropped (%llu total)",
- nConnectionsDropped_, nTotalConnectionsDropped_);
+ if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
+ && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+ GlobalOutput.printf(
+ "TNonblockingServer: overload ended; "
+ "%u dropped (%llu total)",
+ nConnectionsDropped_,
+ nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
}
@@ -1162,10 +1132,8 @@ bool TNonblockingServer::drainPendingTask() {
if (threadManager_) {
boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
if (task) {
- TConnection* connection =
- static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer()
- && connection->getState() == APP_WAIT_TASK);
+ TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
return true;
}
@@ -1174,10 +1142,8 @@ bool TNonblockingServer::drainPendingTask() {
}
void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
- TConnection* connection =
- static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer() &&
- connection->getState() == APP_WAIT_TASK);
+ TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
}
@@ -1206,7 +1172,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
shared_ptr<TNonblockingIOThread> thread(
- new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+ new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
ioThreads_.push_back(thread);
}
@@ -1221,18 +1187,19 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
assert(ioThreads_.size() > 0);
GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
- port_, ioThreads_.size());
+ port_,
+ ioThreads_.size());
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new PlatformThreadFactory(
#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD)
- PlatformThreadFactory::OTHER, // scheduler
- PlatformThreadFactory::NORMAL, // priority
- 1, // stack size (MB)
+ PlatformThreadFactory::OTHER, // scheduler
+ PlatformThreadFactory::NORMAL, // priority
+ 1, // stack size (MB)
#endif
- false // detached
- ));
+ false // detached
+ ));
assert(ioThreadFactory_.get());
@@ -1271,12 +1238,12 @@ TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
int number,
THRIFT_SOCKET listenSocket,
bool useHighPriority)
- : server_(server)
- , number_(number)
- , listenSocket_(listenSocket)
- , useHighPriority_(useHighPriority)
- , eventBase_(NULL)
- , ownEventBase_(false) {
+ : server_(server),
+ number_(number),
+ listenSocket_(listenSocket),
+ useHighPriority_(useHighPriority),
+ eventBase_(NULL),
+ ownEventBase_(false) {
notificationPipeFDs_[0] = -1;
notificationPipeFDs_[1] = -1;
}
@@ -1292,8 +1259,7 @@ TNonblockingIOThread::~TNonblockingIOThread() {
if (listenSocket_ >= 0) {
if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
- GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
- THRIFT_GET_SOCKET_ERROR);
+ GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
}
listenSocket_ = THRIFT_INVALID_SOCKET;
}
@@ -1310,12 +1276,12 @@ TNonblockingIOThread::~TNonblockingIOThread() {
}
void TNonblockingIOThread::createNotificationPipe() {
- if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
- if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
- evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
+ if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
+ || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
@@ -1323,15 +1289,16 @@ void TNonblockingIOThread::createNotificationPipe() {
for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
int flags;
- if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
- THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+ if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
+ || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
- throw TException("TNonblockingServer::createNotificationPipe() "
- "FD_CLOEXEC");
+ throw TException(
+ "TNonblockingServer::createNotificationPipe() "
+ "FD_CLOEXEC");
}
}
}
@@ -1352,8 +1319,8 @@ void TNonblockingIOThread::registerEvents() {
// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
- event_get_version(),
- event_base_get_method(eventBase_));
+ event_get_version(),
+ event_base_get_method(eventBase_));
}
if (listenSocket_ >= 0) {
@@ -1367,11 +1334,11 @@ void TNonblockingIOThread::registerEvents() {
// Add the event and start up the server
if (-1 == event_add(&serverEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): "
- "event_add() failed on server listen event");
+ throw TException(
+ "TNonblockingServer::serve(): "
+ "event_add() failed on server listen event");
}
- GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
- number_);
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
}
createNotificationPipe();
@@ -1388,11 +1355,11 @@ void TNonblockingIOThread::registerEvents() {
// Add the event and start up the server
if (-1 == event_add(&notificationEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): "
- "event_add() failed on task-done notification event");
+ throw TException(
+ "TNonblockingServer::serve(): "
+ "event_add() failed on task-done notification event");
}
- GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
- number_);
+ GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
@@ -1411,7 +1378,7 @@ bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
- TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+ TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
assert(ioThread);
(void)which;
@@ -1427,8 +1394,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
connection->transition();
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
- GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
- nBytes, kSize);
+ GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
@@ -1436,11 +1402,11 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
// exit the loop
break;
} else { // nBytes < 0
- if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
- GlobalOutput.perror(
- "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
- ioThread->breakLoop(true);
- return;
+ if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
+ && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
+ GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
+ ioThread->breakLoop(true);
+ return;
}
// exit the loop
break;
@@ -1450,8 +1416,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
void TNonblockingIOThread::breakLoop(bool error) {
if (error) {
- GlobalOutput.printf(
- "TNonblockingServer: IO thread #%d exiting with error.", number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
// TODO: figure out something better to do here, but for now kill the
// whole process.
GlobalOutput.printf("TNonblockingServer: aborting process.");
@@ -1478,7 +1443,7 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
- bzero((void*) &sp, sizeof(sp));
+ bzero((void*)&sp, sizeof(sp));
int policy = SCHED_OTHER;
// If desired, set up high-priority sched params structure.
@@ -1487,16 +1452,14 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
policy = SCHED_FIFO;
// The priority only compares us to other SCHED_FIFO threads, so we
// just pick a random priority halfway between min & max.
- const int priority = (sched_get_priority_max(policy) +
- sched_get_priority_min(policy)) / 2;
+ const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
sp.sched_priority = priority;
}
// Actually set the sched params for the current thread.
if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
- GlobalOutput.printf(
- "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+ GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
}
@@ -1509,8 +1472,7 @@ void TNonblockingIOThread::run() {
if (eventBase_ == NULL)
registerEvents();
- GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
- number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
@@ -1526,8 +1488,7 @@ void TNonblockingIOThread::run() {
// cleans up our registered events
cleanupEvents();
- GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
- number_);
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
void TNonblockingIOThread::cleanupEvents() {
@@ -1541,7 +1502,6 @@ void TNonblockingIOThread::cleanupEvents() {
event_del(&notificationEvent_);
}
-
void TNonblockingIOThread::stop() {
// This should cause the thread to fall out of its event loop ASAP.
breakLoop(false);
@@ -1555,10 +1515,11 @@ void TNonblockingIOThread::join() {
// Note that it is safe to both join() ourselves twice, as well as join
// the current thread as the pthread implementation checks for deadlock.
thread_->join();
- } catch(...) {
+ } catch (...) {
// swallow everything
}
}
}
-
-}}} // apache::thrift::server
+}
+}
+} // apache::thrift::server