summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBryan Duxbury <bryanduxbury@apache.org>2010-08-05 21:13:06 +0000
committerBryan Duxbury <bryanduxbury@apache.org>2010-08-05 21:13:06 +0000
commit067a7beb7350818c97edf24086f48cef2a9be1fa (patch)
tree2c1b867b8316cb486caa510f300e07fc9b6740da
parent5226eeaaaa1789fccce5c7cf0a5cef0a38190df8 (diff)
downloadthrift-067a7beb7350818c97edf24086f48cef2a9be1fa.tar.gz
THRIFT-836. java: Race condition causes CancelledKeyException in TAsyncClientManager
This patch removes an erroneous key cancellation and adds exception handling for a bunch of different exceptions. Patch: Ning Liang git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@982789 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--lib/java/src/org/apache/thrift/async/TAsyncClientManager.java41
-rw-r--r--lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java14
-rw-r--r--lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java91
3 files changed, 126 insertions, 20 deletions
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
index b659f7cbb..1d32ace16 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -82,28 +85,40 @@ public class TAsyncClientManager {
}
// Handle any ready channels calls
- Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- keys.remove();
- if (!key.isValid()) {
- // this should only have happened if the method call experienced an
- // error and the key was cancelled. just skip it.
- continue;
+ try {
+ Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+ while (keys.hasNext()) {
+ SelectionKey key = keys.next();
+ keys.remove();
+ if (!key.isValid()) {
+ // this should only have happened if the method call experienced an
+ // error and the key was cancelled. just skip it.
+ continue;
+ }
+ TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
+ method.transition(key);
}
- TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
- method.transition(key);
+ } catch (ClosedSelectorException e) {
+ LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
}
// Start any new calls
TAsyncMethodCall methodCall;
while ((methodCall = pendingCalls.poll()) != null) {
+ // Catch registration errors. Method will catch transition errors and cleanup.
try {
SelectionKey key = methodCall.registerWithSelector(selector);
methodCall.transition(key);
- } catch (IOException e) {
- LOGGER.warn("Caught IOException in TAsyncClientManager!", e);
- }
+ } catch (ClosedChannelException e) {
+ methodCall.onError(e);
+ LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e);
+ } catch (CancelledKeyException e) {
+ methodCall.onError(e);
+ LOGGER.warn("Caught CancelledKeyExce115ption in TAsyncClientManager!", e);
+ } catch (Exception e) {
+ methodCall.onError(e);
+ LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e);
+ }
}
}
}
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index 7273b5b23..eca321b3b 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -111,8 +111,7 @@ public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
if (!key.isValid()) {
key.cancel();
Exception e = new TTransportException("Selection key not valid!");
- client.onError(e);
- callback.onError(e);
+ onError(e);
return;
}
@@ -137,13 +136,17 @@ public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
+ " but selector called transition method. Seems like a bug...");
}
} catch (Throwable e) {
- state = State.ERROR;
key.cancel();
key.attach(null);
- client.onError(e);
- callback.onError(e);
+ onError(e);
}
}
+
+ protected void onError(Throwable e) {
+ state = State.ERROR;
+ client.onError(e);
+ callback.onError(e);
+ }
private void doReadingResponseBody(SelectionKey key) throws IOException {
if (transport.read(frameBuffer) < 0) {
@@ -159,7 +162,6 @@ public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
key.interestOps(0);
// this ensures that the TAsyncMethod instance doesn't hang around
key.attach(null);
- key.cancel();
client.onComplete();
callback.onComplete((T)this);
}
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index 2962897c3..253506727 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -28,6 +28,9 @@ import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
+import java.util.List;
+import java.util.ArrayList;
+
import thrift.test.CompactProtoTestStruct;
import thrift.test.Srv;
import thrift.test.Srv.Iface;
@@ -73,6 +76,72 @@ public class TestTAsyncClientManager extends TestCase {
public void onewayMethod() throws TException {
}
}
+
+ public class JankyRunnable implements Runnable {
+ private TAsyncClientManager acm_;
+ private int numCalls_;
+ private int numSuccesses_ = 0;
+ private Srv.AsyncClient client_;
+ private TNonblockingSocket clientSocket_;
+
+ public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception {
+ this.acm_ = acm;
+ this.numCalls_ = numCalls;
+ this.clientSocket_ = new TNonblockingSocket("localhost", 12345);
+ this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
+ }
+
+ public int getNumSuccesses() {
+ return numSuccesses_;
+ }
+
+ public void run() {
+ for (int i = 0; i < numCalls_; i++) {
+ try {
+ // connect an async client
+ final Object o = new Object();
+
+ final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+ client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
+ @Override
+ public void onComplete(Janky_call response) {
+ try {
+ assertEquals(3, response.getResult());
+ jankyReturned.set(true);
+ synchronized(o) {
+ o.notifyAll();
+ }
+ } catch (TException e) {
+ e.printStackTrace();
+ synchronized(o) {
+ o.notifyAll();
+ }
+ fail("unexpected exception: " + e);
+ }
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ synchronized(o) {
+ o.notifyAll();
+ }
+ fail("unexpected exception: " + throwable);
+ }
+ });
+
+ synchronized(o) {
+ o.wait(1000);
+ }
+
+ assertTrue(jankyReturned.get());
+ this.numSuccesses_++;
+ } catch (Exception e) {
+ fail("Unexpected " + e);
+ }
+ }
+ }
+ }
public void testIt() throws Exception {
// put up a server
@@ -179,7 +248,27 @@ public class TestTAsyncClientManager extends TestCase {
synchronized(o) {
o.wait(1000);
}
-
assertTrue(voidAfterOnewayReturned.get());
+
+ // make multiple calls with deserialization in the selector thread (repro Eric's issue)
+ int numThreads = 500;
+ int numCallsPerThread = 100;
+ List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numThreads; i++) {
+ JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread);
+ Thread thread = new Thread(runnable);
+ thread.start();
+ threads.add(thread);
+ runnables.add(runnable);
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ int numSuccesses = 0;
+ for (JankyRunnable runnable : runnables) {
+ numSuccesses += runnable.getNumSuccesses();
+ }
+ assertEquals(numSuccesses, numThreads * numCallsPerThread);
}
}