summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBryan Duxbury <bryanduxbury@apache.org>2011-11-04 00:23:30 +0000
committerBryan Duxbury <bryanduxbury@apache.org>2011-11-04 00:23:30 +0000
commitdf971daf373ceda6de2f76e5f0713c64b6d2e242 (patch)
treee77f84418bae0c5f5e5fab0c9ff5c80d8cb110af
parente7bdf70daba05e2fa704a3562eb924573342777b (diff)
downloadthrift-df971daf373ceda6de2f76e5f0713c64b6d2e242.tar.gz
THRIFT-1420. java: Nonblocking and HsHa server should make sure to close all their socket connections when the selector exits
This patch makes the selector threads close out all of their open sockets before completely exiting. In testing, this appears to alleviate issues with hanging clients. Patch: Thomas Kielbus git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1197370 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--lib/java/src/org/apache/thrift/server/TNonblockingServer.java11
-rw-r--r--lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java3
-rw-r--r--lib/java/test/org/apache/thrift/server/TestNonblockingServer.java44
3 files changed, 54 insertions, 4 deletions
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 7afd4b352..dccae527c 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -20,14 +20,14 @@
package org.apache.thrift.server;
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
* connected clients in terms of invocations.
@@ -154,6 +154,9 @@ public class TNonblockingServer extends AbstractNonblockingServer {
select();
processInterestChanges();
}
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 4cf5f1b51..04179e64d 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -537,6 +537,9 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
processAcceptedConnections();
processInterestChanges();
}
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
index 52b62c349..597074ed2 100644
--- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
+++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
@@ -20,16 +20,22 @@ package org.apache.thrift.server;
import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer.Args;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import thrift.test.ThriftTest;
public class TestNonblockingServer extends ServerTestBase {
private Thread serverThread;
private TServer server;
+ private static final int NUM_QUERIES = 10000;
protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
@@ -71,4 +77,42 @@ public class TestNonblockingServer extends ServerTestBase {
public TTransport getClientTransport(TTransport underlyingTransport) throws Exception {
return new TFramedTransport(underlyingTransport);
}
+
+
+ public void testCleanupAllSelectionKeys() throws Exception {
+ for (TProtocolFactory protoFactory : getProtocols()) {
+ TestHandler handler = new TestHandler();
+ ThriftTest.Processor processor = new ThriftTest.Processor(handler);
+
+ startServer(processor, protoFactory);
+
+ TSocket socket = new TSocket(HOST, PORT);
+ socket.setTimeout(SOCKET_TIMEOUT);
+ TTransport transport = getClientTransport(socket);
+
+ TProtocol protocol = protoFactory.getProtocol(transport);
+ ThriftTest.Client testClient = new ThriftTest.Client(protocol);
+
+ open(transport);
+
+ for (int i = 0; i < NUM_QUERIES; ++i) {
+ testClient.testI32(1);
+ }
+ server.stop();
+ for (int i = 0; i < NUM_QUERIES; ++i) {
+ try {
+ testClient.testI32(1);
+ } catch(TTransportException e) {
+ System.err.println(e);
+ e.printStackTrace();
+ if (e.getCause() instanceof java.net.SocketTimeoutException) {
+ fail("timed out when it should have thrown another kind of error!");
+ }
+ }
+ }
+
+ transport.close();
+ stopServer();
+ }
+ }
}