summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-20 12:19:33 +0000
committerKeith Wall <kwall@apache.org>2015-03-20 12:19:33 +0000
commita3132b9031d594ffccefd0ce6b9c2d3f19952d65 (patch)
tree8535490fe01ce78882cded2e6962d7130dfbb305
parent87629732fae81a4e9ac1a500e878dc3c57dc3ab8 (diff)
downloadqpid-python-a3132b9031d594ffccefd0ce6b9c2d3f19952d65.tar.gz
QPID-6460, QPID-6460: [Java Client] Make task pool used for exception reporting duties exactly one thread to serialise the callbacks
Also, * name the task pool thread (for diagnostic purposes) * no longer forcedily shutdown the pool on close as an unexpected InterruptedException may corrupt an application's state git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1668000 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java71
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java53
2 files changed, 59 insertions, 65 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ec60bd2914..717ebcc86f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -33,7 +33,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionConsumer;
@@ -89,7 +89,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT,
ClientProperties.DEFAULT_CLOSE_TIMEOUT);
- private final long _connectionNumber;
+ private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -160,8 +160,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQShortString _temporaryTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
private AMQShortString _temporaryQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
- private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+ /**
+ * Thread Pool for executing connection level processes such as reporting asynchronous exceptions
+ * and for 0-8..0-91 returning bounced messages.
+ */
+ private final ExecutorService _taskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread thread = new Thread(r, "Connection_" + AMQConnection.this._connectionNumber + "_task");
+ if (!thread.isDaemon())
+ {
+ thread.setDaemon(true);
+ }
+
+ return thread;
+ }
+ });
private AMQConnectionDelegate _delegate;
@@ -255,8 +271,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new IllegalArgumentException("Connection must be specified");
}
- _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
-
if (_logger.isDebugEnabled())
{
_logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
@@ -545,18 +559,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_connectionMetaData = new QpidConnectionMetaData(this);
}
- protected boolean checkException(Throwable thrown)
- {
- Throwable cause = thrown.getCause();
-
- if (cause == null)
- {
- cause = thrown;
- }
-
- return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
- }
-
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
@@ -935,8 +937,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
finally
{
- //This MUST occur after we have successfully closed all Channels/Sessions
- shutdownTaskPool(timeout);
+ shutdownTaskPool();
}
}
catch (JMSException e)
@@ -960,35 +961,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
}
-
}
- private void shutdownTaskPool(final long timeout)
+ private void shutdownTaskPool()
{
_taskPool.shutdown();
-
- if (!_taskPool.isTerminated())
- {
- try
- {
- _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
- }
-
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interrupt any running tasks.
- if (!_taskPool.isTerminated())
- {
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
- }
}
/**
@@ -1388,8 +1365,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
});
- }
- else
+ }
+ else
{
_logger.error("Throwable Received but no listener set: " + cause);
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
index 141de1e5a8..f58acb380a 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
@@ -24,7 +24,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
@@ -37,7 +39,6 @@ import javax.jms.Queue;
import javax.jms.Session;
import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.transport.ConnectionException;
@@ -186,25 +187,35 @@ public class ExceptionListenerTest extends QpidBrokerTestCase
// Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException.
// (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock,
// at least on my machine).
- final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2);
+ final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2);
+ final AtomicBoolean doneClosed = new AtomicBoolean();
+ final CountDownLatch connectionClosedAttemptLatch = new CountDownLatch(1);
+ final AtomicReference<Exception> connectionCloseException = new AtomicReference<>();
final ExceptionListener listener = new ExceptionListener()
{
public void onException(JMSException exception)
{
- try
+ exceptionReceivedLatch.countDown();
+ if (exceptionReceivedLatch.getCount() == 0)
{
- assertNotNull("JMS Exception must have cause", exception.getCause() );
- assertEquals("JMS Exception is of wrong type", AMQNoRouteException.class, exception.getCause().getClass());
- exceptionReceivedLatch.countDown();
- if (exceptionReceivedLatch.getCount() == 0)
+ try
{
- connection.stop(); // ** Deadlock
- connection.close();
+ if (doneClosed.compareAndSet(false, true))
+ {
+ connection.stop();
+ connection.close();
+ }
}
- }
- catch (Throwable t)
- {
- _lastExceptionListenerException = t;
+ catch (Exception e)
+ {
+ // We expect no exception to be caught
+ connectionCloseException.set(e);
+ }
+ finally
+ {
+ connectionClosedAttemptLatch.countDown();
+ }
+
}
}
};
@@ -212,7 +223,7 @@ public class ExceptionListenerTest extends QpidBrokerTestCase
// Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus
// provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener).
- final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");;
+ final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");
MessageListener redirectingMessageListener = new MessageListener()
{
@Override
@@ -221,7 +232,7 @@ public class ExceptionListenerTest extends QpidBrokerTestCase
try
{
Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED); // ** Deadlock
- mlSession.createProducer(unknownQueue).send(msg);
+ mlSession.createProducer(unknownQueue).send(msg); // will cause async AMQNoRouteException;
mlSession.commit();
}
catch (JMSException je)
@@ -236,9 +247,15 @@ public class ExceptionListenerTest extends QpidBrokerTestCase
consumer.setMessageListener(redirectingMessageListener);
connection.start();
- // Await the 2nd exception
+ // Await an exception
boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
- assertTrue("Exception listener did not hear exception within timeout", exceptionReceived);
- assertNull("Exception listener should not have had experienced exception", _lastExceptionListenerException);
+ assertTrue("Exception listener did not hear at least one exception within timeout", exceptionReceived);
+
+ // Await the connection listener to close the connection
+ boolean closeAttemptedReceived = connectionClosedAttemptLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Exception listener did not try to close the exception within timeout", closeAttemptedReceived);
+ assertNull("Exception listener should not have had experienced an exception : " + connectionCloseException.get(), connectionCloseException.get());
}
+
+
}