diff options
author | Robert Gemmell <robbie@apache.org> | 2011-02-04 08:14:00 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-02-04 08:14:00 +0000 |
commit | e7f02a8b8b25d9fcce6525ccc5b794f8438995f0 (patch) | |
tree | 20179efb250c6351d7012b29fa8104558b83780f | |
parent | cf47f99d276a50ac32ed9835a9afb818fd90f4ba (diff) | |
download | qpid-python-e7f02a8b8b25d9fcce6525ccc5b794f8438995f0.tar.gz |
QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads.
Applied patch from Keith Wall <keith.wall@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067108 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 480 insertions, 70 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5c41e483c..1f940b62f0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -119,7 +119,6 @@ import org.slf4j.LoggerFactory; */ public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -363,7 +362,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing to enure that an auto ack is not sent. + * onMessage() processing to ensure that an auto ack is not sent. */ private boolean _inRecovery; @@ -383,7 +382,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Object _suspensionLock = new Object(); /** - * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel. + * Used to ensure that only the first call to start the dispatcher can unsuspend the channel. * * @todo This is accessed only within a synchronized method, so does not need to be atomic. */ @@ -429,7 +428,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. + * @param acknowledgeMode The acknowledgement mode for the session. * @param messageFactoryRegistry The message factory factory for the session. * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. @@ -475,7 +474,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // flow control if (!(_thisSession.isClosed() || _thisSession.isClosing())) { - // Only executute change if previous state + // Only execute change if previous state // was False if (!_suspendState.getAndSet(true)) { @@ -485,7 +484,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic "Above threshold(" + _prefetchHighMark + ") so suspending channel. Current value is " + currentValue); } - new Thread(new SuspenderRunner(_suspendState)).start(); + try + { + Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } } } } @@ -496,7 +502,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // flow control if (!(_thisSession.isClosed() || _thisSession.isClosing())) { - // Only executute change if previous state + // Only execute change if previous state // was true if (_suspendState.getAndSet(false)) { @@ -507,7 +513,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic "Below threshold(" + _prefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); } - new Thread(new SuspenderRunner(_suspendState)).start(); + try + { + Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } } } } @@ -531,7 +544,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. + * @param acknowledgeMode The acknowledgement mode for the session. * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ @@ -562,7 +575,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (IllegalStateException ise) { - // if the Connection has closed then we should throw any exception that has occured that we were not waiting for + // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) @@ -677,11 +690,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Closes the session. * - * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close + * <p/>Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close * the channel. This is because the channel is marked as closed before the request to close it is made, so the * fail-over should not re-open it. * - * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. + * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. * @todo Be aware of possible changes to parameter order as versions change. diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c16941b341..eb5af119b2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.mina.filter.codec.ProtocolCodecException; @@ -63,6 +64,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; @@ -100,7 +102,7 @@ import org.slf4j.LoggerFactory; * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection * and the protocol session data is held outside of the MINA IOSession. * - * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through. + * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through. * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. @@ -114,8 +116,8 @@ import org.slf4j.LoggerFactory; * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * be merged, although there is sense in keeping the session model separate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler implements ProtocolEngine @@ -158,7 +160,7 @@ public class AMQProtocolHandler implements ProtocolEngine /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ + /** The last failover exception that occurred */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ @@ -187,6 +189,21 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); + _poolReference.setThreadFactory(new ThreadFactory() + { + + public Thread newThread(final Runnable runnable) + { + try + { + return Threading.getThreadFactory().createThread(runnable); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + } + }); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); @@ -275,7 +292,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if(!_connection.isClosed()) { - Thread failoverThread = new Thread(_failoverHandler); + final Thread failoverThread; + try + { + failoverThread = Threading.getThreadFactory().createThread(_failoverHandler); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } failoverThread.setName("Failover"); // Do not inherit daemon-ness from current thread as this can be a daemon // thread such as a AnonymousIoService thread. @@ -369,7 +394,7 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any * protocol level waits. * * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should @@ -407,7 +432,7 @@ public class AMQProtocolHandler implements ProtocolEngine } //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be - // interupted unless failover cannot restore the state. + // interrupted unless failover cannot restore the state. propagateExceptionToFrameListeners(_lastFailoverException); } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 20a30b3ed3..8152a1f5e9 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,10 +22,12 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; + /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts * the references taken, instantiating the service on the first reference, and shutting it down when the last @@ -36,7 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; * * <p/><table id="crc><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide a shared exector service. <td> {@link Executors} + * <tr><td> Provide a shared executor service. <td> {@link Executors} * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService} * <tr><td> Track references to the executor service. * <tr><td> Provide configuration of the executor service. @@ -53,13 +55,15 @@ import java.util.concurrent.LinkedBlockingQueue; * @todo {@link #_poolSize} should be static? * * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used - * further checks are applied to ensure that the exector service has not been shutdown. This passes responsibility + * further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors. */ public class ReferenceCountingExecutorService { + + /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */ private static final int MINIMUM_POOL_SIZE = 4; @@ -87,6 +91,11 @@ public class ReferenceCountingExecutorService /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + /** Thread Factory used to create thread of the pool. Uses the default implementation provided by + * {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller. + */ + private ThreadFactory _threadFactory = Executors.defaultThreadFactory(); + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); /** @@ -116,19 +125,23 @@ public class ReferenceCountingExecutorService { if (_refCount++ == 0) { -// _pool = Executors.newFixedThreadPool(_poolSize); - // Use a job queue that biases to writes if(_useBiasedPool) { _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue()); + new ReadWriteJobQueue(), + _threadFactory); + } else { - _pool = Executors.newFixedThreadPool(_poolSize); + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + _threadFactory); } + } @@ -137,7 +150,7 @@ public class ReferenceCountingExecutorService } /** - * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls + * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls * to zero, the executor service is shut down. */ public void releaseExecutorService() @@ -169,4 +182,34 @@ public class ReferenceCountingExecutorService { return _refCount; } + + /** + * + * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * + * @return thread factory + */ + public ThreadFactory getThreadFactory() + { + return _threadFactory; + } + + /** + * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * <p> + * If the pool has been already created, the change will have no effect until + * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason, + * callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}. + * + * @param threadFactory thread factory + */ + public void setThreadFactory(final ThreadFactory threadFactory) + { + if (threadFactory == null) + { + throw new NullPointerException("threadFactory cannot be null"); + } + _threadFactory = threadFactory; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java index 9786d8fc3f..a96dac4109 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,29 +19,25 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; -public class DefaultThreadFactory implements ThreadFactory -{ - - private static class QpidThread extends Thread - { - private QpidThread(final Runnable target) - { - super(target); - } - } +public class DefaultThreadFactory implements ThreadFactory +{ + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); public Thread createThread(Runnable r) { - return new Thread(r); + Thread t = new Thread(r); + t.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return t; } public Thread createThread(Runnable r, int priority) { - Thread t = new Thread(r); + Thread t = createThread(r); t.setPriority(priority); return t; } diff --git a/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java new file mode 100644 index 0000000000..192675edcd --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/thread/LoggingUncaughtExceptionHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.thread; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * An {@link UncaughtExceptionHandler} that writes the exception to the application log via + * the SLF4J framework. Once registered with {@link Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)} + * it will be invoked by the JVM when a thread has been <i>abruptly</i> terminated due to an uncaught exception. + * Owing to the contract of {@link Runnable#run()}, the only possible exception types which can cause such a termination + * are instances of {@link RuntimeException} and {@link Error}. These exceptions are catastrophic and the client must + * restart the JVM. + * <p> + * The implementation also invokes {@link ThreadGroup#uncaughtException(Thread, Throwable)}. This + * is done to retain compatibility with any monitoring solutions (for example, log scraping of + * standard error) that existing users of older Qpid client libraries may have in place. + * + */ +public class LoggingUncaughtExceptionHandler implements UncaughtExceptionHandler +{ + private static final Logger _logger = LoggerFactory.getLogger(LoggingUncaughtExceptionHandler.class); + + @Override + public void uncaughtException(Thread t, Throwable e) + { + try + { + _logger.error("Uncaught exception in thread \"{}\"", t.getName(), e); + } + finally + { + // Invoke the thread group's handler too for compatibility with any + // existing clients who are already scraping stderr for such conditions. + t.getThreadGroup().uncaughtException(t, e); + } + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java index 0507b3108f..95a8d192c5 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -25,6 +25,8 @@ import java.lang.reflect.Constructor; public class RealtimeThreadFactory implements ThreadFactory { + private final LoggingUncaughtExceptionHandler _loggingUncaughtExceptionHandler = new LoggingUncaughtExceptionHandler(); + private Class threadClass; private Constructor threadConstructor; private Constructor priorityParameterConstructor; @@ -62,7 +64,9 @@ public class RealtimeThreadFactory implements ThreadFactory public Thread createThread(Runnable r, int priority) throws Exception { Object priorityParams = priorityParameterConstructor.newInstance(priority); - return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + Thread thread = (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r); + thread.setUncaughtExceptionHandler(_loggingUncaughtExceptionHandler); + return thread; } } diff --git a/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java new file mode 100644 index 0000000000..35998de3a1 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/pool/ReferenceCountingExecutorServiceTest.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + + +public class ReferenceCountingExecutorServiceTest extends TestCase +{ + + + private ReferenceCountingExecutorService _executorService = ReferenceCountingExecutorService.getInstance(); // Class under test + private ThreadFactory _beforeExecutorThreadFactory; + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _beforeExecutorThreadFactory = _executorService.getThreadFactory(); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + _executorService.setThreadFactory(_beforeExecutorThreadFactory); + } + + + + /** + * Tests that the ReferenceCountingExecutorService correctly manages the reference count. + */ + public void testReferenceCounting() throws Exception + { + final int countBefore = _executorService.getReferenceCount(); + + try + { + _executorService.acquireExecutorService(); + _executorService.acquireExecutorService(); + + assertEquals("Reference count should now be +2", countBefore + 2, _executorService.getReferenceCount()); + } + finally + { + _executorService.releaseExecutorService(); + _executorService.releaseExecutorService(); + } + assertEquals("Reference count should have returned to the initial value", countBefore, _executorService.getReferenceCount()); + } + + /** + * Tests that the executor creates and executes a task using the default thread pool. + */ + public void testExecuteCommandWithDefaultExecutorThreadFactory() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final Set<ThreadGroup> threadGroups = new HashSet<ThreadGroup>(); + + _executorService.acquireExecutorService(); + + try + { + _executorService.getPool().execute(createRunnable(latch, threadGroups)); + + latch.await(3, TimeUnit.SECONDS); + + assertTrue("Expect that executor created a thread using default thread factory", + threadGroups.contains(Thread.currentThread().getThreadGroup())); + } + finally + { + _executorService.releaseExecutorService(); + } + } + + /** + * Tests that the executor creates and executes a task using an overridden thread pool. + */ + public void testExecuteCommandWithOverriddenExecutorThreadFactory() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + final ThreadGroup expectedThreadGroup = new ThreadGroup("junit"); + _executorService.setThreadFactory(new ThreadGroupChangingThreadFactory(expectedThreadGroup)); + _executorService.acquireExecutorService(); + + final Set<ThreadGroup> threadGroups = new HashSet<ThreadGroup>(); + + try + { + _executorService.getPool().execute(createRunnable(latch, threadGroups)); + + latch.await(3, TimeUnit.SECONDS); + + assertTrue("Expect that executor created a thread using overridden thread factory", + threadGroups.contains(expectedThreadGroup)); + } + finally + { + _executorService.releaseExecutorService(); + } + } + + private Runnable createRunnable(final CountDownLatch latch, final Set<ThreadGroup> threadGroups) + { + return new Runnable() + { + + public void run() + { + threadGroups.add(Thread.currentThread().getThreadGroup()); + latch.countDown(); + } + + }; + } + + private final class ThreadGroupChangingThreadFactory implements ThreadFactory + { + private final ThreadGroup _newGroup; + + private ThreadGroupChangingThreadFactory(final ThreadGroup newGroup) + { + this._newGroup = newGroup; + } + + public Thread newThread(Runnable r) + { + return new Thread(_newGroup, r); + } + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java index 7f17592893..7b0f93700a 100644 --- a/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java +++ b/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,18 +19,22 @@ package org.apache.qpid.thread; * */ +package org.apache.qpid.thread; import junit.framework.TestCase; +/** + * Tests the ThreadFactory. + */ public class ThreadFactoryTest extends TestCase { public void testThreadFactory() { - Class threadFactoryClass = null; + Class<? extends ThreadFactory> threadFactoryClass = null; try { threadFactoryClass = Class.forName(System.getProperty("qpid.thread_factory", - "org.apache.qpid.thread.DefaultThreadFactory")); + "org.apache.qpid.thread.DefaultThreadFactory")).asSubclass(ThreadFactory.class); } // If the thread factory class was wrong it will flagged way before it gets here. catch(Exception e) @@ -41,20 +44,19 @@ public class ThreadFactoryTest extends TestCase assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass()); } - - public void testThreadCreate() + + /** + * Tests creating a thread without a priority. Also verifies that the factory sets the + * uncaught exception handler so uncaught exceptions are logged to SLF4J. + */ + public void testCreateThreadWithDefaultPriority() { - Runnable r = new Runnable(){ - - public void run(){ - - } - }; + Runnable r = createRunnable(); Thread t = null; try { - t = Threading.getThreadFactory().createThread(r,5); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { @@ -62,6 +64,41 @@ public class ThreadFactoryTest extends TestCase } assertNotNull(t); - assertEquals(5,t.getPriority()); + assertEquals(Thread.NORM_PRIORITY, t.getPriority()); + assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler); + } + + /** + * Tests creating thread with a priority. Also verifies that the factory sets the + * uncaught exception handler so uncaught exceptions are logged to SLF4J. + */ + public void testCreateThreadWithSpecifiedPriority() + { + Runnable r = createRunnable(); + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r, 4); + } + catch(Exception e) + { + fail("Error creating thread using Qpid thread factory"); + } + + assertNotNull(t); + assertEquals(4, t.getPriority()); + assertTrue(t.getUncaughtExceptionHandler() instanceof LoggingUncaughtExceptionHandler); + } + + private Runnable createRunnable() + { + Runnable r = new Runnable(){ + + public void run(){ + + } + }; + return r; } } diff --git a/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java index 11d9ce2bbc..e4d1c72208 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java @@ -20,12 +20,16 @@ */ package org.apache.qpid.client; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -46,14 +50,15 @@ import java.util.concurrent.TimeUnit; * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining * messages will be left on the queue and lost, subsequent messages on the session will arrive first. */ -public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener +public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener { private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); Context _context; private static final int MSG_COUNT = 5; - private int receivedCount = 0; + private int _receivedCount = 0; + private int _errorCount = 0; private MessageConsumer _consumer; private Connection _clientConnection; private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT); @@ -94,11 +99,14 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi protected void tearDown() throws Exception { - _clientConnection.close(); + if (_clientConnection != null) + { + _clientConnection.close(); + } super.tearDown(); } - public void testSynchronousRecieve() throws Exception + public void testSynchronousReceive() throws Exception { for (int msg = 0; msg < MSG_COUNT; msg++) { @@ -106,15 +114,15 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi } } - public void testSynchronousRecieveNoWait() throws Exception + public void testSynchronousReceiveNoWait() throws Exception { for (int msg = 0; msg < MSG_COUNT; msg++) { - assertTrue(_consumer.receiveNoWait() != null); + assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null); } } - public void testAsynchronousRecieve() throws Exception + public void testAsynchronousReceive() throws Exception { _consumer.setMessageListener(this); @@ -128,18 +136,17 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi { // do nothing } - // Should have recieved all async messages - assertEquals(MSG_COUNT, receivedCount); + // Should have received all async messages + assertEquals(MSG_COUNT, _receivedCount); } - public void testRecieveThenUseMessageListener() throws Exception + public void testReceiveThenUseMessageListener() throws Exception { - _logger.error("Test disabled as initial receive is not called first"); // Perform initial receive to start connection assertTrue(_consumer.receive(2000) != null); - receivedCount++; + _receivedCount++; // Sleep to ensure remaining 4 msgs end up on _synchronousQueue Thread.sleep(1000); @@ -157,8 +164,8 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi { // do nothing } - // Should have recieved all async messages - assertEquals(MSG_COUNT, receivedCount); + // Should have received all async messages + assertEquals(MSG_COUNT, _receivedCount); _clientConnection.close(); @@ -172,14 +179,81 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi assertTrue(cons.receive(2000) == null); } + /** + * Tests the case where the message listener throws an java.lang.Error. + * + */ + public void testMessageListenerThrowsError() throws Exception + { + final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error"; + _clientConnection.setExceptionListener(this); + + _awaitMessages = new CountDownLatch(1); + + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + try + { + _logger.debug("onMessage called"); + _receivedCount++; + + + throw new Error(javaLangErrorMessageText); + } + finally + { + _awaitMessages.countDown(); + } + } + }); + + + _logger.info("Waiting 3 seconds for message"); + _awaitMessages.await(3000, TimeUnit.MILLISECONDS); + + assertEquals("onMessage should have been called", 1, _receivedCount); + assertEquals("onException should NOT have been called", 0, _errorCount); + + // Check that Error has been written to the application log. + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected message not written to log file.", + _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT)); + + if (_clientConnection != null) + { + try + { + _clientConnection.close(); + } + catch (JMSException e) + { + // Ignore connection close errors for this test. + } + finally + { + _clientConnection = null; + } + } + } + public void onMessage(Message message) { - _logger.info("Received Message(" + receivedCount + "):" + message); + _logger.info("Received Message(" + _receivedCount + "):" + message); - receivedCount++; + _receivedCount++; _awaitMessages.countDown(); } + @Override + public void onException(JMSException e) + { + _logger.info("Exception received", e); + _errorCount++; + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(MessageListenerTest.class); diff --git a/java/test-profiles/08StandaloneExcludes b/java/test-profiles/08StandaloneExcludes index 214ee83bb2..b482a14c6d 100644 --- a/java/test-profiles/08StandaloneExcludes +++ b/java/test-profiles/08StandaloneExcludes @@ -34,6 +34,6 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* -org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait +org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism |