diff options
author | Bryan Duxbury <bryanduxbury@apache.org> | 2011-11-04 00:23:30 +0000 |
---|---|---|
committer | Bryan Duxbury <bryanduxbury@apache.org> | 2011-11-04 00:23:30 +0000 |
commit | df971daf373ceda6de2f76e5f0713c64b6d2e242 (patch) | |
tree | e77f84418bae0c5f5e5fab0c9ff5c80d8cb110af | |
parent | e7bdf70daba05e2fa704a3562eb924573342777b (diff) | |
download | thrift-df971daf373ceda6de2f76e5f0713c64b6d2e242.tar.gz |
THRIFT-1420. java: Nonblocking and HsHa server should make sure to close all their socket connections when the selector exits
This patch makes the selector threads close out all of their open sockets before completely exiting. In testing, this appears to alleviate issues with hanging clients.
Patch: Thomas Kielbus
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1197370 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 54 insertions, 4 deletions
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 7afd4b352..dccae527c 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -20,14 +20,14 @@ package org.apache.thrift.server; -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.util.Iterator; - import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransportException; +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.util.Iterator; + /** * A nonblocking TServer implementation. This allows for fairness amongst all * connected clients in terms of invocations. @@ -154,6 +154,9 @@ public class TNonblockingServer extends AbstractNonblockingServer { select(); processInterestChanges(); } + for (SelectionKey selectionKey : selector.keys()) { + cleanupSelectionKey(selectionKey); + } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java index 4cf5f1b51..04179e64d 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -537,6 +537,9 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { processAcceptedConnections(); processInterestChanges(); } + for (SelectionKey selectionKey : selector.keys()) { + cleanupSelectionKey(selectionKey); + } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java index 52b62c349..597074ed2 100644 --- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java +++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java @@ -20,16 +20,22 @@ package org.apache.thrift.server; import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TNonblockingServer.Args; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import thrift.test.ThriftTest; public class TestNonblockingServer extends ServerTestBase { private Thread serverThread; private TServer server; + private static final int NUM_QUERIES = 10000; protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) { return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory)); @@ -71,4 +77,42 @@ public class TestNonblockingServer extends ServerTestBase { public TTransport getClientTransport(TTransport underlyingTransport) throws Exception { return new TFramedTransport(underlyingTransport); } + + + public void testCleanupAllSelectionKeys() throws Exception { + for (TProtocolFactory protoFactory : getProtocols()) { + TestHandler handler = new TestHandler(); + ThriftTest.Processor processor = new ThriftTest.Processor(handler); + + startServer(processor, protoFactory); + + TSocket socket = new TSocket(HOST, PORT); + socket.setTimeout(SOCKET_TIMEOUT); + TTransport transport = getClientTransport(socket); + + TProtocol protocol = protoFactory.getProtocol(transport); + ThriftTest.Client testClient = new ThriftTest.Client(protocol); + + open(transport); + + for (int i = 0; i < NUM_QUERIES; ++i) { + testClient.testI32(1); + } + server.stop(); + for (int i = 0; i < NUM_QUERIES; ++i) { + try { + testClient.testI32(1); + } catch(TTransportException e) { + System.err.println(e); + e.printStackTrace(); + if (e.getCause() instanceof java.net.SocketTimeoutException) { + fail("timed out when it should have thrown another kind of error!"); + } + } + } + + transport.close(); + stopServer(); + } + } } |