/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "server/TThreadedServer.h" #include "transport/TTransportException.h" #include "concurrency/PosixThreadFactory.h" #include #include #include #include namespace apache { namespace thrift { namespace server { using boost::shared_ptr; using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace apache::thrift::concurrency; class TThreadedServer::Task: public Runnable { public: Task(TThreadedServer& server, shared_ptr processor, shared_ptr input, shared_ptr output) : server_(server), processor_(processor), input_(input), output_(output) { } ~Task() {} void run() { boost::shared_ptr eventHandler = server_.getEventHandler(); if (eventHandler != NULL) { eventHandler->clientBegin(input_, output_); } try { while (processor_->process(input_, output_)) { if (!input_->getTransport()->peek()) { break; } } } catch (TTransportException& ttx) { string errStr = string("TThreadedServer client died: ") + ttx.what(); GlobalOutput(errStr.c_str()); } catch (TException& x) { string errStr = string("TThreadedServer exception: ") + x.what(); GlobalOutput(errStr.c_str()); } catch (...) { GlobalOutput("TThreadedServer uncaught exception."); } if (eventHandler != NULL) { eventHandler->clientEnd(input_, output_); } try { input_->getTransport()->close(); } catch (TTransportException& ttx) { string errStr = string("TThreadedServer input close failed: ") + ttx.what(); GlobalOutput(errStr.c_str()); } try { output_->getTransport()->close(); } catch (TTransportException& ttx) { string errStr = string("TThreadedServer output close failed: ") + ttx.what(); GlobalOutput(errStr.c_str()); } // Remove this task from parent bookkeeping { Synchronized s(server_.tasksMonitor_); server_.tasks_.erase(this); if (server_.tasks_.empty()) { server_.tasksMonitor_.notify(); } } } private: TThreadedServer& server_; friend class TThreadedServer; shared_ptr processor_; shared_ptr input_; shared_ptr output_; }; TThreadedServer::TThreadedServer(shared_ptr processor, shared_ptr serverTransport, shared_ptr transportFactory, shared_ptr protocolFactory): TServer(processor, serverTransport, transportFactory, protocolFactory), stop_(false) { threadFactory_ = shared_ptr(new PosixThreadFactory()); } TThreadedServer::TThreadedServer(boost::shared_ptr processor, boost::shared_ptr serverTransport, boost::shared_ptr transportFactory, boost::shared_ptr protocolFactory, boost::shared_ptr threadFactory): TServer(processor, serverTransport, transportFactory, protocolFactory), threadFactory_(threadFactory), stop_(false) { } TThreadedServer::~TThreadedServer() {} void TThreadedServer::serve() { shared_ptr client; shared_ptr inputTransport; shared_ptr outputTransport; shared_ptr inputProtocol; shared_ptr outputProtocol; try { // Start the server listening serverTransport_->listen(); } catch (TTransportException& ttx) { string errStr = string("TThreadedServer::run() listen(): ") +ttx.what(); GlobalOutput(errStr.c_str()); return; } // Run the preServe event if (eventHandler_ != NULL) { eventHandler_->preServe(); } while (!stop_) { try { client.reset(); inputTransport.reset(); outputTransport.reset(); inputProtocol.reset(); outputProtocol.reset(); // Fetch client from server client = serverTransport_->accept(); // Make IO transports inputTransport = inputTransportFactory_->getTransport(client); outputTransport = outputTransportFactory_->getTransport(client); inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); TThreadedServer::Task* task = new TThreadedServer::Task(*this, processor_, inputProtocol, outputProtocol); // Create a task shared_ptr runnable = shared_ptr(task); // Create a thread for this task shared_ptr thread = shared_ptr(threadFactory_->newThread(runnable)); // Insert thread into the set of threads { Synchronized s(tasksMonitor_); tasks_.insert(task); } // Start the thread! thread->start(); } catch (TTransportException& ttx) { if (inputTransport != NULL) { inputTransport->close(); } if (outputTransport != NULL) { outputTransport->close(); } if (client != NULL) { client->close(); } if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) { string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what(); GlobalOutput(errStr.c_str()); } continue; } catch (TException& tx) { if (inputTransport != NULL) { inputTransport->close(); } if (outputTransport != NULL) { outputTransport->close(); } if (client != NULL) { client->close(); } string errStr = string("TThreadedServer: Caught TException: ") + tx.what(); GlobalOutput(errStr.c_str()); continue; } catch (string s) { if (inputTransport != NULL) { inputTransport->close(); } if (outputTransport != NULL) { outputTransport->close(); } if (client != NULL) { client->close(); } string errStr = "TThreadedServer: Unknown exception: " + s; GlobalOutput(errStr.c_str()); break; } } // If stopped manually, make sure to close server transport if (stop_) { try { serverTransport_->close(); } catch (TException &tx) { string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what(); GlobalOutput(errStr.c_str()); } try { Synchronized s(tasksMonitor_); while (!tasks_.empty()) { tasksMonitor_.wait(); } } catch (TException &tx) { string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what(); GlobalOutput(errStr.c_str()); } stop_ = false; } } }}} // apache::thrift::server