summaryrefslogtreecommitdiff
path: root/lib/cpp/src/server/TNonblockingServer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cpp/src/server/TNonblockingServer.cpp')
-rw-r--r--lib/cpp/src/server/TNonblockingServer.cpp750
1 files changed, 750 insertions, 0 deletions
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
new file mode 100644
index 000000000..45f635cbe
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TNonblockingServer.h"
+#include <concurrency/Exception.h>
+
+#include <iostream>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+
+class TConnection::Task: public Runnable {
+ public:
+ Task(boost::shared_ptr<TProcessor> processor,
+ boost::shared_ptr<TProtocol> input,
+ boost::shared_ptr<TProtocol> output,
+ int taskHandle) :
+ processor_(processor),
+ input_(input),
+ output_(output),
+ taskHandle_(taskHandle) {}
+
+ void run() {
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (TTransportException& ttx) {
+ cerr << "TNonblockingServer client died: " << ttx.what() << endl;
+ } catch (TException& x) {
+ cerr << "TNonblockingServer exception: " << x.what() << endl;
+ } catch (...) {
+ cerr << "TNonblockingServer uncaught exception." << endl;
+ }
+
+ // Signal completion back to the libevent thread via a socketpair
+ int8_t b = 0;
+ if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
+ GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
+ }
+ if (-1 == ::close(taskHandle_)) {
+ GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
+ }
+ }
+
+ private:
+ boost::shared_ptr<TProcessor> processor_;
+ boost::shared_ptr<TProtocol> input_;
+ boost::shared_ptr<TProtocol> output_;
+ int taskHandle_;
+};
+
+void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+ socket_ = socket;
+ server_ = s;
+ appState_ = APP_INIT;
+ eventFlags_ = 0;
+
+ readBufferPos_ = 0;
+ readWant_ = 0;
+
+ writeBuffer_ = NULL;
+ writeBufferSize_ = 0;
+ writeBufferPos_ = 0;
+
+ socketState_ = SOCKET_RECV;
+ appState_ = APP_INIT;
+
+ taskHandle_ = -1;
+
+ // Set flags, which also registers the event
+ setFlags(eventFlags);
+
+ // get input/transports
+ factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+ factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
+
+ // Create protocol
+ inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+ outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+}
+
+void TConnection::workSocket() {
+ int flags=0, got=0, left=0, sent=0;
+ uint32_t fetch = 0;
+
+ switch (socketState_) {
+ case SOCKET_RECV:
+ // It is an error to be in this state if we already have all the data
+ assert(readBufferPos_ < readWant_);
+
+ // Double the buffer size until it is big enough
+ if (readWant_ > readBufferSize_) {
+ while (readWant_ > readBufferSize_) {
+ readBufferSize_ *= 2;
+ }
+ readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
+ if (readBuffer_ == NULL) {
+ GlobalOutput("TConnection::workSocket() realloc");
+ close();
+ return;
+ }
+ }
+
+ // Read from the socket
+ fetch = readWant_ - readBufferPos_;
+ got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
+
+ if (got > 0) {
+ // Move along in the buffer
+ readBufferPos_ += got;
+
+ // Check that we did not overdo it
+ assert(readBufferPos_ <= readWant_);
+
+ // We are done reading, move onto the next state
+ if (readBufferPos_ == readWant_) {
+ transition();
+ }
+ return;
+ } else if (got == -1) {
+ // Blocking errors are okay, just move on
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return;
+ }
+
+ if (errno != ECONNRESET) {
+ GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
+ }
+ }
+
+ // Whenever we get down here it means a remote disconnect
+ close();
+
+ return;
+
+ case SOCKET_SEND:
+ // Should never have position past size
+ assert(writeBufferPos_ <= writeBufferSize_);
+
+ // If there is no data to send, then let us move on
+ if (writeBufferPos_ == writeBufferSize_) {
+ GlobalOutput("WARNING: Send state with no data to send\n");
+ transition();
+ return;
+ }
+
+ flags = 0;
+ #ifdef MSG_NOSIGNAL
+ // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+ // check for the EPIPE return condition and close the socket in that case
+ flags |= MSG_NOSIGNAL;
+ #endif // ifdef MSG_NOSIGNAL
+
+ left = writeBufferSize_ - writeBufferPos_;
+ sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
+
+ if (sent <= 0) {
+ // Blocking errors are okay, just move on
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return;
+ }
+ if (errno != EPIPE) {
+ GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
+ }
+ close();
+ return;
+ }
+
+ writeBufferPos_ += sent;
+
+ // Did we overdo it?
+ assert(writeBufferPos_ <= writeBufferSize_);
+
+ // We are done!
+ if (writeBufferPos_ == writeBufferSize_) {
+ transition();
+ }
+
+ return;
+
+ default:
+ GlobalOutput.printf("Shit Got Ill. Socket State %d", socketState_);
+ assert(0);
+ }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TConnection::transition() {
+
+ int sz = 0;
+
+ // Switch upon the state that we are currently in and move to a new state
+ switch (appState_) {
+
+ case APP_READ_REQUEST:
+ // We are done reading the request, package the read buffer into transport
+ // and get back some data from the dispatch function
+ // If we've used these transport buffers enough times, reset them to avoid bloating
+
+ inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+ ++numReadsSinceReset_;
+ if (numWritesSinceReset_ < 512) {
+ outputTransport_->resetBuffer();
+ } else {
+ // reset the capacity of the output transport if we used it enough times that it might be bloated
+ try {
+ outputTransport_->resetBuffer(true);
+ numWritesSinceReset_ = 0;
+ } catch (TTransportException &ttx) {
+ GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
+ close();
+ return;
+ }
+ }
+
+ // Prepend four bytes of blank space to the buffer so we can
+ // write the frame size there later.
+ outputTransport_->getWritePtr(4);
+ outputTransport_->wroteBytes(4);
+
+ if (server_->isThreadPoolProcessing()) {
+ // We are setting up a Task to do this work and we will wait on it
+ int sv[2];
+ if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput.perror("TConnection::socketpair() failed ", errno);
+ // Now we will fall through to the APP_WAIT_TASK block with no response
+ } else {
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ inputProtocol_,
+ outputProtocol_,
+ sv[1]));
+ // The application is now waiting on the task to finish
+ appState_ = APP_WAIT_TASK;
+
+ // Create an event to be notified when the task finishes
+ event_set(&taskEvent_,
+ taskHandle_ = sv[0],
+ EV_READ,
+ TConnection::taskHandler,
+ this);
+
+ // Attach to the base
+ event_base_set(server_->getEventBase(), &taskEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(&taskEvent_, 0)) {
+ GlobalOutput("TNonblockingServer::serve(): coult not event_add");
+ return;
+ }
+ try {
+ server_->addTask(task);
+ } catch (IllegalStateException & ise) {
+ // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+ GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+ close();
+ }
+
+ // Set this connection idle so that libevent doesn't process more
+ // data on it while we're still waiting for the threadmanager to
+ // finish this task
+ setIdle();
+ return;
+ }
+ } else {
+ try {
+ // Invoke the processor
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_);
+ } catch (TTransportException &ttx) {
+ GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+ close();
+ return;
+ } catch (TException &x) {
+ GlobalOutput.printf("TException: Server::process() %s", x.what());
+ close();
+ return;
+ } catch (...) {
+ GlobalOutput.printf("Server::process() unknown exception");
+ close();
+ return;
+ }
+ }
+
+ // Intentionally fall through here, the call to process has written into
+ // the writeBuffer_
+
+ case APP_WAIT_TASK:
+ // We have now finished processing a task and the result has been written
+ // into the outputTransport_, so we grab its contents and place them into
+ // the writeBuffer_ for actual writing by the libevent thread
+
+ // Get the result of the operation
+ outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+ // If the function call generated return data, then move into the send
+ // state and get going
+ // 4 bytes were reserved for frame size
+ if (writeBufferSize_ > 4) {
+
+ // Move into write state
+ writeBufferPos_ = 0;
+ socketState_ = SOCKET_SEND;
+
+ // Put the frame size into the write buffer
+ int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+ memcpy(writeBuffer_, &frameSize, 4);
+
+ // Socket into write mode
+ appState_ = APP_SEND_RESULT;
+ setWrite();
+
+ // Try to work the socket immediately
+ // workSocket();
+
+ return;
+ }
+
+ // In this case, the request was oneway and we should fall through
+ // right back into the read frame header state
+ goto LABEL_APP_INIT;
+
+ case APP_SEND_RESULT:
+
+ ++numWritesSinceReset_;
+
+ // N.B.: We also intentionally fall through here into the INIT state!
+
+ LABEL_APP_INIT:
+ case APP_INIT:
+
+ // reset the input buffer if we used it enough times that it might be bloated
+ if (numReadsSinceReset_ > 512)
+ {
+ void * new_buffer = std::realloc(readBuffer_, 1024);
+ if (new_buffer == NULL) {
+ GlobalOutput("TConnection::transition() realloc");
+ close();
+ return;
+ }
+ readBuffer_ = (uint8_t*) new_buffer;
+ readBufferSize_ = 1024;
+ numReadsSinceReset_ = 0;
+ }
+
+ // Clear write buffer variables
+ writeBuffer_ = NULL;
+ writeBufferPos_ = 0;
+ writeBufferSize_ = 0;
+
+ // Set up read buffer for getting 4 bytes
+ readBufferPos_ = 0;
+ readWant_ = 4;
+
+ // Into read4 state we go
+ socketState_ = SOCKET_RECV;
+ appState_ = APP_READ_FRAME_SIZE;
+
+ // Register read event
+ setRead();
+
+ // Try to work the socket right away
+ // workSocket();
+
+ return;
+
+ case APP_READ_FRAME_SIZE:
+ // We just read the request length, deserialize it
+ sz = *(int32_t*)readBuffer_;
+ sz = (int32_t)ntohl(sz);
+
+ if (sz <= 0) {
+ GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
+ close();
+ return;
+ }
+
+ // Reset the read buffer
+ readWant_ = (uint32_t)sz;
+ readBufferPos_= 0;
+
+ // Move into read request state
+ appState_ = APP_READ_REQUEST;
+
+ // Work the socket right away
+ // workSocket();
+
+ return;
+
+ default:
+ GlobalOutput.printf("Totally Fucked. Application State %d", appState_);
+ assert(0);
+ }
+}
+
+void TConnection::setFlags(short eventFlags) {
+ // Catch the do nothing case
+ if (eventFlags_ == eventFlags) {
+ return;
+ }
+
+ // Delete a previously existing event
+ if (eventFlags_ != 0) {
+ if (event_del(&event_) == -1) {
+ GlobalOutput("TConnection::setFlags event_del");
+ return;
+ }
+ }
+
+ // Update in memory structure
+ eventFlags_ = eventFlags;
+
+ // Do not call event_set if there are no flags
+ if (!eventFlags_) {
+ return;
+ }
+
+ /**
+ * event_set:
+ *
+ * Prepares the event structure &event to be used in future calls to
+ * event_add() and event_del(). The event will be prepared to call the
+ * eventHandler using the 'sock' file descriptor to monitor events.
+ *
+ * The events can be either EV_READ, EV_WRITE, or both, indicating
+ * that an application can read or write from the file respectively without
+ * blocking.
+ *
+ * The eventHandler will be called with the file descriptor that triggered
+ * the event and the type of event which will be one of: EV_TIMEOUT,
+ * EV_SIGNAL, EV_READ, EV_WRITE.
+ *
+ * The additional flag EV_PERSIST makes an event_add() persistent until
+ * event_del() has been called.
+ *
+ * Once initialized, the &event struct can be used repeatedly with
+ * event_add() and event_del() and does not need to be reinitialized unless
+ * the eventHandler and/or the argument to it are to be changed. However,
+ * when an ev structure has been added to libevent using event_add() the
+ * structure must persist until the event occurs (assuming EV_PERSIST
+ * is not set) or is removed using event_del(). You may not reuse the same
+ * ev structure for multiple monitored descriptors; each descriptor needs
+ * its own ev.
+ */
+ event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
+ event_base_set(server_->getEventBase(), &event_);
+
+ // Add the event
+ if (event_add(&event_, 0) == -1) {
+ GlobalOutput("TConnection::setFlags(): could not event_add");
+ }
+}
+
+/**
+ * Closes a connection
+ */
+void TConnection::close() {
+ // Delete the registered libevent
+ if (event_del(&event_) == -1) {
+ GlobalOutput("TConnection::close() event_del");
+ }
+
+ // Close the socket
+ if (socket_ > 0) {
+ ::close(socket_);
+ }
+ socket_ = 0;
+
+ // close any factory produced transports
+ factoryInputTransport_->close();
+ factoryOutputTransport_->close();
+
+ // Give this object back to the server that owns it
+ server_->returnConnection(this);
+}
+
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+ if (readBufferSize_ > limit) {
+ readBufferSize_ = limit;
+ readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
+ if (readBuffer_ == NULL) {
+ GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
+ close();
+ }
+ }
+}
+
+/**
+ * Creates a new connection either by reusing an object off the stack or
+ * by allocating a new one entirely
+ */
+TConnection* TNonblockingServer::createConnection(int socket, short flags) {
+ // Check the stack
+ if (connectionStack_.empty()) {
+ return new TConnection(socket, flags, this);
+ } else {
+ TConnection* result = connectionStack_.top();
+ connectionStack_.pop();
+ result->init(socket, flags, this);
+ return result;
+ }
+}
+
+/**
+ * Returns a connection to the stack
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+ if (connectionStackLimit_ &&
+ (connectionStack_.size() >= connectionStackLimit_)) {
+ delete connection;
+ } else {
+ connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+ connectionStack_.push(connection);
+ }
+}
+
+/**
+ * Server socket had something happen. We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+ // Make sure that libevent didn't fuck up the socket handles
+ assert(fd == serverSocket_);
+
+ // Server socket accepted a new connection
+ socklen_t addrLen;
+ struct sockaddr addr;
+ addrLen = sizeof(addr);
+
+ // Going to accept a new client socket
+ int clientSocket;
+
+ // Accept as many new clients as possible, even though libevent signaled only
+ // one, this helps us to avoid having to go back into the libevent engine so
+ // many times
+ while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
+
+ // Explicitly set this socket to NONBLOCK mode
+ int flags;
+ if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+ fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+ GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
+ close(clientSocket);
+ return;
+ }
+
+ // Create a new TConnection for this client socket.
+ TConnection* clientConnection =
+ createConnection(clientSocket, EV_READ | EV_PERSIST);
+
+ // Fail fast if we could not create a TConnection object
+ if (clientConnection == NULL) {
+ GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+ close(clientSocket);
+ return;
+ }
+
+ // Put this client connection into the proper state
+ clientConnection->transition();
+ }
+
+ // Done looping accept, now we have to make sure the error is due to
+ // blocking. Any other error is a problem
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
+ }
+}
+
+/**
+ * Creates a socket to listen on and binds it to the local port.
+ */
+void TNonblockingServer::listenSocket() {
+ int s;
+ struct addrinfo hints, *res, *res0;
+ int error;
+
+ char port[sizeof("65536") + 1];
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+ sprintf(port, "%d", port_);
+
+ // Wildcard address
+ error = getaddrinfo(NULL, port, &hints, &res0);
+ if (error) {
+ string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
+ GlobalOutput(errStr.c_str());
+ return;
+ }
+
+ // Pick the ipv6 address first since ipv4 addresses can be mapped
+ // into ipv6 space.
+ for (res = res0; res; res = res->ai_next) {
+ if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+ break;
+ }
+
+ // Create the server socket
+ s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (s == -1) {
+ freeaddrinfo(res0);
+ throw TException("TNonblockingServer::serve() socket() -1");
+ }
+
+ #ifdef IPV6_V6ONLY
+ int zero = 0;
+ if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
+ GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+ }
+ #endif // #ifdef IPV6_V6ONLY
+
+
+ int one = 1;
+
+ // Set reuseaddr to avoid 2MSL delay on server restart
+ setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+
+ if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+ close(s);
+ freeaddrinfo(res0);
+ throw TException("TNonblockingServer::serve() bind");
+ }
+
+ // Done with the addr info
+ freeaddrinfo(res0);
+
+ // Set up this file descriptor for listening
+ listenSocket(s);
+}
+
+/**
+ * Takes a socket created by listenSocket() and sets various options on it
+ * to prepare for use in the server.
+ */
+void TNonblockingServer::listenSocket(int s) {
+ // Set socket to nonblocking mode
+ int flags;
+ if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
+ fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
+ close(s);
+ throw TException("TNonblockingServer::serve() O_NONBLOCK");
+ }
+
+ int one = 1;
+ struct linger ling = {0, 0};
+
+ // Keepalive to ensure full result flushing
+ setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
+
+ // Turn linger off to avoid hung sockets
+ setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+
+ // Set TCP nodelay if available, MAC OS X Hack
+ // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+ #ifndef TCP_NOPUSH
+ setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
+ #endif
+
+ if (listen(s, LISTEN_BACKLOG) == -1) {
+ close(s);
+ throw TException("TNonblockingServer::serve() listen");
+ }
+
+ // Cool, this socket is good to go, set it as the serverSocket_
+ serverSocket_ = s;
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingServer::registerEvents(event_base* base) {
+ assert(serverSocket_ != -1);
+ assert(!eventBase_);
+ eventBase_ = base;
+
+ // Print some libevent stats
+ GlobalOutput.printf("libevent %s method %s",
+ event_get_version(),
+ event_get_method());
+
+ // Register the server event
+ event_set(&serverEvent_,
+ serverSocket_,
+ EV_READ | EV_PERSIST,
+ TNonblockingServer::eventHandler,
+ this);
+ event_base_set(eventBase_, &serverEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(&serverEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): coult not event_add");
+ }
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+ // Init socket
+ listenSocket();
+
+ // Initialize libevent core
+ registerEvents(static_cast<event_base*>(event_init()));
+
+ // Run the preServe event
+ if (eventHandler_ != NULL) {
+ eventHandler_->preServe();
+ }
+
+ // Run libevent engine, never returns, invokes calls to eventHandler
+ event_base_loop(eventBase_, 0);
+}
+
+}}} // apache::thrift::server