summaryrefslogtreecommitdiff
path: root/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency')
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java48
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java46
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java239
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java486
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java145
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html28
6 files changed, 992 insertions, 0 deletions
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java
new file mode 100644
index 0000000000..8fb0a6a90e
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/DefaultThreadFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.junit.concurrency;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Implements a default thread factory.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create default threads with no specialization.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class DefaultThreadFactory implements ThreadFactory
+{
+ /**
+ * Constructs a new <tt>Thread</tt>.
+ *
+ * @param r A runnable to be executed by new thread instance.
+ *
+ * @return The constructed thread.
+ */
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r);
+ }
+}
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java
new file mode 100644
index 0000000000..3bbfc2d502
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/PossibleDeadlockException.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.junit.concurrency;
+
+/**
+ * PossibleDeadlockException is used to signal that two test threads being executed by a {@link ThreadTestCoordinator}
+ * may be in a state of deadlock because they are mutually blocking each other or one is waiting on the other and the
+ * other has been blocked elsewhere for longer than a specified timeout.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Signal a possible state of deadlock between coordinated test threads.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PossibleDeadlockException extends RuntimeException
+{
+ /**
+ * Create a new possible deadlock execption.
+ *
+ * @param message The exception message.
+ */
+ public PossibleDeadlockException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java
new file mode 100644
index 0000000000..02e776a4ea
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/TestRunnable.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.junit.concurrency;
+
+/**
+ * TestRunnable is an extension of java.util.Runnable that adds some features to make it easier to coordinate the
+ * activities of threads in such a way as to expose bugs in multi threaded code.
+ *
+ * <p/>Sometimes several threads will run in a particular order so that a bug is not revealed. Other times the ordering
+ * of the threads will expose a bug. Such bugs can be hard to replicate as the exact execution ordering of threads is not
+ * usually controlled. This class adds some methods that allow threads to synchronize other threads, either allowing them
+ * to run, or waiting for them to allow this thread to run. It also provides convenience methods to gather error messages
+ * and exceptions from threads, which will often be reported in unit testing code.
+ *
+ * <p/>Coordination between threads is handled by the {@link ThreadTestCoordinator}. It is called through the convenience
+ * methods {@link #allow} and {@link #waitFor}. Threads to be coordinated must be set up with the coordinator and assigned
+ * integer ids. It is then possible to call the coordinator with an array of thread ids requesting that those threads
+ * be allowed to continue, or to wait until one of them allows this thread to continue. The otherwise non-deterministic
+ * execution order of threads can be controlled into a carefully determined sequence using these methods in order
+ * to reproduce race conditions, dead locks, live locks, dirty reads, phantom reads, non repeatable reads and so on.
+ *
+ * <p/>When waiting for another thread to give a signal to continue it is sometimes the case that the other thread has
+ * become blocked by the code under test. For example in testing for a dirty read (for example in database code),
+ * thread 1 lets thread 2 perform a write but not commit it, then thread 2 lets thread 1 run and attempt to perform a
+ * dirty read on its uncommitted write. Transaction synchronization code being tested against the possibility of a dirty
+ * write may make use of snapshots in which case both threads should be able to read and write without blocking. It may
+ * make use of explicit keys in which case thread 2 may become blocked on its write attempt because thread 1 holds a
+ * read lock and it must wait until thread 1 completes its transaction before it can acquire this lock. The
+ * {@link #waitFor} method accepts a boolean parameter to indicate that threads being blocked (other than on the
+ * coordinator) can be interpreted the same as if the thread explicitly allows the thread calling waitFor to continue.
+ * Using this technique a dirty read test could be written that works against either the snapshot or the locking
+ * implementation, allowing both approaches to pass the test yet arranging for multiple threads to run against the
+ * implementation in such a way that a potential dirty read bug is exposed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Wait for another thread to allow this one to continue.
+ * <tr><td> Allow another thread to continue.
+ * <tr><td> Accumulate error messages.
+ * <tr><td> Record exceptions from thread run.
+ * <tr><td> Maintain link to thread coordinator.
+ * <tr><td> Explicitly mark a thread with an integer id.
+ * <tr><td> Maintian a flag to indicate whether or not this thread is waiting on the coordinator.
+ * </table>
+ *
+ * @todo The allow then waitFor operations are very often used as a pair. So create a method allowAndWait that combines
+ * them into a single method call.
+ *
+ * @author Rupert Smith
+ */
+public abstract class TestRunnable implements Runnable
+{
+ /** Holds a reference to the thread coordinator. */
+ private ThreadTestCoordinator coordinator;
+
+ /** Holds the explicit integer id of this thread. */
+ private int id;
+
+ /** Used to indicate that this thread is waiting on the coordinator and not elsewhere. */
+ private boolean waitingOnCoordinator = false;
+
+ /** Used to accumulate error messsages. */
+ private String errorMessage = "";
+
+ /** Holds the Java thread object that this is running under. */
+ private Thread thisThread;
+
+ /** Used to hold any exceptions resulting from the run method. */
+ private Exception runException = null;
+
+ /**
+ * Implementations override this to perform coordinated thread sequencing.
+ *
+ * @throws Exception Any exception raised by the implementation will be caught by the default {@link #run()}
+ * implementation for later querying by the {@link #getException()} method.
+ */
+ public abstract void runWithExceptions() throws Exception;
+
+ /**
+ * Provides a default implementation of the run method that allows exceptions to be thrown and keeps a record
+ * of those exceptions. Defers to the {@link #runWithExceptions()} method to provide the thread body implementation
+ * and catches any exceptions thrown by it.
+ */
+ public void run()
+ {
+ try
+ {
+ runWithExceptions();
+ }
+ catch (Exception e)
+ {
+ this.runException = e;
+ }
+ }
+
+ /**
+ * Attempt to consume an allow event from one of the specified threads and blocks until such an event occurrs.
+ *
+ * @param threads The set of threads that can allow this one to continue.
+ * @param otherWaitIsAllow If set to <tt>true</tt> if the threads being waited on are blocked other than on
+ * the coordinator itself then this is to be interpreted as allowing this thread to
+ * continue.
+ *
+ * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited on is found
+ * to be blocked outside of the thread test coordinator. <tt>false</tt> under all other conditions.
+ */
+ protected boolean waitFor(int[] threads, boolean otherWaitIsAllow)
+ {
+ return coordinator.consumeAllowEvent(threads, otherWaitIsAllow, id, this);
+ }
+
+ /**
+ * Produces allow events on each of the specified threads.
+ *
+ * @param threads The set of threads that are to be allowed to continue.
+ */
+ protected void allow(int[] threads)
+ {
+ coordinator.produceAllowEvents(threads, id, this);
+ }
+
+ /**
+ * Keeps the error message for later reporting by the coordinator.
+ *
+ * @param message The error message to keep.
+ */
+ protected void addErrorMessage(String message)
+ {
+ errorMessage += message;
+ }
+
+ /**
+ * Sets the coordinator for this thread.
+ *
+ * @param coordinator The coordinator for this thread.
+ */
+ void setCoordinator(ThreadTestCoordinator coordinator)
+ {
+ this.coordinator = coordinator;
+ }
+
+ /**
+ * Reports whether or not this thread is waiting on the coordinator.
+ *
+ * @return <tt>If this thread is waiting on the coordinator.
+ */
+ boolean isWaitingOnCoordinator()
+ {
+ return waitingOnCoordinator;
+ }
+
+ /**
+ * Sets the value of the waiting on coordinator flag.
+ *
+ * @param waiting The value of the waiting on coordinator flag.
+ */
+ void setWaitingOnCoordinator(boolean waiting)
+ {
+ waitingOnCoordinator = waiting;
+ }
+
+ /**
+ * Sets up the explicit int id for this thread.
+ *
+ * @param id The integer id.
+ */
+ void setId(int id)
+ {
+ this.id = id;
+ }
+
+ /**
+ * Reports any accumulated error messages.
+ *
+ * @return Any accumulated error messages.
+ */
+ String getErrorMessage()
+ {
+ return errorMessage;
+ }
+
+ /**
+ * Reports any exception thrown by the {@link #runWithExceptions} method.
+ *
+ * @return Any exception thrown by the {@link #runWithExceptions} method.
+ */
+ Exception getException()
+ {
+ return runException;
+ }
+
+ /**
+ * Sets the Java thread under which this runs.
+ *
+ * @param thread The Java thread under which this runs.
+ */
+ void setThread(Thread thread)
+ {
+ thisThread = thread;
+ }
+
+ /**
+ * Gets the Java thread under which this runs.
+ *
+ * @return The Java thread under which this runs.
+ */
+ Thread getThread()
+ {
+ return thisThread;
+ }
+
+ /**
+ * Provides a string summary of this test threads status.
+ *
+ * @return Summarizes this threads status.
+ */
+ public String toString()
+ {
+ return "id = " + id + ", waitingOnCoordinator = " + waitingOnCoordinator;
+ }
+}
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
new file mode 100644
index 0000000000..3cf8543656
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
@@ -0,0 +1,486 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.junit.concurrency;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send
+ * them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch
+ * array is always a square array, allowing one latch from and to every thread. Upon accepting an allow signal from one
+ * sender the latches for all senders for a are cleared. This class is always used in conjunction with
+ * {@link TestRunnable} for writing concurrent test code that coordinates multi-threaded activity in order to reproduce
+ * concurrency bugs.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept test threads to coordinate.
+ * <tr><td> Allow test threads to send 'allow to continue' signals.
+ * <tr><td> Allow test threads to wait on this coordinator for 'allow to continue' signals.
+ * <tr><td> Report error messages from test threads.
+ * <tr><td> Report exceptions from test threads.
+ * <tr><td> Provide method to wait until all test threads have completed.
+ * </table>
+ *
+ * @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has
+ * proved extremely usefull. Some documentation for this needs to be written to explain it better.
+ *
+ * @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for
+ * each other, they are deadlocked and there is something wrong with the test code that put them in that
+ * situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some
+ * external event. A timeout should be used. Timeout is already implemented, just need to sanity check how
+ * this is working and document it.
+ *
+ * @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I
+ * guess a longish timeout is the only thing that can be done for that.
+ *
+ * @todo Only course grained synchronous at the method class level can be obtained. This is because test code can
+ * only insert synchronization points between method calls it makes. So this code will not be usefull for
+ * checking sequences of events within methods, unless the code under test is explicitly instrumented for it.
+ * It might be possible to instrument code by using labels, and then use the debugger/profiler interface to
+ * put breakpoints on the labels and use them as synchronization points. Not perfect, but at the unused labels
+ * can be left in the code, without altering its behaviour.
+ *
+ * @author Rupert Smith
+ */
+public class ThreadTestCoordinator
+{
+ /** Used for logging. */
+ private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class);
+
+ /** Keeps track of the test threads by their ids. */
+ private TestRunnable[] testThreads; // = new TestRunnable[2];
+
+ /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */
+ private final Object coordinatorLock = new Object();
+
+ /** A set of monitors for each test thread. */
+ private Object[] locks;
+
+ /** The binary latch array, this is always a square array allowing one event from and to every thread. */
+ private boolean[][] allowEvents;
+
+ /** Keeps track of the number of threads being coordinated. */
+ private int threadCount = 0;
+
+ /** Accumulates any exceptions resulting from the threads run methods. */
+ private Collection<Exception> exceptions = new ArrayList<Exception>();
+
+ /**
+ * Holds the deadlock timeout after which threads are given a runtime exception to signal that a potential
+ * deadlock may be happening.
+ */
+ private long deadlockTimeout = 1000 * 1000000;
+
+ /** Holds the factory to create test thread with. */
+ private ThreadFactory threadFactory;
+
+ /**
+ * Creates a new test thread coordinator. The number of threads to run must be specified here.
+ *
+ * @param numThreads The number of threads to run.
+ */
+ public ThreadTestCoordinator(int numThreads)
+ {
+ this.threadCount = numThreads;
+
+ // Create an array big enough to hold all the test threads.
+ testThreads = new TestRunnable[threadCount];
+
+ // Use the default thread factory, as none specified.
+ threadFactory = new DefaultThreadFactory();
+ }
+
+ /**
+ * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be
+ * specified here.
+ *
+ * @param numThreads The number of threads to run.
+ * @param threadFactory The factory to use to create the test threads.
+ */
+ public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory)
+ {
+ this.threadCount = numThreads;
+
+ // Create an array big enough to hold all the test threads.
+ testThreads = new TestRunnable[threadCount];
+
+ // Use the specified thread factory.
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and
+ * it is up to the caller to do this.
+ *
+ * @param runnable The test thread.
+ * @param id The explicit id to assign to the test thread.
+ */
+ public void addTestThread(TestRunnable runnable, int id)
+ {
+ testThreads[id] = runnable;
+ runnable.setCoordinator(this);
+ runnable.setId(id);
+ }
+
+ /**
+ * Starts all the coordinated threads running.
+ */
+ public void run()
+ {
+ // Create the monitors for each thread.
+ locks = new Object[threadCount];
+
+ // Create an appropriately sized event queue to allow one event from and to each thread.
+ allowEvents = new boolean[threadCount][threadCount];
+
+ // Initialize the monitors and clear the event queues.
+ for (int i = 0; i < locks.length; i++)
+ {
+ locks[i] = new Object();
+
+ for (int j = 0; j < locks.length; j++)
+ {
+ allowEvents[i][j] = false;
+ }
+ }
+
+ // Start all the threads running.
+ for (TestRunnable nextRunnable : testThreads)
+ {
+ // Create a Java thread for the test thread.
+ Thread newThread = threadFactory.newThread(nextRunnable);
+ nextRunnable.setThread(newThread);
+
+ // Start it running.
+ newThread.start();
+ }
+ }
+
+ /**
+ * Waits until all the test threads have completed and returns any accumulated error messages from them. Any
+ * exceptions thrown by their run methods are also kept at this point.
+ *
+ * @return The accumulated error messages from all the threads concatenated together.
+ */
+ public String joinAndRetrieveMessages()
+ {
+ // Create an empty error message.
+ String errorMessage = "";
+
+ // Join all the test threads.
+ for (TestRunnable r : testThreads)
+ {
+ Thread t = r.getThread();
+
+ try
+ {
+ t.join();
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Add any accumulated error messages to the return value.
+ errorMessage += r.getErrorMessage();
+
+ // Keep any exceptions resulting from the threads run method.
+ Exception e = r.getException();
+
+ if (e != null)
+ {
+ exceptions.add(e);
+ }
+ }
+
+ return errorMessage;
+ }
+
+ /**
+ * Reports any accumulated exceptions from the test threads run methods. This method must be called after
+ * {@link #joinAndRetrieveMessages}.
+ *
+ * @return Any accumulated exceptions from the test threads run methods. This method must be called after
+ */
+ public Collection<Exception> getExceptions()
+ {
+ return exceptions;
+ }
+
+ /**
+ * Sets a timeout to break out of potential deadlocks. If all threads are waiting for other threads to send
+ * them continue events for longer than this timeout then the threads are all terminated.
+ *
+ * @param millis The minimum time to allow to pass before breaking out of any potential deadlocks.
+ *
+ * @todo This has not been implemented yet. If a potential deadlock happens then the joinAndRetrieveMessages
+ * method should throw a PotentialDeadlockException.
+ */
+ public void setDeadlockTimeout(long millis)
+ {
+ deadlockTimeout = millis * 1000000;
+ }
+
+ /**
+ * Creates a set of 'allow to continue' events on the event queues of the specified threads.
+ *
+ * @param threads The set of threads to allow to continue.
+ * @param callerId The explicit id of the calling test thread.
+ * @param caller The calling test thread.
+ */
+ void produceAllowEvents(int[] threads, int callerId, TestRunnable caller)
+ {
+ // Generate some debugging messages. Very usefull to know how thread synchronization is progressing.
+ String message = "Thread " + callerId + " is allowing threads [ ";
+
+ for (int j = 0; j < threads.length; j++)
+ {
+ message += threads[j] + ((j < (threads.length - 1)) ? ", " : "");
+ }
+
+ message += " ] to continue.";
+ log.debug(message);
+
+ // For each allow event, synchronize on the threads lock then set the event flag to true.
+ for (int id : threads)
+ {
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (locks[id])
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ // Send the allow to continue event to the receiving thread.
+ allowEvents[id][callerId] = true;
+ }
+ }
+
+ // Wake up any threads waiting on the coordinator lock to recheck their event queues.
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (coordinatorLock)
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+ coordinatorLock.notifyAll();
+ }
+ }
+
+ /**
+ * Consumes an 'allow to continue' from one of the specified threads or waits until one is available or in some
+ * cases if one of the specified threads is blocked elsewhere to accept that as an 'allow to continue' event.
+ *
+ * @param threads The set of threads to accept an allow to continue event from.
+ * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue.
+ * @param callerId The explicit id of the calling test thread.
+ * @param caller The calling test thread.
+ *
+ * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited on is found
+ * to be blocked outside of the thread test coordinator. <tt>false</tt> under all other conditions.
+ */
+ boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller)
+ {
+ // Generate some debugging messages. Very usefull to know how thread synchronization is progressing.
+ String message = "Thread " + callerId + " is requesting threads [ ";
+
+ // Record the time at which this method was called. Will be used for breaking out of potential deadlocks.
+ long startTime = System.nanoTime();
+
+ for (int j = 0; j < threads.length; j++)
+ {
+ message += threads[j] + ((j < (threads.length - 1)) ? ", " : "");
+ }
+
+ message += " ] to allow it to continue.";
+ log.debug(message);
+
+ // Loop until an allow to continue event is received.
+ while (true)
+ {
+ // Look at all the allowing thread to see if one has created an event for consumption.
+ for (int allowerId : threads)
+ {
+ // Get the threads lock for the event to consume.
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (locks[callerId])
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ // Check if there is an event on the queue from the allowing thread to this one.
+ if (allowEvents[callerId][allowerId])
+ {
+ log.debug("Found an allow event, thread " + allowerId + ", is allowing thread " + callerId
+ + ", to continue.");
+
+ // Consume all the allow events for this thread.
+ /*for (int i = 0; i < allowEvents[callerId].length; i++)
+ {
+ allowEvents[callerId][i] = false;
+ }*/
+
+ // Consume just the event from the allower to the consumer, leaving other pending allow events alone.
+ allowEvents[callerId][allowerId] = false;
+
+ return false;
+ }
+ }
+ }
+
+ // If waiting elsewhere is to be interpreted as an 'allow to continue' event, then look at the thread status
+ // for the threads being waited on to see if any are blocked on other resources.
+ if (otherWaitIsAllow)
+ {
+ log.debug("Other wait is to be interpreted as an allow event.");
+
+ // Look at all the potential allower threads.
+ for (int allowerId : threads)
+ {
+ // Get the Java thread state for the allowing thread.
+ Thread threadToTest = testThreads[allowerId].getThread();
+ Thread.State state = threadToTest.getState();
+
+ // Check if the thread is blocked and so a potential candidate for releasing this one.
+ if ((state == Thread.State.BLOCKED) || (state == Thread.State.WAITING)
+ || (state == Thread.State.TIMED_WAITING))
+ {
+ log.debug("Found an allower thread, id = " + allowerId + ", that is blocked or wating.");
+
+ // Check that the allower thread is not waiting on the coordinator lock or any of the
+ // individual thread locks. It must be waiting or blocked on another monitor.
+ TestRunnable allowingRunnable = testThreads[allowerId];
+ boolean isWaitingOnCoordinator = allowingRunnable.isWaitingOnCoordinator();
+
+ if (!isWaitingOnCoordinator)
+ {
+ log.debug("The allower thread, id = " + allowerId
+ + ", is blocked or waiting other than on the coordinator.");
+
+ // Get the threads lock for the event to consume.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (locks[callerId])
+ {
+ caller.setWaitingOnCoordinator(false);
+
+ // Consume all the allow events for this thread.
+ for (int i = 0; i < allowEvents[callerId].length; i++)
+ {
+ allowEvents[callerId][i] = false;
+ }
+
+ return true;
+ }
+ }
+ else
+ {
+ log.debug("The waiting allower thread, " + allowerId
+ + ", is waiting on the coordinator so does not allow thread " + callerId + " to continue.");
+ }
+ }
+ }
+ }
+
+ // Keep waiting until an 'allow to continue' event can be consumed.
+ try
+ {
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (coordinatorLock)
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ log.debug("Thread " + callerId + " is waiting on coordinator lock for more allow events.");
+
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+ coordinatorLock.wait(10);
+ }
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Release the waiting on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible
+ // deadlock exception if so.
+ long waitTime = System.nanoTime() - startTime;
+ log.debug("Thread " + callerId + " has been waiting for " + (waitTime / 1000000) + " milliseconds.");
+
+ if (waitTime > deadlockTimeout)
+ {
+ // Throw a possible deadlock exception.
+ throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this);
+ }
+
+ log.debug("Thread " + callerId + " has woken up, was waiting for more allow events to become available.");
+ }
+ }
+
+ /**
+ * Pretty prints the state of the thread test coordinator, for debugging purposes.
+ *
+ * @return Pretty printed state of the thread test coordinator.
+ */
+ public String toString()
+ {
+ String result = "[";
+
+ for (int i = 0; i < allowEvents.length; i++)
+ {
+ for (int j = 0; j < allowEvents[i].length; j++)
+ {
+ result += allowEvents[i][j];
+
+ result += (j < (allowEvents[i].length - 1)) ? ", " : "";
+ }
+
+ result += (i < (allowEvents.length - 1)) ? ",\n " : "";
+ }
+
+ result += "]";
+
+ for (int i = 0; i < testThreads.length; i++)
+ {
+ result += "thread[" + i + "] = " + testThreads[i].toString();
+ }
+
+ return result;
+ }
+
+}
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java
new file mode 100644
index 0000000000..b9865f2e22
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestExample.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.junit.concurrency;
+
+import org.apache.log4j.Logger;
+
+/**
+ * An example to illustrate the use of the {@link ThreadTestCoordinator} and {@link TestRunnable}s.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Demo multi-threaded testing.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class ThreadTestExample
+{
+ /** Used for logging. */
+ private static final Logger log = Logger.getLogger(ThreadTestExample.class);
+
+ /** Test thread 1. */
+ TestRunnable testThread1 =
+ new TestRunnable()
+ {
+ public void runWithExceptions() throws Exception
+ {
+ log.debug("public void run(): called");
+ log.info("in testThread0, block 1");
+
+ // Wait for t2 to allow t1 to continue.
+ allow(new int[] { 1 });
+ waitFor(new int[] { 1 }, false);
+
+ log.info("in testThread0, block 2");
+
+ // Wait for t2 to allow t1 to continue. T2 is allowed to be blocked elsewhere than giving explicit
+ // permission to allow t1 to continue.
+ allow(new int[] { 1 });
+ waitFor(new int[] { 1 }, true);
+
+ log.info("in testThread0, block 3");
+
+ // Release thread 2 from waiting on the shared lock.
+ synchronized (sharedLock)
+ {
+ sharedLock.notifyAll();
+ }
+
+ allow(new int[] { 1 });
+ }
+ };
+
+ /** A shared lock between the test threads. */
+ final Object sharedLock = new Object();
+
+ /** Test thread 2. */
+ TestRunnable testThread2 =
+ new TestRunnable()
+ {
+ public void runWithExceptions() throws Exception
+ {
+ log.debug("public void run(): called");
+ log.info("in testThread1, block 1");
+
+ // Wait for t1 to allow t2 to continue.
+ allow(new int[] { 0 });
+ waitFor(new int[] { 0 }, false);
+
+ log.info("in testThread1, block 2");
+
+ // Wait on another resource. T1 should accept this as permission to continue.
+ try
+ {
+ synchronized (sharedLock)
+ {
+ log.debug("in testThread1, waiting on shared lock.");
+ sharedLock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Bail-out with a runtime if this happens.
+ throw new RuntimeException("Interrupted whilst waiting for shared lock.", e);
+ }
+
+ log.info("in testThread1, finished waiting on shared lock.");
+
+ // allow(new int[] { 0 });
+
+ // Wait for t1 to allow t2 to continue.
+ waitFor(new int[] { 0 }, false);
+
+ log.info("in testThread1, block 3");
+
+ allow(new int[] { 0 });
+ }
+ };
+
+ /**
+ * Executes the test threads with coordination.
+ *
+ * @param args Ignored.
+ */
+ public void main(String[] args)
+ {
+ ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
+
+ tt.addTestThread(testThread1, 0);
+ tt.addTestThread(testThread2, 1);
+ tt.setDeadlockTimeout(500);
+ tt.run();
+
+ String errorMessage = tt.joinAndRetrieveMessages();
+
+ // Print any error messages or exceptions.
+ log.info(errorMessage);
+
+ if (!tt.getExceptions().isEmpty())
+ {
+ for (Exception e : tt.getExceptions())
+ {
+ log.warn("Exception thrown during test thread: ", e);
+ }
+ }
+ }
+}
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html
new file mode 100644
index 0000000000..904fd0fd05
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/package.html
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<html>
+<body>
+Contains code to assist in testing concurrency issues using coordinated threads to present code under test with
+oportunities to expose concurrency bugs. Some example concurrency bugs that may be tested using these techniques are
+race conditions, dead locks, live locks, dirty reads, phantom reads, non repeatable reads and so on.
+</body>
+</html>