diff options
Diffstat (limited to 'lib/java/src/org/apache/thrift/server/THsHaServer.java')
-rw-r--r-- | lib/java/src/org/apache/thrift/server/THsHaServer.java | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java new file mode 100644 index 000000000..8bf096ed6 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.server; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerTransport; + +/** + * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. + * Like TNonblockingServer, it relies on the use of TFramedTransport. + */ +public class THsHaServer extends TNonblockingServer { + + // This wraps all the functionality of queueing and thread pool management + // for the passing of Invocations from the Selector to workers. + private ExecutorService invoker; + + protected final int MIN_WORKER_THREADS; + protected final int MAX_WORKER_THREADS; + protected final int STOP_TIMEOUT_VAL; + protected final TimeUnit STOP_TIMEOUT_UNIT; + + /** + * Create server with given processor, and server transport. Default server + * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on + * both input and output transports. A TProcessorFactory will be created that + * always returns the specified processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport) { + this(processor, serverTransport, new Options()); + } + + /** + * Create server with given processor, server transport, and server options + * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on + * both input and output transports. A TProcessorFactory will be created that + * always returns the specified processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + Options options) { + this(new TProcessorFactory(processor), serverTransport, options); + } + + /** + * Create server with specified processor factory and server transport. Uses + * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is + * used on both input and output transports. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport) { + this(processorFactory, serverTransport, new Options()); + } + + /** + * Create server with specified processor factory, server transport, and server + * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on + * both input and output transports. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + Options options) { + this(processorFactory, serverTransport, new TFramedTransport.Factory(), + new TBinaryProtocol.Factory(), options); + } + + /** + * Server with specified processor, server transport, and in/out protocol + * factory. Defaults will be used for in/out transport factory and server + * options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, protocolFactory, new Options()); + } + + /** + * Server with specified processor, server transport, and in/out protocol + * factory. Defaults will be used for in/out transport factory and server + * options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory, + Options options) { + this(processor, serverTransport, new TFramedTransport.Factory(), + protocolFactory); + } + + /** + * Create server with specified processor, server transport, in/out + * transport factory, in/out protocol factory, and default server options. A + * processor factory will be created that always returns the specified + * processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + transportFactory, protocolFactory); + } + + /** + * Create server with specified processor factory, server transport, in/out + * transport factory, in/out protocol factory, and default server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, new Options()); + } + + /** + * Create server with specified processor factory, server transport, in/out + * transport factory, in/out protocol factory, and server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory, + Options options) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + options); + } + + /** + * Create server with everything specified, except use default server options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + /** + * Create server with everything specified, except use default server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) + { + this(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, new Options()); + } + + /** + * Create server with every option fully specified. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) + { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + options); + + MIN_WORKER_THREADS = options.minWorkerThreads; + MAX_WORKER_THREADS = options.maxWorkerThreads; + STOP_TIMEOUT_VAL = options.stopTimeoutVal; + STOP_TIMEOUT_UNIT = options.stopTimeoutUnit; + } + + /** @inheritDoc */ + @Override + public void serve() { + if (!startInvokerPool()) { + return; + } + + // start listening, or exit + if (!startListening()) { + return; + } + + // start the selector, or exit + if (!startSelectorThread()) { + return; + } + + // this will block while we serve + joinSelector(); + + gracefullyShutdownInvokerPool(); + + // do a little cleanup + stopListening(); + + // ungracefully shut down the invoker pool? + } + + protected boolean startInvokerPool() { + // start the invoker pool + LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); + invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS, + STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue); + + return true; + } + + protected void gracefullyShutdownInvokerPool() { + // try to gracefully shut down the executor service + invoker.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = 10000; + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + /** + * We override the standard invoke method here to queue the invocation for + * invoker service instead of immediately invoking. The thread pool takes care of the rest. + */ + @Override + protected void requestInvoke(FrameBuffer frameBuffer) { + invoker.execute(new Invocation(frameBuffer)); + } + + /** + * An Invocation represents a method call that is prepared to execute, given + * an idle worker thread. It contains the input and output protocols the + * thread's processor should use to perform the usual Thrift invocation. + */ + private class Invocation implements Runnable { + + private final FrameBuffer frameBuffer; + + public Invocation(final FrameBuffer frameBuffer) { + this.frameBuffer = frameBuffer; + } + + public void run() { + frameBuffer.invoke(); + } + } + + public static class Options extends TNonblockingServer.Options { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + } +} |