diff options
Diffstat (limited to 'qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency')
6 files changed, 992 insertions, 0 deletions
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java new file mode 100644 index 0000000000..8fb0a6a90e --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.junit.concurrency; + +import java.util.concurrent.ThreadFactory; + +/** + * Implements a default thread factory. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create default threads with no specialization. + * </table> + * + * @author Rupert Smith + */ +public class DefaultThreadFactory implements ThreadFactory +{ + /** + * Constructs a new <tt>Thread</tt>. + * + * @param r A runnable to be executed by new thread instance. + * + * @return The constructed thread. + */ + public Thread newThread(Runnable r) + { + return new Thread(r); + } +} diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java new file mode 100644 index 0000000000..3bbfc2d502 --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.junit.concurrency; + +/** + * PossibleDeadlockException is used to signal that two test threads being executed by a {@link ThreadTestCoordinator} + * may be in a state of deadlock because they are mutually blocking each other or one is waiting on the other and the + * other has been blocked elsewhere for longer than a specified timeout. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Signal a possible state of deadlock between coordinated test threads. + * </table> + * + * @author Rupert Smith + */ +public class PossibleDeadlockException extends RuntimeException +{ + /** + * Create a new possible deadlock execption. + * + * @param message The exception message. + */ + public PossibleDeadlockException(String message) + { + super(message); + } +} diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java new file mode 100644 index 0000000000..02e776a4ea --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.junit.concurrency; + +/** + * TestRunnable is an extension of java.util.Runnable that adds some features to make it easier to coordinate the + * activities of threads in such a way as to expose bugs in multi threaded code. + * + * <p/>Sometimes several threads will run in a particular order so that a bug is not revealed. Other times the ordering + * of the threads will expose a bug. Such bugs can be hard to replicate as the exact execution ordering of threads is not + * usually controlled. This class adds some methods that allow threads to synchronize other threads, either allowing them + * to run, or waiting for them to allow this thread to run. It also provides convenience methods to gather error messages + * and exceptions from threads, which will often be reported in unit testing code. + * + * <p/>Coordination between threads is handled by the {@link ThreadTestCoordinator}. It is called through the convenience + * methods {@link #allow} and {@link #waitFor}. Threads to be coordinated must be set up with the coordinator and assigned + * integer ids. It is then possible to call the coordinator with an array of thread ids requesting that those threads + * be allowed to continue, or to wait until one of them allows this thread to continue. The otherwise non-deterministic + * execution order of threads can be controlled into a carefully determined sequence using these methods in order + * to reproduce race conditions, dead locks, live locks, dirty reads, phantom reads, non repeatable reads and so on. + * + * <p/>When waiting for another thread to give a signal to continue it is sometimes the case that the other thread has + * become blocked by the code under test. For example in testing for a dirty read (for example in database code), + * thread 1 lets thread 2 perform a write but not commit it, then thread 2 lets thread 1 run and attempt to perform a + * dirty read on its uncommitted write. Transaction synchronization code being tested against the possibility of a dirty + * write may make use of snapshots in which case both threads should be able to read and write without blocking. It may + * make use of explicit keys in which case thread 2 may become blocked on its write attempt because thread 1 holds a + * read lock and it must wait until thread 1 completes its transaction before it can acquire this lock. The + * {@link #waitFor} method accepts a boolean parameter to indicate that threads being blocked (other than on the + * coordinator) can be interpreted the same as if the thread explicitly allows the thread calling waitFor to continue. + * Using this technique a dirty read test could be written that works against either the snapshot or the locking + * implementation, allowing both approaches to pass the test yet arranging for multiple threads to run against the + * implementation in such a way that a potential dirty read bug is exposed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Wait for another thread to allow this one to continue. + * <tr><td> Allow another thread to continue. + * <tr><td> Accumulate error messages. + * <tr><td> Record exceptions from thread run. + * <tr><td> Maintain link to thread coordinator. + * <tr><td> Explicitly mark a thread with an integer id. + * <tr><td> Maintian a flag to indicate whether or not this thread is waiting on the coordinator. + * </table> + * + * @todo The allow then waitFor operations are very often used as a pair. So create a method allowAndWait that combines + * them into a single method call. + * + * @author Rupert Smith + */ +public abstract class TestRunnable implements Runnable +{ + /** Holds a reference to the thread coordinator. */ + private ThreadTestCoordinator coordinator; + + /** Holds the explicit integer id of this thread. */ + private int id; + + /** Used to indicate that this thread is waiting on the coordinator and not elsewhere. */ + private boolean waitingOnCoordinator = false; + + /** Used to accumulate error messsages. */ + private String errorMessage = ""; + + /** Holds the Java thread object that this is running under. */ + private Thread thisThread; + + /** Used to hold any exceptions resulting from the run method. */ + private Exception runException = null; + + /** + * Implementations override this to perform coordinated thread sequencing. + * + * @throws Exception Any exception raised by the implementation will be caught by the default {@link #run()} + * implementation for later querying by the {@link #getException()} method. + */ + public abstract void runWithExceptions() throws Exception; + + /** + * Provides a default implementation of the run method that allows exceptions to be thrown and keeps a record + * of those exceptions. Defers to the {@link #runWithExceptions()} method to provide the thread body implementation + * and catches any exceptions thrown by it. + */ + public void run() + { + try + { + runWithExceptions(); + } + catch (Exception e) + { + this.runException = e; + } + } + + /** + * Attempt to consume an allow event from one of the specified threads and blocks until such an event occurrs. + * + * @param threads The set of threads that can allow this one to continue. + * @param otherWaitIsAllow If set to <tt>true</tt> if the threads being waited on are blocked other than on + * the coordinator itself then this is to be interpreted as allowing this thread to + * continue. + * + * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited on is found + * to be blocked outside of the thread test coordinator. <tt>false</tt> under all other conditions. + */ + protected boolean waitFor(int[] threads, boolean otherWaitIsAllow) + { + return coordinator.consumeAllowEvent(threads, otherWaitIsAllow, id, this); + } + + /** + * Produces allow events on each of the specified threads. + * + * @param threads The set of threads that are to be allowed to continue. + */ + protected void allow(int[] threads) + { + coordinator.produceAllowEvents(threads, id, this); + } + + /** + * Keeps the error message for later reporting by the coordinator. + * + * @param message The error message to keep. + */ + protected void addErrorMessage(String message) + { + errorMessage += message; + } + + /** + * Sets the coordinator for this thread. + * + * @param coordinator The coordinator for this thread. + */ + void setCoordinator(ThreadTestCoordinator coordinator) + { + this.coordinator = coordinator; + } + + /** + * Reports whether or not this thread is waiting on the coordinator. + * + * @return <tt>If this thread is waiting on the coordinator. + */ + boolean isWaitingOnCoordinator() + { + return waitingOnCoordinator; + } + + /** + * Sets the value of the waiting on coordinator flag. + * + * @param waiting The value of the waiting on coordinator flag. + */ + void setWaitingOnCoordinator(boolean waiting) + { + waitingOnCoordinator = waiting; + } + + /** + * Sets up the explicit int id for this thread. + * + * @param id The integer id. + */ + void setId(int id) + { + this.id = id; + } + + /** + * Reports any accumulated error messages. + * + * @return Any accumulated error messages. + */ + String getErrorMessage() + { + return errorMessage; + } + + /** + * Reports any exception thrown by the {@link #runWithExceptions} method. + * + * @return Any exception thrown by the {@link #runWithExceptions} method. + */ + Exception getException() + { + return runException; + } + + /** + * Sets the Java thread under which this runs. + * + * @param thread The Java thread under which this runs. + */ + void setThread(Thread thread) + { + thisThread = thread; + } + + /** + * Gets the Java thread under which this runs. + * + * @return The Java thread under which this runs. + */ + Thread getThread() + { + return thisThread; + } + + /** + * Provides a string summary of this test threads status. + * + * @return Summarizes this threads status. + */ + public String toString() + { + return "id = " + id + ", waitingOnCoordinator = " + waitingOnCoordinator; + } +} diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java new file mode 100644 index 0000000000..3cf8543656 --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java @@ -0,0 +1,486 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.junit.concurrency; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ThreadFactory; + +/** + * ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send + * them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch + * array is always a square array, allowing one latch from and to every thread. Upon accepting an allow signal from one + * sender the latches for all senders for a are cleared. This class is always used in conjunction with + * {@link TestRunnable} for writing concurrent test code that coordinates multi-threaded activity in order to reproduce + * concurrency bugs. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept test threads to coordinate. + * <tr><td> Allow test threads to send 'allow to continue' signals. + * <tr><td> Allow test threads to wait on this coordinator for 'allow to continue' signals. + * <tr><td> Report error messages from test threads. + * <tr><td> Report exceptions from test threads. + * <tr><td> Provide method to wait until all test threads have completed. + * </table> + * + * @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has + * proved extremely usefull. Some documentation for this needs to be written to explain it better. + * + * @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for + * each other, they are deadlocked and there is something wrong with the test code that put them in that + * situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some + * external event. A timeout should be used. Timeout is already implemented, just need to sanity check how + * this is working and document it. + * + * @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I + * guess a longish timeout is the only thing that can be done for that. + * + * @todo Only course grained synchronous at the method class level can be obtained. This is because test code can + * only insert synchronization points between method calls it makes. So this code will not be usefull for + * checking sequences of events within methods, unless the code under test is explicitly instrumented for it. + * It might be possible to instrument code by using labels, and then use the debugger/profiler interface to + * put breakpoints on the labels and use them as synchronization points. Not perfect, but at the unused labels + * can be left in the code, without altering its behaviour. + * + * @author Rupert Smith + */ +public class ThreadTestCoordinator +{ + /** Used for logging. */ + private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class); + + /** Keeps track of the test threads by their ids. */ + private TestRunnable[] testThreads; // = new TestRunnable[2]; + + /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */ + private final Object coordinatorLock = new Object(); + + /** A set of monitors for each test thread. */ + private Object[] locks; + + /** The binary latch array, this is always a square array allowing one event from and to every thread. */ + private boolean[][] allowEvents; + + /** Keeps track of the number of threads being coordinated. */ + private int threadCount = 0; + + /** Accumulates any exceptions resulting from the threads run methods. */ + private Collection<Exception> exceptions = new ArrayList<Exception>(); + + /** + * Holds the deadlock timeout after which threads are given a runtime exception to signal that a potential + * deadlock may be happening. + */ + private long deadlockTimeout = 1000 * 1000000; + + /** Holds the factory to create test thread with. */ + private ThreadFactory threadFactory; + + /** + * Creates a new test thread coordinator. The number of threads to run must be specified here. + * + * @param numThreads The number of threads to run. + */ + public ThreadTestCoordinator(int numThreads) + { + this.threadCount = numThreads; + + // Create an array big enough to hold all the test threads. + testThreads = new TestRunnable[threadCount]; + + // Use the default thread factory, as none specified. + threadFactory = new DefaultThreadFactory(); + } + + /** + * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be + * specified here. + * + * @param numThreads The number of threads to run. + * @param threadFactory The factory to use to create the test threads. + */ + public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory) + { + this.threadCount = numThreads; + + // Create an array big enough to hold all the test threads. + testThreads = new TestRunnable[threadCount]; + + // Use the specified thread factory. + this.threadFactory = threadFactory; + } + + /** + * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and + * it is up to the caller to do this. + * + * @param runnable The test thread. + * @param id The explicit id to assign to the test thread. + */ + public void addTestThread(TestRunnable runnable, int id) + { + testThreads[id] = runnable; + runnable.setCoordinator(this); + runnable.setId(id); + } + + /** + * Starts all the coordinated threads running. + */ + public void run() + { + // Create the monitors for each thread. + locks = new Object[threadCount]; + + // Create an appropriately sized event queue to allow one event from and to each thread. + allowEvents = new boolean[threadCount][threadCount]; + + // Initialize the monitors and clear the event queues. + for (int i = 0; i < locks.length; i++) + { + locks[i] = new Object(); + + for (int j = 0; j < locks.length; j++) + { + allowEvents[i][j] = false; + } + } + + // Start all the threads running. + for (TestRunnable nextRunnable : testThreads) + { + // Create a Java thread for the test thread. + Thread newThread = threadFactory.newThread(nextRunnable); + nextRunnable.setThread(newThread); + + // Start it running. + newThread.start(); + } + } + + /** + * Waits until all the test threads have completed and returns any accumulated error messages from them. Any + * exceptions thrown by their run methods are also kept at this point. + * + * @return The accumulated error messages from all the threads concatenated together. + */ + public String joinAndRetrieveMessages() + { + // Create an empty error message. + String errorMessage = ""; + + // Join all the test threads. + for (TestRunnable r : testThreads) + { + Thread t = r.getThread(); + + try + { + t.join(); + } + catch (InterruptedException e) + { } + + // Add any accumulated error messages to the return value. + errorMessage += r.getErrorMessage(); + + // Keep any exceptions resulting from the threads run method. + Exception e = r.getException(); + + if (e != null) + { + exceptions.add(e); + } + } + + return errorMessage; + } + + /** + * Reports any accumulated exceptions from the test threads run methods. This method must be called after + * {@link #joinAndRetrieveMessages}. + * + * @return Any accumulated exceptions from the test threads run methods. This method must be called after + */ + public Collection<Exception> getExceptions() + { + return exceptions; + } + + /** + * Sets a timeout to break out of potential deadlocks. If all threads are waiting for other threads to send + * them continue events for longer than this timeout then the threads are all terminated. + * + * @param millis The minimum time to allow to pass before breaking out of any potential deadlocks. + * + * @todo This has not been implemented yet. If a potential deadlock happens then the joinAndRetrieveMessages + * method should throw a PotentialDeadlockException. + */ + public void setDeadlockTimeout(long millis) + { + deadlockTimeout = millis * 1000000; + } + + /** + * Creates a set of 'allow to continue' events on the event queues of the specified threads. + * + * @param threads The set of threads to allow to continue. + * @param callerId The explicit id of the calling test thread. + * @param caller The calling test thread. + */ + void produceAllowEvents(int[] threads, int callerId, TestRunnable caller) + { + // Generate some debugging messages. Very usefull to know how thread synchronization is progressing. + String message = "Thread " + callerId + " is allowing threads [ "; + + for (int j = 0; j < threads.length; j++) + { + message += threads[j] + ((j < (threads.length - 1)) ? ", " : ""); + } + + message += " ] to continue."; + log.debug(message); + + // For each allow event, synchronize on the threads lock then set the event flag to true. + for (int id : threads) + { + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (locks[id]) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + // Send the allow to continue event to the receiving thread. + allowEvents[id][callerId] = true; + } + } + + // Wake up any threads waiting on the coordinator lock to recheck their event queues. + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (coordinatorLock) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + coordinatorLock.notifyAll(); + } + } + + /** + * Consumes an 'allow to continue' from one of the specified threads or waits until one is available or in some + * cases if one of the specified threads is blocked elsewhere to accept that as an 'allow to continue' event. + * + * @param threads The set of threads to accept an allow to continue event from. + * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue. + * @param callerId The explicit id of the calling test thread. + * @param caller The calling test thread. + * + * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited on is found + * to be blocked outside of the thread test coordinator. <tt>false</tt> under all other conditions. + */ + boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller) + { + // Generate some debugging messages. Very usefull to know how thread synchronization is progressing. + String message = "Thread " + callerId + " is requesting threads [ "; + + // Record the time at which this method was called. Will be used for breaking out of potential deadlocks. + long startTime = System.nanoTime(); + + for (int j = 0; j < threads.length; j++) + { + message += threads[j] + ((j < (threads.length - 1)) ? ", " : ""); + } + + message += " ] to allow it to continue."; + log.debug(message); + + // Loop until an allow to continue event is received. + while (true) + { + // Look at all the allowing thread to see if one has created an event for consumption. + for (int allowerId : threads) + { + // Get the threads lock for the event to consume. + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (locks[callerId]) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + // Check if there is an event on the queue from the allowing thread to this one. + if (allowEvents[callerId][allowerId]) + { + log.debug("Found an allow event, thread " + allowerId + ", is allowing thread " + callerId + + ", to continue."); + + // Consume all the allow events for this thread. + /*for (int i = 0; i < allowEvents[callerId].length; i++) + { + allowEvents[callerId][i] = false; + }*/ + + // Consume just the event from the allower to the consumer, leaving other pending allow events alone. + allowEvents[callerId][allowerId] = false; + + return false; + } + } + } + + // If waiting elsewhere is to be interpreted as an 'allow to continue' event, then look at the thread status + // for the threads being waited on to see if any are blocked on other resources. + if (otherWaitIsAllow) + { + log.debug("Other wait is to be interpreted as an allow event."); + + // Look at all the potential allower threads. + for (int allowerId : threads) + { + // Get the Java thread state for the allowing thread. + Thread threadToTest = testThreads[allowerId].getThread(); + Thread.State state = threadToTest.getState(); + + // Check if the thread is blocked and so a potential candidate for releasing this one. + if ((state == Thread.State.BLOCKED) || (state == Thread.State.WAITING) + || (state == Thread.State.TIMED_WAITING)) + { + log.debug("Found an allower thread, id = " + allowerId + ", that is blocked or wating."); + + // Check that the allower thread is not waiting on the coordinator lock or any of the + // individual thread locks. It must be waiting or blocked on another monitor. + TestRunnable allowingRunnable = testThreads[allowerId]; + boolean isWaitingOnCoordinator = allowingRunnable.isWaitingOnCoordinator(); + + if (!isWaitingOnCoordinator) + { + log.debug("The allower thread, id = " + allowerId + + ", is blocked or waiting other than on the coordinator."); + + // Get the threads lock for the event to consume. + caller.setWaitingOnCoordinator(true); + + synchronized (locks[callerId]) + { + caller.setWaitingOnCoordinator(false); + + // Consume all the allow events for this thread. + for (int i = 0; i < allowEvents[callerId].length; i++) + { + allowEvents[callerId][i] = false; + } + + return true; + } + } + else + { + log.debug("The waiting allower thread, " + allowerId + + ", is waiting on the coordinator so does not allow thread " + callerId + " to continue."); + } + } + } + } + + // Keep waiting until an 'allow to continue' event can be consumed. + try + { + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + + synchronized (coordinatorLock) + { + // Release the wating on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + log.debug("Thread " + callerId + " is waiting on coordinator lock for more allow events."); + + // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for + // being blocked at this time. + caller.setWaitingOnCoordinator(true); + coordinatorLock.wait(10); + } + } + catch (InterruptedException e) + { } + + // Release the waiting on coordinator flag now that this thread is running again. + caller.setWaitingOnCoordinator(false); + + // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible + // deadlock exception if so. + long waitTime = System.nanoTime() - startTime; + log.debug("Thread " + callerId + " has been waiting for " + (waitTime / 1000000) + " milliseconds."); + + if (waitTime > deadlockTimeout) + { + // Throw a possible deadlock exception. + throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this); + } + + log.debug("Thread " + callerId + " has woken up, was waiting for more allow events to become available."); + } + } + + /** + * Pretty prints the state of the thread test coordinator, for debugging purposes. + * + * @return Pretty printed state of the thread test coordinator. + */ + public String toString() + { + String result = "["; + + for (int i = 0; i < allowEvents.length; i++) + { + for (int j = 0; j < allowEvents[i].length; j++) + { + result += allowEvents[i][j]; + + result += (j < (allowEvents[i].length - 1)) ? ", " : ""; + } + + result += (i < (allowEvents.length - 1)) ? ",\n " : ""; + } + + result += "]"; + + for (int i = 0; i < testThreads.length; i++) + { + result += "thread[" + i + "] = " + testThreads[i].toString(); + } + + return result; + } + +} diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java new file mode 100644 index 0000000000..b9865f2e22 --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.junit.concurrency; + +import org.apache.log4j.Logger; + +/** + * An example to illustrate the use of the {@link ThreadTestCoordinator} and {@link TestRunnable}s. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Demo multi-threaded testing. + * </table> + * + * @author Rupert Smith + */ +public class ThreadTestExample +{ + /** Used for logging. */ + private static final Logger log = Logger.getLogger(ThreadTestExample.class); + + /** Test thread 1. */ + TestRunnable testThread1 = + new TestRunnable() + { + public void runWithExceptions() throws Exception + { + log.debug("public void run(): called"); + log.info("in testThread0, block 1"); + + // Wait for t2 to allow t1 to continue. + allow(new int[] { 1 }); + waitFor(new int[] { 1 }, false); + + log.info("in testThread0, block 2"); + + // Wait for t2 to allow t1 to continue. T2 is allowed to be blocked elsewhere than giving explicit + // permission to allow t1 to continue. + allow(new int[] { 1 }); + waitFor(new int[] { 1 }, true); + + log.info("in testThread0, block 3"); + + // Release thread 2 from waiting on the shared lock. + synchronized (sharedLock) + { + sharedLock.notifyAll(); + } + + allow(new int[] { 1 }); + } + }; + + /** A shared lock between the test threads. */ + final Object sharedLock = new Object(); + + /** Test thread 2. */ + TestRunnable testThread2 = + new TestRunnable() + { + public void runWithExceptions() throws Exception + { + log.debug("public void run(): called"); + log.info("in testThread1, block 1"); + + // Wait for t1 to allow t2 to continue. + allow(new int[] { 0 }); + waitFor(new int[] { 0 }, false); + + log.info("in testThread1, block 2"); + + // Wait on another resource. T1 should accept this as permission to continue. + try + { + synchronized (sharedLock) + { + log.debug("in testThread1, waiting on shared lock."); + sharedLock.wait(); + } + } + catch (InterruptedException e) + { + // Bail-out with a runtime if this happens. + throw new RuntimeException("Interrupted whilst waiting for shared lock.", e); + } + + log.info("in testThread1, finished waiting on shared lock."); + + // allow(new int[] { 0 }); + + // Wait for t1 to allow t2 to continue. + waitFor(new int[] { 0 }, false); + + log.info("in testThread1, block 3"); + + allow(new int[] { 0 }); + } + }; + + /** + * Executes the test threads with coordination. + * + * @param args Ignored. + */ + public void main(String[] args) + { + ThreadTestCoordinator tt = new ThreadTestCoordinator(2); + + tt.addTestThread(testThread1, 0); + tt.addTestThread(testThread2, 1); + tt.setDeadlockTimeout(500); + tt.run(); + + String errorMessage = tt.joinAndRetrieveMessages(); + + // Print any error messages or exceptions. + log.info(errorMessage); + + if (!tt.getExceptions().isEmpty()) + { + for (Exception e : tt.getExceptions()) + { + log.warn("Exception thrown during test thread: ", e); + } + } + } +} diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html new file mode 100644 index 0000000000..904fd0fd05 --- /dev/null +++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html @@ -0,0 +1,28 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<html> +<body> +Contains code to assist in testing concurrency issues using coordinated threads to present code under test with +oportunities to expose concurrency bugs. Some example concurrency bugs that may be tested using these techniques are +race conditions, dead locks, live locks, dirty reads, phantom reads, non repeatable reads and so on. +</body> +</html> |