diff options
author | Keith Wall <kwall@apache.org> | 2015-03-20 12:19:33 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-03-20 12:19:33 +0000 |
commit | a3132b9031d594ffccefd0ce6b9c2d3f19952d65 (patch) | |
tree | 8535490fe01ce78882cded2e6962d7130dfbb305 | |
parent | 87629732fae81a4e9ac1a500e878dc3c57dc3ab8 (diff) | |
download | qpid-python-a3132b9031d594ffccefd0ce6b9c2d3f19952d65.tar.gz |
QPID-6460, QPID-6460: [Java Client] Make task pool used for exception reporting duties exactly one thread to serialise the callbacks
Also,
* name the task pool thread (for diagnostic purposes)
* no longer forcedily shutdown the pool on close as an unexpected InterruptedException may corrupt an application's state
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1668000 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 59 insertions, 65 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ec60bd2914..717ebcc86f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import javax.jms.ConnectionConsumer; @@ -89,7 +89,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, ClientProperties.DEFAULT_CLOSE_TIMEOUT); - private final long _connectionNumber; + private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet(); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -160,8 +160,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQShortString _temporaryTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME); private AMQShortString _temporaryQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME); - /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ - private final ExecutorService _taskPool = Executors.newCachedThreadPool(); + /** + * Thread Pool for executing connection level processes such as reporting asynchronous exceptions + * and for 0-8..0-91 returning bounced messages. + */ + private final ExecutorService _taskPool = Executors.newSingleThreadExecutor(new ThreadFactory() + { + @Override + public Thread newThread(final Runnable r) + { + Thread thread = new Thread(r, "Connection_" + AMQConnection.this._connectionNumber + "_task"); + if (!thread.isDaemon()) + { + thread.setDaemon(true); + } + + return thread; + } + }); private AMQConnectionDelegate _delegate; @@ -255,8 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new IllegalArgumentException("Connection must be specified"); } - _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet(); - if (_logger.isDebugEnabled()) { _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL); @@ -545,18 +559,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _connectionMetaData = new QpidConnectionMetaData(this); } - protected boolean checkException(Throwable thrown) - { - Throwable cause = thrown.getCause(); - - if (cause == null) - { - cause = thrown; - } - - return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); - } - private void initDelegate(ProtocolVersion pe) throws AMQProtocolException { try @@ -935,8 +937,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } finally { - //This MUST occur after we have successfully closed all Channels/Sessions - shutdownTaskPool(timeout); + shutdownTaskPool(); } } catch (JMSException e) @@ -960,35 +961,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } } - } - private void shutdownTaskPool(final long timeout) + private void shutdownTaskPool() { _taskPool.shutdown(); - - if (!_taskPool.isTerminated()) - { - try - { - _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); - } - } - - //If the taskpool hasn't shutdown by now then give it shutdownNow. - // This will interrupt any running tasks. - if (!_taskPool.isTerminated()) - { - List<Runnable> tasks = _taskPool.shutdownNow(); - for (Runnable r : tasks) - { - _logger.warn("Connection close forced taskpool to prevent execution:" + r); - } - } } /** @@ -1388,8 +1365,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } }); - } - else + } + else { _logger.error("Throwable Received but no listener set: " + cause); } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java index 141de1e5a8..f58acb380a 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java @@ -24,7 +24,9 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.ExceptionListener; @@ -37,7 +39,6 @@ import javax.jms.Queue; import javax.jms.Session; import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.ConnectionException; @@ -186,25 +187,35 @@ public class ExceptionListenerTest extends QpidBrokerTestCase // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException. // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock, // at least on my machine). - final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2); + final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2); + final AtomicBoolean doneClosed = new AtomicBoolean(); + final CountDownLatch connectionClosedAttemptLatch = new CountDownLatch(1); + final AtomicReference<Exception> connectionCloseException = new AtomicReference<>(); final ExceptionListener listener = new ExceptionListener() { public void onException(JMSException exception) { - try + exceptionReceivedLatch.countDown(); + if (exceptionReceivedLatch.getCount() == 0) { - assertNotNull("JMS Exception must have cause", exception.getCause() ); - assertEquals("JMS Exception is of wrong type", AMQNoRouteException.class, exception.getCause().getClass()); - exceptionReceivedLatch.countDown(); - if (exceptionReceivedLatch.getCount() == 0) + try { - connection.stop(); // ** Deadlock - connection.close(); + if (doneClosed.compareAndSet(false, true)) + { + connection.stop(); + connection.close(); + } } - } - catch (Throwable t) - { - _lastExceptionListenerException = t; + catch (Exception e) + { + // We expect no exception to be caught + connectionCloseException.set(e); + } + finally + { + connectionClosedAttemptLatch.countDown(); + } + } } }; @@ -212,7 +223,7 @@ public class ExceptionListenerTest extends QpidBrokerTestCase // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener). - final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");; + final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown"); MessageListener redirectingMessageListener = new MessageListener() { @Override @@ -221,7 +232,7 @@ public class ExceptionListenerTest extends QpidBrokerTestCase try { Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED); // ** Deadlock - mlSession.createProducer(unknownQueue).send(msg); + mlSession.createProducer(unknownQueue).send(msg); // will cause async AMQNoRouteException; mlSession.commit(); } catch (JMSException je) @@ -236,9 +247,15 @@ public class ExceptionListenerTest extends QpidBrokerTestCase consumer.setMessageListener(redirectingMessageListener); connection.start(); - // Await the 2nd exception + // Await an exception boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS); - assertTrue("Exception listener did not hear exception within timeout", exceptionReceived); - assertNull("Exception listener should not have had experienced exception", _lastExceptionListenerException); + assertTrue("Exception listener did not hear at least one exception within timeout", exceptionReceived); + + // Await the connection listener to close the connection + boolean closeAttemptedReceived = connectionClosedAttemptLatch.await(10, TimeUnit.SECONDS); + assertTrue("Exception listener did not try to close the exception within timeout", closeAttemptedReceived); + assertNull("Exception listener should not have had experienced an exception : " + connectionCloseException.get(), connectionCloseException.get()); } + + } |