diff options
author | Bryan Duxbury <bryanduxbury@apache.org> | 2010-08-05 21:13:06 +0000 |
---|---|---|
committer | Bryan Duxbury <bryanduxbury@apache.org> | 2010-08-05 21:13:06 +0000 |
commit | 067a7beb7350818c97edf24086f48cef2a9be1fa (patch) | |
tree | 2c1b867b8316cb486caa510f300e07fc9b6740da | |
parent | 5226eeaaaa1789fccce5c7cf0a5cef0a38190df8 (diff) | |
download | thrift-067a7beb7350818c97edf24086f48cef2a9be1fa.tar.gz |
THRIFT-836. java: Race condition causes CancelledKeyException in TAsyncClientManager
This patch removes an erroneous key cancellation and adds exception handling for a bunch of different exceptions.
Patch: Ning Liang
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@982789 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 126 insertions, 20 deletions
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java index b659f7cbb..1d32ace16 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; @@ -82,28 +85,40 @@ public class TAsyncClientManager { } // Handle any ready channels calls - Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); - while (keys.hasNext()) { - SelectionKey key = keys.next(); - keys.remove(); - if (!key.isValid()) { - // this should only have happened if the method call experienced an - // error and the key was cancelled. just skip it. - continue; + try { + Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + if (!key.isValid()) { + // this should only have happened if the method call experienced an + // error and the key was cancelled. just skip it. + continue; + } + TAsyncMethodCall method = (TAsyncMethodCall)key.attachment(); + method.transition(key); } - TAsyncMethodCall method = (TAsyncMethodCall)key.attachment(); - method.transition(key); + } catch (ClosedSelectorException e) { + LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e); } // Start any new calls TAsyncMethodCall methodCall; while ((methodCall = pendingCalls.poll()) != null) { + // Catch registration errors. Method will catch transition errors and cleanup. try { SelectionKey key = methodCall.registerWithSelector(selector); methodCall.transition(key); - } catch (IOException e) { - LOGGER.warn("Caught IOException in TAsyncClientManager!", e); - } + } catch (ClosedChannelException e) { + methodCall.onError(e); + LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e); + } catch (CancelledKeyException e) { + methodCall.onError(e); + LOGGER.warn("Caught CancelledKeyExce115ption in TAsyncClientManager!", e); + } catch (Exception e) { + methodCall.onError(e); + LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e); + } } } } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index 7273b5b23..eca321b3b 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -111,8 +111,7 @@ public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> { if (!key.isValid()) { key.cancel(); Exception e = new TTransportException("Selection key not valid!"); - client.onError(e); - callback.onError(e); + onError(e); return; } @@ -137,13 +136,17 @@ public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> { + " but selector called transition method. Seems like a bug..."); } } catch (Throwable e) { - state = State.ERROR; key.cancel(); key.attach(null); - client.onError(e); - callback.onError(e); + onError(e); } } + + protected void onError(Throwable e) { + state = State.ERROR; + client.onError(e); + callback.onError(e); + } private void doReadingResponseBody(SelectionKey key) throws IOException { if (transport.read(frameBuffer) < 0) { @@ -159,7 +162,6 @@ public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> { key.interestOps(0); // this ensures that the TAsyncMethod instance doesn't hang around key.attach(null); - key.cancel(); client.onComplete(); callback.onComplete((T)this); } diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index 2962897c3..253506727 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -28,6 +28,9 @@ import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingSocket; +import java.util.List; +import java.util.ArrayList; + import thrift.test.CompactProtoTestStruct; import thrift.test.Srv; import thrift.test.Srv.Iface; @@ -73,6 +76,72 @@ public class TestTAsyncClientManager extends TestCase { public void onewayMethod() throws TException { } } + + public class JankyRunnable implements Runnable { + private TAsyncClientManager acm_; + private int numCalls_; + private int numSuccesses_ = 0; + private Srv.AsyncClient client_; + private TNonblockingSocket clientSocket_; + + public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception { + this.acm_ = acm; + this.numCalls_ = numCalls; + this.clientSocket_ = new TNonblockingSocket("localhost", 12345); + this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_); + } + + public int getNumSuccesses() { + return numSuccesses_; + } + + public void run() { + for (int i = 0; i < numCalls_; i++) { + try { + // connect an async client + final Object o = new Object(); + + final AtomicBoolean jankyReturned = new AtomicBoolean(false); + client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() { + @Override + public void onComplete(Janky_call response) { + try { + assertEquals(3, response.getResult()); + jankyReturned.set(true); + synchronized(o) { + o.notifyAll(); + } + } catch (TException e) { + e.printStackTrace(); + synchronized(o) { + o.notifyAll(); + } + fail("unexpected exception: " + e); + } + + } + + @Override + public void onError(Throwable throwable) { + synchronized(o) { + o.notifyAll(); + } + fail("unexpected exception: " + throwable); + } + }); + + synchronized(o) { + o.wait(1000); + } + + assertTrue(jankyReturned.get()); + this.numSuccesses_++; + } catch (Exception e) { + fail("Unexpected " + e); + } + } + } + } public void testIt() throws Exception { // put up a server @@ -179,7 +248,27 @@ public class TestTAsyncClientManager extends TestCase { synchronized(o) { o.wait(1000); } - assertTrue(voidAfterOnewayReturned.get()); + + // make multiple calls with deserialization in the selector thread (repro Eric's issue) + int numThreads = 500; + int numCallsPerThread = 100; + List<JankyRunnable> runnables = new ArrayList<JankyRunnable>(); + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < numThreads; i++) { + JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread); + Thread thread = new Thread(runnable); + thread.start(); + threads.add(thread); + runnables.add(runnable); + } + for (Thread thread : threads) { + thread.join(); + } + int numSuccesses = 0; + for (JankyRunnable runnable : runnables) { + numSuccesses += runnable.getNumSuccesses(); + } + assertEquals(numSuccesses, numThreads * numCallsPerThread); } } |