summaryrefslogtreecommitdiff
path: root/lib/cpp/src/thrift/server/TThreadedServer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cpp/src/thrift/server/TThreadedServer.cpp')
-rw-r--r--lib/cpp/src/thrift/server/TThreadedServer.cpp100
1 files changed, 55 insertions, 45 deletions
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 909c3cee8..929ed8610 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -28,7 +28,9 @@
#include <unistd.h>
#endif
-namespace apache { namespace thrift { namespace server {
+namespace apache {
+namespace thrift {
+namespace server {
using boost::shared_ptr;
using namespace std;
@@ -37,27 +39,24 @@ using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
-class TThreadedServer::Task: public Runnable {
+class TThreadedServer::Task : public Runnable {
public:
-
Task(TThreadedServer& server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
shared_ptr<TProtocol> output,
- shared_ptr<TTransport> transport) :
- server_(server),
- processor_(processor),
- input_(input),
- output_(output),
- transport_(transport) {
- }
+ shared_ptr<TTransport> transport)
+ : server_(server),
+ processor_(processor),
+ input_(input),
+ output_(output),
+ transport_(transport) {}
~Task() {}
void run() {
- boost::shared_ptr<TServerEventHandler> eventHandler =
- server_.getEventHandler();
+ boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
void* connectionContext = NULL;
if (eventHandler) {
connectionContext = eventHandler->createContext(input_, output_);
@@ -67,8 +66,8 @@ public:
if (eventHandler) {
eventHandler->processContext(connectionContext, transport_);
}
- if (!processor_->process(input_, output_, connectionContext) ||
- !input_->getTransport()->peek()) {
+ if (!processor_->process(input_, output_, connectionContext)
+ || !input_->getTransport()->peek()) {
break;
}
}
@@ -77,9 +76,8 @@ public:
string errStr = string("TThreadedServer client died: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
- } catch (const std::exception &x) {
- GlobalOutput.printf("TThreadedServer exception: %s: %s",
- typeid(x).name(), x.what());
+ } catch (const std::exception& x) {
+ GlobalOutput.printf("TThreadedServer exception: %s: %s", typeid(x).name(), x.what());
} catch (...) {
GlobalOutput("TThreadedServer uncaught exception.");
}
@@ -108,10 +106,9 @@ public:
server_.tasksMonitor_.notify();
}
}
-
}
- private:
+private:
TThreadedServer& server_;
friend class TThreadedServer;
@@ -129,7 +126,8 @@ void TThreadedServer::init() {
}
}
-TThreadedServer::~TThreadedServer() {}
+TThreadedServer::~TThreadedServer() {
+}
void TThreadedServer::serve() {
@@ -164,22 +162,16 @@ void TThreadedServer::serve() {
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
- shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
- outputProtocol, client);
+ shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
- TThreadedServer::Task* task = new TThreadedServer::Task(*this,
- processor,
- inputProtocol,
- outputProtocol,
- client);
+ TThreadedServer::Task* task
+ = new TThreadedServer::Task(*this, processor, inputProtocol, outputProtocol, client);
// Create a task
- shared_ptr<Runnable> runnable =
- shared_ptr<Runnable>(task);
+ shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
// Create a thread for this task
- shared_ptr<Thread> thread =
- shared_ptr<Thread>(threadFactory_->newThread(runnable));
+ shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
// Insert thread into the set of threads
{
@@ -191,25 +183,43 @@ void TThreadedServer::serve() {
thread->start();
} catch (TTransportException& ttx) {
- if (inputTransport) { inputTransport->close(); }
- if (outputTransport) { outputTransport->close(); }
- if (client) { client->close(); }
+ if (inputTransport) {
+ inputTransport->close();
+ }
+ if (outputTransport) {
+ outputTransport->close();
+ }
+ if (client) {
+ client->close();
+ }
if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
GlobalOutput(errStr.c_str());
}
continue;
} catch (TException& tx) {
- if (inputTransport) { inputTransport->close(); }
- if (outputTransport) { outputTransport->close(); }
- if (client) { client->close(); }
+ if (inputTransport) {
+ inputTransport->close();
+ }
+ if (outputTransport) {
+ outputTransport->close();
+ }
+ if (client) {
+ client->close();
+ }
string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
GlobalOutput(errStr.c_str());
continue;
} catch (string s) {
- if (inputTransport) { inputTransport->close(); }
- if (outputTransport) { outputTransport->close(); }
- if (client) { client->close(); }
+ if (inputTransport) {
+ inputTransport->close();
+ }
+ if (outputTransport) {
+ outputTransport->close();
+ }
+ if (client) {
+ client->close();
+ }
string errStr = "TThreadedServer: Unknown exception: " + s;
GlobalOutput(errStr.c_str());
break;
@@ -220,7 +230,7 @@ void TThreadedServer::serve() {
if (stop_) {
try {
serverTransport_->close();
- } catch (TException &tx) {
+ } catch (TException& tx) {
string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what();
GlobalOutput(errStr.c_str());
}
@@ -229,13 +239,13 @@ void TThreadedServer::serve() {
while (!tasks_.empty()) {
tasksMonitor_.wait();
}
- } catch (TException &tx) {
+ } catch (TException& tx) {
string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
GlobalOutput(errStr.c_str());
}
stop_ = false;
}
-
}
-
-}}} // apache::thrift::server
+}
+}
+} // apache::thrift::server