summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-02-04 08:14:00 +0000
committerRobert Gemmell <robbie@apache.org>2011-02-04 08:14:00 +0000
commite7f02a8b8b25d9fcce6525ccc5b794f8438995f0 (patch)
tree20179efb250c6351d7012b29fa8104558b83780f
parentcf47f99d276a50ac32ed9835a9afb818fd90f4ba (diff)
downloadqpid-python-e7f02a8b8b25d9fcce6525ccc5b794f8438995f0.tar.gz
QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads.
Applied patch from Keith Wall <keith.wall@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java57
-rw-r--r--java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java60
-rw-r--r--java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java6
-rw-r--r--java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java159
-rw-r--r--java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java63
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java106
-rw-r--r--java/test-profiles/08StandaloneExcludes2
10 files changed, 480 insertions, 70 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b5c41e483c..1f940b62f0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -119,7 +119,6 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -363,7 +362,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Set when recover is called. This is to handle the case where recover() is called by application code during
- * onMessage() processing to enure that an auto ack is not sent.
+ * onMessage() processing to ensure that an auto ack is not sent.
*/
private boolean _inRecovery;
@@ -383,7 +382,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Object _suspensionLock = new Object();
/**
- * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel.
+ * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
*
* @todo This is accessed only within a synchronized method, so does not need to be atomic.
*/
@@ -429,7 +428,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
@@ -475,7 +474,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// flow control
if (!(_thisSession.isClosed() || _thisSession.isClosing()))
{
- // Only executute change if previous state
+ // Only execute change if previous state
// was False
if (!_suspendState.getAndSet(true))
{
@@ -485,7 +484,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
"Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
}
- new Thread(new SuspenderRunner(_suspendState)).start();
+ try
+ {
+ Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
}
}
}
@@ -496,7 +502,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// flow control
if (!(_thisSession.isClosed() || _thisSession.isClosing()))
{
- // Only executute change if previous state
+ // Only execute change if previous state
// was true
if (_suspendState.getAndSet(false))
{
@@ -507,7 +513,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
"Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
}
- new Thread(new SuspenderRunner(_suspendState)).start();
+ try
+ {
+ Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
}
}
}
@@ -531,7 +544,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
@@ -562,7 +575,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (IllegalStateException ise)
{
- // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
+ // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
@@ -677,11 +690,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Closes the session.
*
- * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
+ * <p/>Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close
* the channel. This is because the channel is marked as closed before the request to close it is made, so the
* fail-over should not re-open it.
*
- * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
+ * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
* @todo Be aware of possible changes to parameter order as versions change.
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c16941b341..eb5af119b2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mina.filter.codec.ProtocolCodecException;
@@ -63,6 +64,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
@@ -100,7 +102,7 @@ import org.slf4j.LoggerFactory;
* connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
* and the protocol session data is held outside of the MINA IOSession.
*
- * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
+ * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through.
* The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
* the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
* optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
@@ -114,8 +116,8 @@ import org.slf4j.LoggerFactory;
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
* failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
* AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * be merged, although there is sense in keeping the session model separate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler implements ProtocolEngine
@@ -158,7 +160,7 @@ public class AMQProtocolHandler implements ProtocolEngine
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
- /** The last failover exception that occured */
+ /** The last failover exception that occurred */
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
@@ -187,6 +189,21 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _poolReference.setThreadFactory(new ThreadFactory()
+ {
+
+ public Thread newThread(final Runnable runnable)
+ {
+ try
+ {
+ return Threading.getThreadFactory().createThread(runnable);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
+ }
+ });
_readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
_writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_poolReference.acquireExecutorService();
@@ -275,7 +292,15 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if(!_connection.isClosed())
{
- Thread failoverThread = new Thread(_failoverHandler);
+ final Thread failoverThread;
+ try
+ {
+ failoverThread = Threading.getThreadFactory().createThread(_failoverHandler);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
failoverThread.setName("Failover");
// Do not inherit daemon-ness from current thread as this can be a daemon
// thread such as a AnonymousIoService thread.
@@ -369,7 +394,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
/**
- * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
+ * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any
* protocol level waits.
*
* This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
@@ -407,7 +432,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
//Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
- // interupted unless failover cannot restore the state.
+ // interrupted unless failover cannot restore the state.
propagateExceptionToFrameListeners(_lastFailoverException);
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index 20a30b3ed3..8152a1f5e9 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -22,10 +22,12 @@ package org.apache.qpid.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
+
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
* the references taken, instantiating the service on the first reference, and shutting it down when the last
@@ -36,7 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue;
*
* <p/><table id="crc><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a shared exector service. <td> {@link Executors}
+ * <tr><td> Provide a shared executor service. <td> {@link Executors}
* <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
* <tr><td> Track references to the executor service.
* <tr><td> Provide configuration of the executor service.
@@ -53,13 +55,15 @@ import java.util.concurrent.LinkedBlockingQueue;
* @todo {@link #_poolSize} should be static?
*
* @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
- * further checks are applied to ensure that the exector service has not been shutdown. This passes responsibility
+ * further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility
* for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
* here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
* isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
*/
public class ReferenceCountingExecutorService
{
+
+
/** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
private static final int MINIMUM_POOL_SIZE = 4;
@@ -87,6 +91,11 @@ public class ReferenceCountingExecutorService
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ /** Thread Factory used to create thread of the pool. Uses the default implementation provided by
+ * {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller.
+ */
+ private ThreadFactory _threadFactory = Executors.defaultThreadFactory();
+
private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
/**
@@ -116,19 +125,23 @@ public class ReferenceCountingExecutorService
{
if (_refCount++ == 0)
{
-// _pool = Executors.newFixedThreadPool(_poolSize);
-
// Use a job queue that biases to writes
if(_useBiasedPool)
{
_pool = new ThreadPoolExecutor(_poolSize, _poolSize,
0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue());
+ new ReadWriteJobQueue(),
+ _threadFactory);
+
}
else
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ _threadFactory);
}
+
}
@@ -137,7 +150,7 @@ public class ReferenceCountingExecutorService
}
/**
- * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
+ * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
* to zero, the executor service is shut down.
*/
public void releaseExecutorService()
@@ -169,4 +182,34 @@ public class ReferenceCountingExecutorService
{
return _refCount;
}
+
+ /**
+ *
+ * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
+ *
+ * @return thread factory
+ */
+ public ThreadFactory getThreadFactory()
+ {
+ return _threadFactory;
+ }
+
+ /**
+ * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
+ * <p>
+ * If the pool has been already created, the change will have no effect until
+ * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason,
+ * callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}.
+ *
+ * @param threadFactory thread factory
+ */
+ public void setThreadFactory(final ThreadFactory threadFactory)
+ {
+ if (threadFactory == null)
+ {
+ throw new NullPointerException("threadFactory cannot be null");
+ }
+ _threadFactory = threadFactory;
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
index 9786d8fc3f..a96dac4109 100644
--- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.thread;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,29 +19,25 @@ package org.apache.qpid.thread;
*
*/
+package org.apache.qpid.thread;
-public class DefaultThreadFactory implements ThreadFactory
-{
-
- private static class QpidThread extends Thread
- {
- private QpidThread(final Runnable target)
- {
- super(target);
- }
- }
+public class DefaultThreadFactory implements ThreadFactory
+{
+ private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler();
public Thread createThread(Runnable r)
{
- return new Thread(r);
+ Thread t = new Thread(r);
+ t.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler);
+ return t;
}
public Thread createThread(Runnable r, int priority)
{
- Thread t = new Thread(r);
+ Thread t = createThread(r);
t.setPriority(priority);
return t;
}
diff --git a/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java
new file mode 100644
index 0000000000..192675edcd
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.thread;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * An {@link UncaughtExceptionHandler} that writes the exception to the application log via
+ * the SLF4J framework. Once registered with {@link Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)}
+ * it will be invoked by the JVM when a thread has been <i>abruptly</i> terminated due to an uncaught exception.
+ * Owing to the contract of {@link Runnable#run()}, the only possible exception types which can cause such a termination
+ * are instances of {@link RuntimeException} and {@link Error}. These exceptions are catastrophic and the client must
+ * restart the JVM.
+ * <p>
+ * The implementation also invokes {@link ThreadGroup#uncaughtException(Thread, Throwable)}. This
+ * is done to retain compatibility with any monitoring solutions (for example, log scraping of
+ * standard error) that existing users of older Qpid client libraries may have in place.
+ *
+ */
+public class LoggingUncaughtExceptionHandler implements UncaughtExceptionHandler
+{
+ private static final Logger _logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class);
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ try
+ {
+ _logger.error("Uncaught exception in thread \"{}\"", t.getName(), e);
+ }
+ finally
+ {
+ // Invoke the thread group's handler too for compatibility with any
+ // existing clients who are already scraping stderr for such conditions.
+ t.getThreadGroup().uncaughtException(t, e);
+ }
+ }
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
index 0507b3108f..95a8d192c5 100644
--- a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
@@ -25,6 +25,8 @@ import java.lang.reflect.Constructor;
public class RealtimeThreadFactory implements ThreadFactory
{
+ private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler();
+
private Class threadClass;
private Constructor threadConstructor;
private Constructor priorityParameterConstructor;
@@ -62,7 +64,9 @@ public class RealtimeThreadFactory implements ThreadFactory
public Thread createThread(Runnable r, int priority) throws Exception
{
Object priorityParams = priorityParameterConstructor.newInstance(priority);
- return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+ Thread thread = (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+ thread.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler);
+ return thread;
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java
new file mode 100644
index 0000000000..35998de3a1
--- /dev/null
+++ b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+
+public class ReferenceCountingExecutorServiceTest extends TestCase
+{
+
+
+ private ReferenceCountingExecutorService _executorService = ReferenceCountingExecutorService.getInstance(); // Class under test
+ private ThreadFactory _beforeExecutorThreadFactory;
+
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _beforeExecutorThreadFactory = _executorService.getThreadFactory();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ _executorService.setThreadFactory(_beforeExecutorThreadFactory);
+ }
+
+
+
+ /**
+ * Tests that the ReferenceCountingExecutorService correctly manages the reference count.
+ */
+ public void testReferenceCounting() throws Exception
+ {
+ final int countBefore = _executorService.getReferenceCount();
+
+ try
+ {
+ _executorService.acquireExecutorService();
+ _executorService.acquireExecutorService();
+
+ assertEquals("Reference count should now be +2", countBefore + 2, _executorService.getReferenceCount());
+ }
+ finally
+ {
+ _executorService.releaseExecutorService();
+ _executorService.releaseExecutorService();
+ }
+ assertEquals("Reference count should have returned to the initial value", countBefore, _executorService.getReferenceCount());
+ }
+
+ /**
+ * Tests that the executor creates and executes a task using the default thread pool.
+ */
+ public void testExecuteCommandWithDefaultExecutorThreadFactory() throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Set<ThreadGroup> threadGroups = new HashSet<ThreadGroup>();
+
+ _executorService.acquireExecutorService();
+
+ try
+ {
+ _executorService.getPool().execute(createRunnable(latch, threadGroups));
+
+ latch.await(3, TimeUnit.SECONDS);
+
+ assertTrue("Expect that executor created a thread using default thread factory",
+ threadGroups.contains(Thread.currentThread().getThreadGroup()));
+ }
+ finally
+ {
+ _executorService.releaseExecutorService();
+ }
+ }
+
+ /**
+ * Tests that the executor creates and executes a task using an overridden thread pool.
+ */
+ public void testExecuteCommandWithOverriddenExecutorThreadFactory() throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final ThreadGroup expectedThreadGroup = new ThreadGroup("junit");
+ _executorService.setThreadFactory(new ThreadGroupChangingThreadFactory(expectedThreadGroup));
+ _executorService.acquireExecutorService();
+
+ final Set<ThreadGroup> threadGroups = new HashSet<ThreadGroup>();
+
+ try
+ {
+ _executorService.getPool().execute(createRunnable(latch, threadGroups));
+
+ latch.await(3, TimeUnit.SECONDS);
+
+ assertTrue("Expect that executor created a thread using overridden thread factory",
+ threadGroups.contains(expectedThreadGroup));
+ }
+ finally
+ {
+ _executorService.releaseExecutorService();
+ }
+ }
+
+ private Runnable createRunnable(final CountDownLatch latch, final Set<ThreadGroup> threadGroups)
+ {
+ return new Runnable()
+ {
+
+ public void run()
+ {
+ threadGroups.add(Thread.currentThread().getThreadGroup());
+ latch.countDown();
+ }
+
+ };
+ }
+
+ private final class ThreadGroupChangingThreadFactory implements ThreadFactory
+ {
+ private final ThreadGroup _newGroup;
+
+ private ThreadGroupChangingThreadFactory(final ThreadGroup newGroup)
+ {
+ this._newGroup = newGroup;
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(_newGroup, r);
+ }
+ }
+
+}
diff --git a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java
index 7f17592893..7b0f93700a 100644
--- a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java
+++ b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.thread;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,18 +19,22 @@ package org.apache.qpid.thread;
*
*/
+package org.apache.qpid.thread;
import junit.framework.TestCase;
+/**
+ * Tests the ThreadFactory.
+ */
public class ThreadFactoryTest extends TestCase
{
public void testThreadFactory()
{
- Class threadFactoryClass = null;
+ Class<? extends ThreadFactory> threadFactoryClass = null;
try
{
threadFactoryClass = Class.forName(System.getProperty("qpid.thread_factory",
- "org.apache.qpid.thread.DefaultThreadFactory"));
+ "org.apache.qpid.thread.DefaultThreadFactory")).asSubclass(ThreadFactory.class);
}
// If the thread factory class was wrong it will flagged way before it gets here.
catch(Exception e)
@@ -41,20 +44,19 @@ public class ThreadFactoryTest extends TestCase
assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass());
}
-
- public void testThreadCreate()
+
+ /**
+ * Tests creating a thread without a priority. Also verifies that the factory sets the
+ * uncaught exception handler so uncaught exceptions are logged to SLF4J.
+ */
+ public void testCreateThreadWithDefaultPriority()
{
- Runnable r = new Runnable(){
-
- public void run(){
-
- }
- };
+ Runnable r = createRunnable();
Thread t = null;
try
{
- t = Threading.getThreadFactory().createThread(r,5);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
@@ -62,6 +64,41 @@ public class ThreadFactoryTest extends TestCase
}
assertNotNull(t);
- assertEquals(5,t.getPriority());
+ assertEquals(Thread.NORM_PRIORITY, t.getPriority());
+ assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler);
+ }
+
+ /**
+ * Tests creating thread with a priority. Also verifies that the factory sets the
+ * uncaught exception handler so uncaught exceptions are logged to SLF4J.
+ */
+ public void testCreateThreadWithSpecifiedPriority()
+ {
+ Runnable r = createRunnable();
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r, 4);
+ }
+ catch(Exception e)
+ {
+ fail("Error creating thread using Qpid thread factory");
+ }
+
+ assertNotNull(t);
+ assertEquals(4, t.getPriority());
+ assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler);
+ }
+
+ private Runnable createRunnable()
+ {
+ Runnable r = new Runnable(){
+
+ public void run(){
+
+ }
+ };
+ return r;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
index 11d9ce2bbc..e4d1c72208 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.LogMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -46,14 +50,15 @@ import java.util.concurrent.TimeUnit;
* the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
* messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
-public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener
+public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener
{
private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
Context _context;
private static final int MSG_COUNT = 5;
- private int receivedCount = 0;
+ private int _receivedCount = 0;
+ private int _errorCount = 0;
private MessageConsumer _consumer;
private Connection _clientConnection;
private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
@@ -94,11 +99,14 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
protected void tearDown() throws Exception
{
- _clientConnection.close();
+ if (_clientConnection != null)
+ {
+ _clientConnection.close();
+ }
super.tearDown();
}
- public void testSynchronousRecieve() throws Exception
+ public void testSynchronousReceive() throws Exception
{
for (int msg = 0; msg < MSG_COUNT; msg++)
{
@@ -106,15 +114,15 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
}
}
- public void testSynchronousRecieveNoWait() throws Exception
+ public void testSynchronousReceiveNoWait() throws Exception
{
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- assertTrue(_consumer.receiveNoWait() != null);
+ assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null);
}
}
- public void testAsynchronousRecieve() throws Exception
+ public void testAsynchronousReceive() throws Exception
{
_consumer.setMessageListener(this);
@@ -128,18 +136,17 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
{
// do nothing
}
- // Should have recieved all async messages
- assertEquals(MSG_COUNT, receivedCount);
+ // Should have received all async messages
+ assertEquals(MSG_COUNT, _receivedCount);
}
- public void testRecieveThenUseMessageListener() throws Exception
+ public void testReceiveThenUseMessageListener() throws Exception
{
-
_logger.error("Test disabled as initial receive is not called first");
// Perform initial receive to start connection
assertTrue(_consumer.receive(2000) != null);
- receivedCount++;
+ _receivedCount++;
// Sleep to ensure remaining 4 msgs end up on _synchronousQueue
Thread.sleep(1000);
@@ -157,8 +164,8 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
{
// do nothing
}
- // Should have recieved all async messages
- assertEquals(MSG_COUNT, receivedCount);
+ // Should have received all async messages
+ assertEquals(MSG_COUNT, _receivedCount);
_clientConnection.close();
@@ -172,14 +179,81 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
assertTrue(cons.receive(2000) == null);
}
+ /**
+ * Tests the case where the message listener throws an java.lang.Error.
+ *
+ */
+ public void testMessageListenerThrowsError() throws Exception
+ {
+ final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error";
+ _clientConnection.setExceptionListener(this);
+
+ _awaitMessages = new CountDownLatch(1);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _logger.debug("onMessage called");
+ _receivedCount++;
+
+
+ throw new Error(javaLangErrorMessageText);
+ }
+ finally
+ {
+ _awaitMessages.countDown();
+ }
+ }
+ });
+
+
+ _logger.info("Waiting 3 seconds for message");
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+
+ assertEquals("onMessage should have been called", 1, _receivedCount);
+ assertEquals("onException should NOT have been called", 0, _errorCount);
+
+ // Check that Error has been written to the application log.
+
+ LogMonitor _monitor = new LogMonitor(_outputFile);
+ assertTrue("The expected message not written to log file.",
+ _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT));
+
+ if (_clientConnection != null)
+ {
+ try
+ {
+ _clientConnection.close();
+ }
+ catch (JMSException e)
+ {
+ // Ignore connection close errors for this test.
+ }
+ finally
+ {
+ _clientConnection = null;
+ }
+ }
+ }
+
public void onMessage(Message message)
{
- _logger.info("Received Message(" + receivedCount + "):" + message);
+ _logger.info("Received Message(" + _receivedCount + "):" + message);
- receivedCount++;
+ _receivedCount++;
_awaitMessages.countDown();
}
+ @Override
+ public void onException(JMSException e)
+ {
+ _logger.info("Exception received", e);
+ _errorCount++;
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(MessageListenerTest.class);
diff --git a/java/test-profiles/08StandaloneExcludes b/java/test-profiles/08StandaloneExcludes
index 214ee83bb2..b482a14c6d 100644
--- a/java/test-profiles/08StandaloneExcludes
+++ b/java/test-profiles/08StandaloneExcludes
@@ -34,6 +34,6 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*
-org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait
+org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait
org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism