| author | Konrad Grochowski <hcorg@apache.org> | 2014-11-13 15:33:38 +0100 |
|---|---|---|
| committer | Konrad Grochowski <hcorg@apache.org> | 2014-11-18 11:39:10 +0100 |
| commit | 16a23a6618754a5a87aeb8df99a72516b0272fb3 (patch) | |
| tree | 3d3a5250cc06e9010c0e0bef5eed4454a3c6be18 /lib/cpp/src/thrift/server/TNonblockingServer.cpp | |
| parent | 240120c8434b49d1f76d207aff4e3530d3ada14b (diff) | |
| download | thrift-16a23a6618754a5a87aeb8df99a72516b0272fb3.tar.gz | |
THRIFT-2729: C++ - .clang-format created and applied
Client: C++
Patch: Konrad Grochowski
make style command added
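The style this commit enforces can be read off the diff below: two-space indents, a roughly 100-column limit, attached braces, unindented namespace bodies, left-bound pointers (`void*`), short accessors collapsed onto one line, and `||`/`&&` wrapped to the start of the continuation line. The snippet that follows is only a rough sketch of a `.clang-format` that would produce this style, reconstructed from those observations; it is not the verbatim file added by THRIFT-2729, and the exact option values are assumptions.

```
# Illustrative sketch only -- inferred from the reformatting visible in this
# diff, not the actual .clang-format committed in THRIFT-2729.
---
Language:        Cpp
BasedOnStyle:    LLVM
IndentWidth:     2
ColumnLimit:     100
AccessModifierOffset: -2                   # public:/private: flush with the class body
NamespaceIndentation: None                 # namespace contents stay at column 0
PointerAlignment: Left                     # e.g. "void* connectionContext_"
AllowShortFunctionsOnASingleLine: Inline   # one-line accessors such as setRead()
BreakBeforeBinaryOperators: NonAssignment  # "||" moves to the start of the next line
ConstructorInitializerIndentWidth: 4
```

Per the commit message, a `make style` target was added alongside this file; presumably it runs clang-format over lib/cpp with this configuration, though that build-system change is outside the scope of this file's diff.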
Diffstat (limited to 'lib/cpp/src/thrift/server/TNonblockingServer.cpp')
-rw-r--r-- | lib/cpp/src/thrift/server/TNonblockingServer.cpp | 415 |
1 file changed, 188 insertions(+), 227 deletions(-)
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index 86a96c673..1cfdef8fa 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -65,7 +65,9 @@ #define PRIu64 "I64u" #endif -namespace apache { namespace thrift { namespace server { +namespace apache { +namespace thrift { +namespace server { using namespace apache::thrift::protocol; using namespace apache::thrift::transport; @@ -76,11 +78,7 @@ using apache::thrift::transport::TTransportException; using boost::shared_ptr; /// Three states for sockets: recv frame size, recv data, and send mode -enum TSocketState { - SOCKET_RECV_FRAMING, - SOCKET_RECV, - SOCKET_SEND -}; +enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND }; /** * Five states for the nonblocking server: @@ -104,7 +102,7 @@ enum TAppState { * essentially encapsulates a socket that has some associated libevent state. */ class TNonblockingServer::TConnection { - private: +private: /// Server IO Thread handling this connection TNonblockingIOThread* ioThread_; @@ -176,22 +174,16 @@ class TNonblockingServer::TConnection { boost::shared_ptr<TServerEventHandler> serverEventHandler_; /// Thrift call context, if any - void *connectionContext_; + void* connectionContext_; /// Go into read mode - void setRead() { - setFlags(EV_READ | EV_PERSIST); - } + void setRead() { setFlags(EV_READ | EV_PERSIST); } /// Go into write mode - void setWrite() { - setFlags(EV_WRITE | EV_PERSIST); - } + void setWrite() { setFlags(EV_WRITE | EV_PERSIST); } /// Set socket idle - void setIdle() { - setFlags(0); - } + void setIdle() { setFlags(0); } /** * Set event flags for this connection. @@ -208,13 +200,14 @@ class TNonblockingServer::TConnection { */ void workSocket(); - public: - +public: class Task; /// Constructor - TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread, - const sockaddr* addr, socklen_t addrLen) { + TConnection(THRIFT_SOCKET socket, + TNonblockingIOThread* ioThread, + const sockaddr* addr, + socklen_t addrLen) { readBuffer_ = NULL; readBufferSize_ = 0; @@ -225,29 +218,29 @@ class TNonblockingServer::TConnection { // once per TConnection (they don't need to be reallocated on init() call) inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_)); outputTransport_.reset( - new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()))); + new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()))); tSocket_.reset(new TSocket()); init(socket, ioThread, addr, addrLen); } - ~TConnection() { - std::free(readBuffer_); - } + ~TConnection() { std::free(readBuffer_); } /// Close this connection and free or reset its resources. void close(); - /** - * Check buffers against any size limits and shrink it if exceeded. - * - * @param readLimit we reduce read buffer size to this (if nonzero). - * @param writeLimit if nonzero and write buffer is larger, replace it. - */ + /** + * Check buffers against any size limits and shrink it if exceeded. + * + * @param readLimit we reduce read buffer size to this (if nonzero). + * @param writeLimit if nonzero and write buffer is larger, replace it. 
+ */ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit); /// Initialize - void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread, - const sockaddr* addr, socklen_t addrLen); + void init(THRIFT_SOCKET socket, + TNonblockingIOThread* ioThread, + const sockaddr* addr, + socklen_t addrLen); /** * This is called when the application transitions from one state into @@ -278,17 +271,13 @@ class TNonblockingServer::TConnection { * * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR). */ - bool notifyIOThread() { - return ioThread_->notify(this); - } + bool notifyIOThread() { return ioThread_->notify(this); } /* * Returns the number of this connection's currently assigned IO * thread. */ - int getIOThreadNumber() const { - return ioThread_->getThreadNumber(); - } + int getIOThreadNumber() const { return ioThread_->getThreadNumber(); } /// Force connection shutdown for this connection. void forceClose() { @@ -299,44 +288,33 @@ class TNonblockingServer::TConnection { } /// return the server this connection was initialized for. - TNonblockingServer* getServer() const { - return server_; - } + TNonblockingServer* getServer() const { return server_; } /// get state of connection. - TAppState getState() const { - return appState_; - } + TAppState getState() const { return appState_; } /// return the TSocket transport wrapping this network connection - boost::shared_ptr<TSocket> getTSocket() const { - return tSocket_; - } + boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; } /// return the server event handler if any - boost::shared_ptr<TServerEventHandler> getServerEventHandler() { - return serverEventHandler_; - } + boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; } /// return the Thrift connection context if any - void* getConnectionContext() { - return connectionContext_; - } - + void* getConnectionContext() { return connectionContext_; } }; -class TNonblockingServer::TConnection::Task: public Runnable { - public: +class TNonblockingServer::TConnection::Task : public Runnable { +public: Task(boost::shared_ptr<TProcessor> processor, boost::shared_ptr<TProtocol> input, boost::shared_ptr<TProtocol> output, - TConnection* connection) : - processor_(processor), - input_(input), - output_(output), - connection_(connection), - serverEventHandler_(connection_->getServerEventHandler()), - connectionContext_(connection_->getConnectionContext()) {} + TConnection* connection) + : processor_(processor), + input_(input), + output_(output), + connection_(connection), + serverEventHandler_(connection_->getServerEventHandler()), + connectionContext_(connection_->getConnectionContext()) {} void run() { try { @@ -344,8 +322,8 @@ class TNonblockingServer::TConnection::Task: public Runnable { if (serverEventHandler_) { serverEventHandler_->processContext(connectionContext_, connection_->getTSocket()); } - if (!processor_->process(input_, output_, connectionContext_) || - !input_->getTransport()->peek()) { + if (!processor_->process(input_, output_, connectionContext_) + || !input_->getTransport()->peek()) { break; } } @@ -356,10 +334,10 @@ class TNonblockingServer::TConnection::Task: public Runnable { exit(1); } catch (const std::exception& x) { GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s", - typeid(x).name(), x.what()); + typeid(x).name(), + x.what()); } catch (...) 
{ - GlobalOutput.printf( - "TNonblockingServer: unknown exception while processing."); + GlobalOutput.printf("TNonblockingServer: unknown exception while processing."); } // Signal completion back to the libevent thread via a pipe @@ -368,11 +346,9 @@ class TNonblockingServer::TConnection::Task: public Runnable { } } - TConnection* getTConnection() { - return connection_; - } + TConnection* getTConnection() { return connection_; } - private: +private: boost::shared_ptr<TProcessor> processor_; boost::shared_ptr<TProtocol> input_; boost::shared_ptr<TProtocol> output_; @@ -405,22 +381,17 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket, callsForResize_ = 0; // get input/transports - factoryInputTransport_ = server_->getInputTransportFactory()->getTransport( - inputTransport_); - factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport( - outputTransport_); + factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_); + factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_); // Create protocol - inputProtocol_ = server_->getInputProtocolFactory()->getProtocol( - factoryInputTransport_); - outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol( - factoryOutputTransport_); + inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_); + outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_); // Set up for any server event handler serverEventHandler_ = server_->getEventHandler(); if (serverEventHandler_) { - connectionContext_ = serverEventHandler_->createContext(inputProtocol_, - outputProtocol_); + connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_); } else { connectionContext_ = NULL; } @@ -430,7 +401,7 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket, } void TNonblockingServer::TConnection::workSocket() { - int got=0, left=0, sent=0; + int got = 0, left = 0, sent = 0; uint32_t fetch = 0; switch (socketState_) { @@ -470,12 +441,14 @@ void TNonblockingServer::TConnection::workSocket() { if (readWant_ > server_->getMaxFrameSize()) { // Don't allow giant frame sizes. This prevents bad clients from // causing us to try and allocate a giant buffer. - GlobalOutput.printf("TNonblockingServer: frame size too large " - "(%" PRIu32 " > %" PRIu64 ") from client %s. " - "Remote side not using TFramedTransport?", - readWant_, - (uint64_t)server_->getMaxFrameSize(), - tSocket_->getSocketInfo().c_str()); + GlobalOutput.printf( + "TNonblockingServer: frame size too large " + "(%" PRIu32 " > %" PRIu64 + ") from client %s. 
" + "Remote side not using TFramedTransport?", + readWant_, + (uint64_t)server_->getMaxFrameSize(), + tSocket_->getSocketInfo().c_str()); close(); return; } @@ -491,8 +464,7 @@ void TNonblockingServer::TConnection::workSocket() { // Read from the socket fetch = readWant_ - readBufferPos_; got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); - } - catch (TTransportException& te) { + } catch (TTransportException& te) { GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); close(); @@ -532,8 +504,7 @@ void TNonblockingServer::TConnection::workSocket() { try { left = writeBufferSize_ - writeBufferPos_; sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left); - } - catch (TTransportException& te) { + } catch (TTransportException& te) { GlobalOutput.printf("TConnection::workSocket(): %s ", te.what()); close(); return; @@ -586,21 +557,18 @@ void TNonblockingServer::TConnection::transition() { // We are setting up a Task to do this work and we will wait on it // Create task and dispatch to the thread manager - boost::shared_ptr<Runnable> task = - boost::shared_ptr<Runnable>(new Task(processor_, - inputProtocol_, - outputProtocol_, - this)); + boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>( + new Task(processor_, inputProtocol_, outputProtocol_, this)); // The application is now waiting on the task to finish appState_ = APP_WAIT_TASK; - try { - server_->addTask(task); - } catch (IllegalStateException & ise) { - // The ThreadManager is not ready to handle any more tasks (it's probably shutting down). - GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what()); - close(); - } + try { + server_->addTask(task); + } catch (IllegalStateException& ise) { + // The ThreadManager is not ready to handle any more tasks (it's probably shutting down). 
+ GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what()); + close(); + } // Set this connection idle so that libevent doesn't process more // data on it while we're still waiting for the threadmanager to @@ -610,21 +578,22 @@ void TNonblockingServer::TConnection::transition() { } else { try { if (serverEventHandler_) { - serverEventHandler_->processContext(connectionContext_, - getTSocket()); + serverEventHandler_->processContext(connectionContext_, getTSocket()); } // Invoke the processor - processor_->process(inputProtocol_, outputProtocol_, - connectionContext_); - } catch (const TTransportException &ttx) { - GlobalOutput.printf("TNonblockingServer transport error in " - "process(): %s", ttx.what()); + processor_->process(inputProtocol_, outputProtocol_, connectionContext_); + } catch (const TTransportException& ttx) { + GlobalOutput.printf( + "TNonblockingServer transport error in " + "process(): %s", + ttx.what()); server_->decrementActiveProcessors(); close(); return; - } catch (const std::exception &x) { + } catch (const std::exception& x) { GlobalOutput.printf("Server::process() uncaught exception: %s: %s", - typeid(x).name(), x.what()); + typeid(x).name(), + x.what()); server_->decrementActiveProcessors(); close(); return; @@ -636,8 +605,8 @@ void TNonblockingServer::TConnection::transition() { } } - // Intentionally fall through here, the call to process has written into - // the writeBuffer_ + // Intentionally fall through here, the call to process has written into + // the writeBuffer_ case APP_WAIT_TASK: // We have now finished processing a task and the result has been written @@ -687,7 +656,7 @@ void TNonblockingServer::TConnection::transition() { callsForResize_ = 0; } - // N.B.: We also intentionally fall through here into the INIT state! + // N.B.: We also intentionally fall through here into the INIT state! LABEL_APP_INIT: case APP_INIT: @@ -732,7 +701,7 @@ void TNonblockingServer::TConnection::transition() { readBufferSize_ = newSize; } - readBufferPos_= 0; + readBufferPos_ = 0; // Move into read request state socketState_ = SOCKET_RECV; @@ -803,8 +772,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) { * ev structure for multiple monitored descriptors; each descriptor needs * its own ev. 
*/ - event_set(&event_, tSocket_->getSocketFD(), eventFlags_, - TConnection::eventHandler, this); + event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this); event_base_set(ioThread_->getEventBase(), &event_); // Add the event @@ -841,9 +809,7 @@ void TNonblockingServer::TConnection::close() { server_->returnConnection(this); } -void TNonblockingServer::TConnection::checkIdleBufferMemLimit( - size_t readLimit, - size_t writeLimit) { +void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) { if (readLimit > 0 && readBufferSize_ > readLimit) { free(readBuffer_); readBuffer_ = NULL; @@ -860,7 +826,7 @@ void TNonblockingServer::TConnection::checkIdleBufferMemLimit( TNonblockingServer::~TNonblockingServer() { // Close any active connections (moves them to the idle connection stack) while (activeConnections_.size()) { - activeConnections_.front()->close(); + activeConnections_.front()->close(); } // Clean up unused TConnection objects in connectionStack_ while (!connectionStack_.empty()) { @@ -872,9 +838,9 @@ TNonblockingServer::~TNonblockingServer() { // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread // objects (as runnable) so these objects will never deallocate without help. while (!ioThreads_.empty()) { - boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back(); - ioThreads_.pop_back(); - iot->setThread(boost::shared_ptr<Thread>()); + boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back(); + ioThreads_.pop_back(); + iot->setThread(boost::shared_ptr<Thread>()); } } @@ -882,8 +848,9 @@ TNonblockingServer::~TNonblockingServer() { * Creates a new connection either by reusing an object off the stack or * by allocating a new one entirely */ -TNonblockingServer::TConnection* TNonblockingServer::createConnection( - THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) { +TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket, + const sockaddr* addr, + socklen_t addrLen) { // Check the stack Guard g(connMutex_); @@ -914,10 +881,12 @@ TNonblockingServer::TConnection* TNonblockingServer::createConnection( void TNonblockingServer::returnConnection(TConnection* connection) { Guard g(connMutex_); - activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end()); + activeConnections_.erase(std::remove(activeConnections_.begin(), + activeConnections_.end(), + connection), + activeConnections_.end()); - if (connectionStackLimit_ && - (connectionStack_.size() >= connectionStackLimit_)) { + if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) { delete connection; --numTConnections_; } else { @@ -931,7 +900,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) { * connections on fd and assign TConnection objects to handle those requests. 
*/ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { - (void) which; + (void)which; // Make sure that libevent didn't mess up the socket handles assert(fd == serverSocket_); @@ -967,16 +936,16 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { // Explicitly set this socket to NONBLOCK mode int flags; - if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 || - THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) { - GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR); + if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 + || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) { + GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", + THRIFT_GET_SOCKET_ERROR); ::THRIFT_CLOSESOCKET(clientSocket); return; } // Create a new TConnection for this client socket. - TConnection* clientConnection = - createConnection(clientSocket, addrp, addrLen); + TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen); // Fail fast if we could not create a TConnection object if (clientConnection == NULL) { @@ -1007,7 +976,6 @@ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { addrLen = sizeof(addrStorage); } - // Done looping accept, now we have to make sure the error is due to // blocking. Any other error is a problem if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) { @@ -1034,8 +1002,8 @@ void TNonblockingServer::createAndListenOnSocket() { // Wildcard address error = getaddrinfo(NULL, port, &hints, &res0); if (error) { - throw TException("TNonblockingServer::serve() getaddrinfo " + - string(THRIFT_GAI_STRERROR(error))); + throw TException("TNonblockingServer::serve() getaddrinfo " + + string(THRIFT_GAI_STRERROR(error))); } // Pick the ipv6 address first since ipv4 addresses can be mapped @@ -1052,15 +1020,14 @@ void TNonblockingServer::createAndListenOnSocket() { throw TException("TNonblockingServer::serve() socket() -1"); } - #ifdef IPV6_V6ONLY +#ifdef IPV6_V6ONLY if (res->ai_family == AF_INET6) { int zero = 0; if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) { GlobalOutput("TServerSocket::listen() IPV6_V6ONLY"); } } - #endif // #ifdef IPV6_V6ONLY - +#endif // #ifdef IPV6_V6ONLY int one = 1; @@ -1089,8 +1056,8 @@ void TNonblockingServer::createAndListenOnSocket() { void TNonblockingServer::listenSocket(THRIFT_SOCKET s) { // Set socket to nonblocking mode int flags; - if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 || - THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) { + if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 + || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) { ::THRIFT_CLOSESOCKET(s); throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK"); } @@ -1104,17 +1071,17 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) { // Turn linger off to avoid hung sockets setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling)); - // Set TCP nodelay if available, MAC OS X Hack - // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html - #ifndef TCP_NOPUSH +// Set TCP nodelay if available, MAC OS X Hack +// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html +#ifndef TCP_NOPUSH setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one)); - #endif +#endif 
- #ifdef TCP_LOW_MIN_RTO +#ifdef TCP_LOW_MIN_RTO if (TSocket::getUseLowMinRto()) { setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one)); } - #endif +#endif if (listen(s, LISTEN_BACKLOG) == -1) { ::THRIFT_CLOSESOCKET(s); @@ -1128,28 +1095,31 @@ void TNonblockingServer::listenSocket(THRIFT_SOCKET s) { void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) { threadManager_ = threadManager; if (threadManager) { - threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1)); + threadManager->setExpireCallback( + apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, + this, + apache::thrift::stdcxx::placeholders::_1)); threadPoolProcessing_ = true; } else { threadPoolProcessing_ = false; } } -bool TNonblockingServer::serverOverloaded() { +bool TNonblockingServer::serverOverloaded() { size_t activeConnections = numTConnections_ - connectionStack_.size(); - if (numActiveProcessors_ > maxActiveProcessors_ || - activeConnections > maxConnections_) { + if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) { if (!overloaded_) { - GlobalOutput.printf("TNonblockingServer: overload condition begun."); + GlobalOutput.printf("TNonblockingServer: overload condition begun."); overloaded_ = true; } } else { - if (overloaded_ && - (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) && - (activeConnections <= overloadHysteresis_ * maxConnections_)) { - GlobalOutput.printf("TNonblockingServer: overload ended; " - "%u dropped (%llu total)", - nConnectionsDropped_, nTotalConnectionsDropped_); + if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) + && (activeConnections <= overloadHysteresis_ * maxConnections_)) { + GlobalOutput.printf( + "TNonblockingServer: overload ended; " + "%u dropped (%llu total)", + nConnectionsDropped_, + nTotalConnectionsDropped_); nConnectionsDropped_ = 0; overloaded_ = false; } @@ -1162,10 +1132,8 @@ bool TNonblockingServer::drainPendingTask() { if (threadManager_) { boost::shared_ptr<Runnable> task = threadManager_->removeNextPending(); if (task) { - TConnection* connection = - static_cast<TConnection::Task*>(task.get())->getTConnection(); - assert(connection && connection->getServer() - && connection->getState() == APP_WAIT_TASK); + TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection(); + assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK); connection->forceClose(); return true; } @@ -1174,10 +1142,8 @@ bool TNonblockingServer::drainPendingTask() { } void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) { - TConnection* connection = - static_cast<TConnection::Task*>(task.get())->getTConnection(); - assert(connection && connection->getServer() && - connection->getState() == APP_WAIT_TASK); + TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection(); + assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK); connection->forceClose(); } @@ -1206,7 +1172,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) { THRIFT_SOCKET listenFd = (id == 0 ? 
serverSocket_ : THRIFT_INVALID_SOCKET); shared_ptr<TNonblockingIOThread> thread( - new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); + new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); ioThreads_.push_back(thread); } @@ -1221,18 +1187,19 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) { assert(ioThreads_.size() > 0); GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.", - port_, ioThreads_.size()); + port_, + ioThreads_.size()); // Launch all the secondary IO threads in separate threads if (ioThreads_.size() > 1) { ioThreadFactory_.reset(new PlatformThreadFactory( #if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD) - PlatformThreadFactory::OTHER, // scheduler - PlatformThreadFactory::NORMAL, // priority - 1, // stack size (MB) + PlatformThreadFactory::OTHER, // scheduler + PlatformThreadFactory::NORMAL, // priority + 1, // stack size (MB) #endif - false // detached - )); + false // detached + )); assert(ioThreadFactory_.get()); @@ -1271,12 +1238,12 @@ TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server, int number, THRIFT_SOCKET listenSocket, bool useHighPriority) - : server_(server) - , number_(number) - , listenSocket_(listenSocket) - , useHighPriority_(useHighPriority) - , eventBase_(NULL) - , ownEventBase_(false) { + : server_(server), + number_(number), + listenSocket_(listenSocket), + useHighPriority_(useHighPriority), + eventBase_(NULL), + ownEventBase_(false) { notificationPipeFDs_[0] = -1; notificationPipeFDs_[1] = -1; } @@ -1292,8 +1259,7 @@ TNonblockingIOThread::~TNonblockingIOThread() { if (listenSocket_ >= 0) { if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) { - GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", - THRIFT_GET_SOCKET_ERROR); + GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR); } listenSocket_ = THRIFT_INVALID_SOCKET; } @@ -1310,12 +1276,12 @@ TNonblockingIOThread::~TNonblockingIOThread() { } void TNonblockingIOThread::createNotificationPipe() { - if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) { + if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) { GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR()); throw TException("can't create notification pipe"); } - if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 || - evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) { + if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 + || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) { ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]); ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]); throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK"); @@ -1323,15 +1289,16 @@ void TNonblockingIOThread::createNotificationPipe() { for (int i = 0; i < 2; ++i) { #if LIBEVENT_VERSION_NUMBER < 0x02000000 int flags; - if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 || - THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) { + if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 + || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) { #else if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) { #endif ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]); ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]); - throw TException("TNonblockingServer::createNotificationPipe() " - "FD_CLOEXEC"); + throw 
TException( + "TNonblockingServer::createNotificationPipe() " + "FD_CLOEXEC"); } } } @@ -1352,8 +1319,8 @@ void TNonblockingIOThread::registerEvents() { // Print some libevent stats if (number_ == 0) { GlobalOutput.printf("TNonblockingServer: using libevent %s method %s", - event_get_version(), - event_base_get_method(eventBase_)); + event_get_version(), + event_base_get_method(eventBase_)); } if (listenSocket_ >= 0) { @@ -1367,11 +1334,11 @@ void TNonblockingIOThread::registerEvents() { // Add the event and start up the server if (-1 == event_add(&serverEvent_, 0)) { - throw TException("TNonblockingServer::serve(): " - "event_add() failed on server listen event"); + throw TException( + "TNonblockingServer::serve(): " + "event_add() failed on server listen event"); } - GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", - number_); + GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_); } createNotificationPipe(); @@ -1388,11 +1355,11 @@ void TNonblockingIOThread::registerEvents() { // Add the event and start up the server if (-1 == event_add(¬ificationEvent_, 0)) { - throw TException("TNonblockingServer::serve(): " - "event_add() failed on task-done notification event"); + throw TException( + "TNonblockingServer::serve(): " + "event_add() failed on task-done notification event"); } - GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", - number_); + GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_); } bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { @@ -1411,7 +1378,7 @@ bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { /* static */ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) { - TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v; + TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v; assert(ioThread); (void)which; @@ -1427,8 +1394,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* connection->transition(); } else if (nBytes > 0) { // throw away these bytes and hope that next time we get a solid read - GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", - nBytes, kSize); + GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize); ioThread->breakLoop(true); return; } else if (nBytes == 0) { @@ -1436,11 +1402,11 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* // exit the loop break; } else { // nBytes < 0 - if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) { - GlobalOutput.perror( - "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR); - ioThread->breakLoop(true); - return; + if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK + && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) { + GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR); + ioThread->breakLoop(true); + return; } // exit the loop break; @@ -1450,8 +1416,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* void TNonblockingIOThread::breakLoop(bool error) { if (error) { - GlobalOutput.printf( - "TNonblockingServer: IO thread #%d exiting with error.", number_); + GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_); // TODO: figure out something better to do here, but for now kill the // whole process. 
GlobalOutput.printf("TNonblockingServer: aborting process."); @@ -1478,7 +1443,7 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { #ifdef HAVE_SCHED_H // Start out with a standard, low-priority setup for the sched params. struct sched_param sp; - bzero((void*) &sp, sizeof(sp)); + bzero((void*)&sp, sizeof(sp)); int policy = SCHED_OTHER; // If desired, set up high-priority sched params structure. @@ -1487,16 +1452,14 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { policy = SCHED_FIFO; // The priority only compares us to other SCHED_FIFO threads, so we // just pick a random priority halfway between min & max. - const int priority = (sched_get_priority_max(policy) + - sched_get_priority_min(policy)) / 2; + const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2; sp.sched_priority = priority; } // Actually set the sched params for the current thread. if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) { - GlobalOutput.printf( - "TNonblocking: IO Thread #%d using high-priority scheduler!", number_); + GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_); } else { GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR); } @@ -1509,8 +1472,7 @@ void TNonblockingIOThread::run() { if (eventBase_ == NULL) registerEvents(); - GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", - number_); + GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_); if (useHighPriority_) { setCurrentThreadHighPriority(true); @@ -1526,8 +1488,7 @@ void TNonblockingIOThread::run() { // cleans up our registered events cleanupEvents(); - GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", - number_); + GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_); } void TNonblockingIOThread::cleanupEvents() { @@ -1541,7 +1502,6 @@ void TNonblockingIOThread::cleanupEvents() { event_del(¬ificationEvent_); } - void TNonblockingIOThread::stop() { // This should cause the thread to fall out of its event loop ASAP. breakLoop(false); @@ -1555,10 +1515,11 @@ void TNonblockingIOThread::join() { // Note that it is safe to both join() ourselves twice, as well as join // the current thread as the pthread implementation checks for deadlock. thread_->join(); - } catch(...) { + } catch (...) { // swallow everything } } } - -}}} // apache::thrift::server +} +} +} // apache::thrift::server |