summaryrefslogtreecommitdiff
path: root/lib/cpp/src/server/TThreadPoolServer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cpp/src/server/TThreadPoolServer.cpp')
-rw-r--r--lib/cpp/src/server/TThreadPoolServer.cpp217
1 files changed, 217 insertions, 0 deletions
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
new file mode 100644
index 000000000..0894cfa5f
--- /dev/null
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/TThreadPoolServer.h"
+#include "transport/TTransportException.h"
+#include "concurrency/Thread.h"
+#include "concurrency/ThreadManager.h"
+#include <string>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace server {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::concurrency;
+using namespace apache::thrift::protocol;;
+using namespace apache::thrift::transport;
+
+class TThreadPoolServer::Task : public Runnable {
+
+public:
+
+ Task(TThreadPoolServer &server,
+ shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocol> input,
+ shared_ptr<TProtocol> output) :
+ server_(server),
+ processor_(processor),
+ input_(input),
+ output_(output) {
+ }
+
+ ~Task() {}
+
+ void run() {
+ boost::shared_ptr<TServerEventHandler> eventHandler =
+ server_.getEventHandler();
+ if (eventHandler != NULL) {
+ eventHandler->clientBegin(input_, output_);
+ }
+ try {
+ while (processor_->process(input_, output_)) {
+ if (!input_->getTransport()->peek()) {
+ break;
+ }
+ }
+ } catch (TTransportException& ttx) {
+ // This is reasonably expected, client didn't send a full request so just
+ // ignore him
+ // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
+ // GlobalOutput(errStr.c_str());
+ } catch (TException& x) {
+ string errStr = string("TThreadPoolServer exception: ") + x.what();
+ GlobalOutput(errStr.c_str());
+ } catch (std::exception &x) {
+ string errStr = string("TThreadPoolServer, std::exception: ") + x.what();
+ GlobalOutput(errStr.c_str());
+ }
+
+ if (eventHandler != NULL) {
+ eventHandler->clientEnd(input_, output_);
+ }
+
+ try {
+ input_->getTransport()->close();
+ } catch (TTransportException& ttx) {
+ string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ try {
+ output_->getTransport()->close();
+ } catch (TTransportException& ttx) {
+ string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+
+ }
+
+ private:
+ TServer& server_;
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TProtocol> input_;
+ shared_ptr<TProtocol> output_;
+
+};
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> transportFactory,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, serverTransport, transportFactory, protocolFactory),
+ threadManager_(threadManager),
+ stop_(false), timeout_(0) {}
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadManager_(threadManager),
+ stop_(false), timeout_(0) {}
+
+
+TThreadPoolServer::~TThreadPoolServer() {}
+
+void TThreadPoolServer::serve() {
+ shared_ptr<TTransport> client;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
+
+ try {
+ // Start the server listening
+ serverTransport_->listen();
+ } catch (TTransportException& ttx) {
+ string errStr = string("TThreadPoolServer::run() listen(): ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ return;
+ }
+
+ // Run the preServe event
+ if (eventHandler_ != NULL) {
+ eventHandler_->preServe();
+ }
+
+ while (!stop_) {
+ try {
+ client.reset();
+ inputTransport.reset();
+ outputTransport.reset();
+ inputProtocol.reset();
+ outputProtocol.reset();
+
+ // Fetch client from server
+ client = serverTransport_->accept();
+
+ // Make IO transports
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+
+ // Add to threadmanager pool
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_);
+
+ } catch (TTransportException& ttx) {
+ if (inputTransport != NULL) { inputTransport->close(); }
+ if (outputTransport != NULL) { outputTransport->close(); }
+ if (client != NULL) { client->close(); }
+ if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+ string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ continue;
+ } catch (TException& tx) {
+ if (inputTransport != NULL) { inputTransport->close(); }
+ if (outputTransport != NULL) { outputTransport->close(); }
+ if (client != NULL) { client->close(); }
+ string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
+ GlobalOutput(errStr.c_str());
+ continue;
+ } catch (string s) {
+ if (inputTransport != NULL) { inputTransport->close(); }
+ if (outputTransport != NULL) { outputTransport->close(); }
+ if (client != NULL) { client->close(); }
+ string errStr = "TThreadPoolServer: Unknown exception: " + s;
+ GlobalOutput(errStr.c_str());
+ break;
+ }
+ }
+
+ // If stopped manually, join the existing threads
+ if (stop_) {
+ try {
+ serverTransport_->close();
+ threadManager_->join();
+ } catch (TException &tx) {
+ string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ stop_ = false;
+ }
+
+}
+
+int64_t TThreadPoolServer::getTimeout() const {
+ return timeout_;
+}
+
+void TThreadPoolServer::setTimeout(int64_t value) {
+ timeout_ = value;
+}
+
+}}} // apache::thrift::server