diff options
Diffstat (limited to 'qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java')
-rw-r--r-- | qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java new file mode 100644 index 0000000000..3cf8543656 --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java @@ -0,0 +1,486 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.junit.concurrency; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ThreadFactory; + +/** + * ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send + * them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch + * array is always a square array, allowing one latch from and to every thread. Upon accepting an allow signal from one + * sender the latches for all senders for a are cleared. This class is always used in conjunction with + * {@link TestRunnable} for writing concurrent test code that coordinates multi-threaded activity in order to reproduce + * concurrency bugs. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept test threads to coordinate. + * <tr><td> Allow test threads to send 'allow to continue' signals. + * <tr><td> Allow test threads to wait on this coordinator for 'allow to continue' signals. + * <tr><td> Report error messages from test threads. + * <tr><td> Report exceptions from test threads. + * <tr><td> Provide method to wait until all test threads have completed. + * </table> + * + * @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has + * proved extremely usefull. Some documentation for this needs to be written to explain it better. + * + * @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for + * each other, they are deadlocked and there is something wrong with the test code that put them in that + * situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some + * external event. A timeout should be used. Timeout is already implemented, just need to sanity check how + * this is working and document it. + * + * @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I + * guess a longish timeout is the only thing that can be done for that. + * + * @todo Only course grained synchronous at the method class level can be obtained. This is because test code can + * only insert synchronization points between method calls it makes. So this code will not be usefull for + * checking sequences of events within methods, unless the code under test is explicitly instrumented for it. + * It might be possible to instrument code by using labels, and then use the debugger/profiler interface to + * put breakpoints on the labels and use them as synchronization points. Not perfect, but at the unused labels + * can be left in the code, without altering its behaviour. + * + * @author Rupert Smith + */ +public class ThreadTestCoordinator +{ + /** Used for logging. */ + private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class); + + /** Keeps track of the test threads by their ids. */ + private TestRunnable[] testThreads; // = new TestRunnable[2]; + + /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */ + private final Object coordinatorLock = new Object(); + + /** A set of monitors for each test thread. */ + private Object[] locks; + + /** The binary latch array, this is always a square array allowing one event from and to every thread. */ + private boolean[][] allowEvents; + + /** Keeps track of the number of threads being coordinated. */ + private int threadCount = 0; + + /** Accumulates any exceptions resulting from the threads run methods. */ + private Collection<Exception> exceptions = new ArrayList<Exception>(); + + /** + * Holds the deadlock timeout after which threads are given a runtime exception to signal that a potential + * deadlock may be happening. + */ + private long deadlockTimeout = 1000 * 1000000; + + /** Holds the factory to create test thread with. */ + private ThreadFactory threadFactory; + + /** + * Creates a new test thread coordinator. The number of threads to run must be specified here. + * + * @param numThreads The number of threads to run. + */ + public ThreadTestCoordinator(int numThreads) + { + this.threadCount = numThreads; + + // Create an array big enough to hold all the test threads. + testThreads = new TestRunnable[threadCount]; + + // Use the default thread factory, as none specified. + threadFactory = new DefaultThreadFactory(); + } + + /** + * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be + * specified here. + * + * @param numThreads The number of threads to run. + * @param threadFactory The factory to use to create the test threads. + */ + public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory) + { + this.threadCount = numThreads; + + // Create an array big enough to hold all the test threads. + testThreads = new TestRunnable[threadCount]; + + // Use the specified thread factory. + this.threadFactory = threadFactory; + } + + /** + * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and + * it is up to the caller to do this. + * + * @param runnable The test thread. + * @param id The explicit id to assign to the test thread. + */ + public void addTestThread(TestRunnable runnable, int id) + { + testThreads[id] = runnable; + runnable.setCoordinator(this); + runnable.setId(id); + } + + /** + * Starts all the coordinated threads running. + */ + public void run() + { + // Create the monitors for each thread. + locks = new Object[threadCount]; + + // Create an appropriately sized event queue to allow one event from and to each thread. + allowEvents = new boolean[threadCount][threadCount]; + + // Initialize the monitors and clear the event queues. + for (int i = 0; i < locks.length; i++) + { + locks[i] = new Object(); + + for (int j = 0; j < locks.length; j++) + { + allowEvents[i][j] = false; + } + } + + // Start all the threads running. + for (TestRunnable nextRunnable : testThreads) + { + // Create a Java thread for the test thread. + Thread newThread = threadFactory.newThread(nextRunnable); + nextRunnable.setThread(newThread); + + // Start it running. + newThread.start(); + } + } + + /** + * Waits until all the test threads have completed and returns any accumulated error messages from them. Any + * exceptions thrown by their run methods are also kept at this point. + * + * @return The accumulated error messages from all the threads concatenated together. + */ + public String joinAndRetrieveMessages() + { + // Create an empty error message. + String errorMessage = ""; + + // Join all the test threads. + for (TestRunnable r : testThreads) + { + Thread t = r.getThread(); + + try + { + t.join(); + } + catch (InterruptedException e) + { } + + // Add any accumulated error messages to the return value. + errorMessage += r.getErrorMessage(); + + // Keep any exceptions resulting from the threads run method. + Exception e = r.getException(); + + if (e != null) + { + exceptions.add(e); + } + } + + return errorMessage; + } + + /** + * Reports any accumulated exceptions from the test threads run methods. This method must be called after + * {@link #joinAndRetrieveMessages}. + * + * @return Any accumulated exceptions from the test threads run methods. This method must be called after + */ + public Collection<Exception> getExceptions() + { + return exceptions; + } + + /** + * Sets a timeout to break out of potential deadlocks. If all threads are waiting for other threads to send + * them continue events for longer than this timeout then the threads are all terminated. + * + * @param millis The minimum time to allow to pass before breaking out of any potential deadlocks. + * + * @todo This has not been implemented yet. If a potential deadlock happens then the joinAndRetrieveMessages + * method should throw a PotentialDeadlockException. + */ + public void setDeadlockTimeout(long millis) + { + deadlockTimeout = millis * 1000000; + } + + /** + * Creates a set of 'allow to continue' events on the event queues of the specified threads. + * + * @param threads The set of threads to allow to continue. + * @param callerId The explicit id of the calling test thread. + * @param caller The calling test thread. + */ + void produceAllowEvents(int[] threads, int callerId, TestRunnable caller) + { + // Generate some debugging messages. Very usefull to know how thread synchronization is progressing. + String message = "Thread " + callerId + " is allowing threads [ "; + + for (int j = 0; j < threads.length; j++) + { + message += threads[j] + ((j < (threads.length - 1)) ? ", " : ""); + } + + message += " ] to continue."; + log.debug(message); + + // For each allow event, synchronize on the threads lock then set the event flag to true. + for (int id : threads) + { + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (locks[id]) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + // Send the allow to continue event to the receiving thread. + allowEvents[id][callerId] = true; + } + } + + // Wake up any threads waiting on the coordinator lock to recheck their event queues. + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (coordinatorLock) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + coordinatorLock.notifyAll(); + } + } + + /** + * Consumes an 'allow to continue' from one of the specified threads or waits until one is available or in some + * cases if one of the specified threads is blocked elsewhere to accept that as an 'allow to continue' event. + * + * @param threads The set of threads to accept an allow to continue event from. + * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue. + * @param callerId The explicit id of the calling test thread. + * @param caller The calling test thread. + * + * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited on is found + * to be blocked outside of the thread test coordinator. <tt>false</tt> under all other conditions. + */ + boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller) + { + // Generate some debugging messages. Very usefull to know how thread synchronization is progressing. + String message = "Thread " + callerId + " is requesting threads [ "; + + // Record the time at which this method was called. Will be used for breaking out of potential deadlocks. + long startTime = System.nanoTime(); + + for (int j = 0; j < threads.length; j++) + { + message += threads[j] + ((j < (threads.length - 1)) ? ", " : ""); + } + + message += " ] to allow it to continue."; + log.debug(message); + + // Loop until an allow to continue event is received. + while (true) + { + // Look at all the allowing thread to see if one has created an event for consumption. + for (int allowerId : threads) + { + // Get the threads lock for the event to consume. + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (locks[callerId]) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + // Check if there is an event on the queue from the allowing thread to this one. + if (allowEvents[callerId][allowerId]) + { + log.debug("Found an allow event, thread " + allowerId + ", is allowing thread " + callerId + + ", to continue."); + + // Consume all the allow events for this thread. + /*for (int i = 0; i < allowEvents[callerId].length; i++) + { + allowEvents[callerId][i] = false; + }*/ + + // Consume just the event from the allower to the consumer, leaving other pending allow events alone. + allowEvents[callerId][allowerId] = false; + + return false; + } + } + } + + // If waiting elsewhere is to be interpreted as an 'allow to continue' event, then look at the thread status + // for the threads being waited on to see if any are blocked on other resources. + if (otherWaitIsAllow) + { + log.debug("Other wait is to be interpreted as an allow event."); + + // Look at all the potential allower threads. + for (int allowerId : threads) + { + // Get the Java thread state for the allowing thread. + Thread threadToTest = testThreads[allowerId].getThread(); + Thread.State state = threadToTest.getState(); + + // Check if the thread is blocked and so a potential candidate for releasing this one. + if ((state == Thread.State.BLOCKED) || (state == Thread.State.WAITING) + || (state == Thread.State.TIMED_WAITING)) + { + log.debug("Found an allower thread, id = " + allowerId + ", that is blocked or wating."); + + // Check that the allower thread is not waiting on the coordinator lock or any of the + // individual thread locks. It must be waiting or blocked on another monitor. + TestRunnable allowingRunnable = testThreads[allowerId]; + boolean isWaitingOnCoordinator = allowingRunnable.isWaitingOnCoordinator(); + + if (!isWaitingOnCoordinator) + { + log.debug("The allower thread, id = " + allowerId + + ", is blocked or waiting other than on the coordinator."); + + // Get the threads lock for the event to consume. + caller.setWaitingOnCoordinator(true); + + synchronized (locks[callerId]) + { + caller.setWaitingOnCoordinator(false); + + // Consume all the allow events for this thread. + for (int i = 0; i < allowEvents[callerId].length; i++) + { + allowEvents[callerId][i] = false; + } + + return true; + } + } + else + { + log.debug("The waiting allower thread, " + allowerId + + ", is waiting on the coordinator so does not allow thread " + callerId + " to continue."); + } + } + } + } + + // Keep waiting until an 'allow to continue' event can be consumed. + try + { + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (coordinatorLock) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + log.debug("Thread " + callerId + " is waiting on coordinator lock for more allow events."); + + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + coordinatorLock.wait(10); + } + } + catch (InterruptedException e) + { } + + // Release the waiting on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible + // deadlock exception if so. + long waitTime = System.nanoTime() - startTime; + log.debug("Thread " + callerId + " has been waiting for " + (waitTime / 1000000) + " milliseconds."); + + if (waitTime > deadlockTimeout) + { + // Throw a possible deadlock exception. + throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this); + } + + log.debug("Thread " + callerId + " has woken up, was waiting for more allow events to become available."); + } + } + + /** + * Pretty prints the state of the thread test coordinator, for debugging purposes. + * + * @return Pretty printed state of the thread test coordinator. + */ + public String toString() + { + String result = "["; + + for (int i = 0; i < allowEvents.length; i++) + { + for (int j = 0; j < allowEvents[i].length; j++) + { + result += allowEvents[i][j]; + + result += (j < (allowEvents[i].length - 1)) ? ", " : ""; + } + + result += (i < (allowEvents.length - 1)) ? ",\n " : ""; + } + + result += "]"; + + for (int i = 0; i < testThreads.length; i++) + { + result += "thread[" + i + "] = " + testThreads[i].toString(); + } + + return result; + } + +} |