diff options
Diffstat (limited to 'lib/cpp/src/thrift/server/TThreadedServer.cpp')
-rw-r--r-- | lib/cpp/src/thrift/server/TThreadedServer.cpp | 100 |
1 files changed, 55 insertions, 45 deletions
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp index 909c3cee8..929ed8610 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp @@ -28,7 +28,9 @@ #include <unistd.h> #endif -namespace apache { namespace thrift { namespace server { +namespace apache { +namespace thrift { +namespace server { using boost::shared_ptr; using namespace std; @@ -37,27 +39,24 @@ using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace apache::thrift::concurrency; -class TThreadedServer::Task: public Runnable { +class TThreadedServer::Task : public Runnable { public: - Task(TThreadedServer& server, shared_ptr<TProcessor> processor, shared_ptr<TProtocol> input, shared_ptr<TProtocol> output, - shared_ptr<TTransport> transport) : - server_(server), - processor_(processor), - input_(input), - output_(output), - transport_(transport) { - } + shared_ptr<TTransport> transport) + : server_(server), + processor_(processor), + input_(input), + output_(output), + transport_(transport) {} ~Task() {} void run() { - boost::shared_ptr<TServerEventHandler> eventHandler = - server_.getEventHandler(); + boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler(); void* connectionContext = NULL; if (eventHandler) { connectionContext = eventHandler->createContext(input_, output_); @@ -67,8 +66,8 @@ public: if (eventHandler) { eventHandler->processContext(connectionContext, transport_); } - if (!processor_->process(input_, output_, connectionContext) || - !input_->getTransport()->peek()) { + if (!processor_->process(input_, output_, connectionContext) + || !input_->getTransport()->peek()) { break; } } @@ -77,9 +76,8 @@ public: string errStr = string("TThreadedServer client died: ") + ttx.what(); GlobalOutput(errStr.c_str()); } - } catch (const std::exception &x) { - GlobalOutput.printf("TThreadedServer exception: %s: %s", - typeid(x).name(), x.what()); + } catch (const std::exception& x) { + GlobalOutput.printf("TThreadedServer exception: %s: %s", typeid(x).name(), x.what()); } catch (...) { GlobalOutput("TThreadedServer uncaught exception."); } @@ -108,10 +106,9 @@ public: server_.tasksMonitor_.notify(); } } - } - private: +private: TThreadedServer& server_; friend class TThreadedServer; @@ -129,7 +126,8 @@ void TThreadedServer::init() { } } -TThreadedServer::~TThreadedServer() {} +TThreadedServer::~TThreadedServer() { +} void TThreadedServer::serve() { @@ -164,22 +162,16 @@ void TThreadedServer::serve() { inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - shared_ptr<TProcessor> processor = getProcessor(inputProtocol, - outputProtocol, client); + shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client); - TThreadedServer::Task* task = new TThreadedServer::Task(*this, - processor, - inputProtocol, - outputProtocol, - client); + TThreadedServer::Task* task + = new TThreadedServer::Task(*this, processor, inputProtocol, outputProtocol, client); // Create a task - shared_ptr<Runnable> runnable = - shared_ptr<Runnable>(task); + shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task); // Create a thread for this task - shared_ptr<Thread> thread = - shared_ptr<Thread>(threadFactory_->newThread(runnable)); + shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable)); // Insert thread into the set of threads { @@ -191,25 +183,43 @@ void TThreadedServer::serve() { thread->start(); } catch (TTransportException& ttx) { - if (inputTransport) { inputTransport->close(); } - if (outputTransport) { outputTransport->close(); } - if (client) { client->close(); } + if (inputTransport) { + inputTransport->close(); + } + if (outputTransport) { + outputTransport->close(); + } + if (client) { + client->close(); + } if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) { string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what(); GlobalOutput(errStr.c_str()); } continue; } catch (TException& tx) { - if (inputTransport) { inputTransport->close(); } - if (outputTransport) { outputTransport->close(); } - if (client) { client->close(); } + if (inputTransport) { + inputTransport->close(); + } + if (outputTransport) { + outputTransport->close(); + } + if (client) { + client->close(); + } string errStr = string("TThreadedServer: Caught TException: ") + tx.what(); GlobalOutput(errStr.c_str()); continue; } catch (string s) { - if (inputTransport) { inputTransport->close(); } - if (outputTransport) { outputTransport->close(); } - if (client) { client->close(); } + if (inputTransport) { + inputTransport->close(); + } + if (outputTransport) { + outputTransport->close(); + } + if (client) { + client->close(); + } string errStr = "TThreadedServer: Unknown exception: " + s; GlobalOutput(errStr.c_str()); break; @@ -220,7 +230,7 @@ void TThreadedServer::serve() { if (stop_) { try { serverTransport_->close(); - } catch (TException &tx) { + } catch (TException& tx) { string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what(); GlobalOutput(errStr.c_str()); } @@ -229,13 +239,13 @@ void TThreadedServer::serve() { while (!tasks_.empty()) { tasksMonitor_.wait(); } - } catch (TException &tx) { + } catch (TException& tx) { string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what(); GlobalOutput(errStr.c_str()); } stop_ = false; } - } - -}}} // apache::thrift::server +} +} +} // apache::thrift::server |