summaryrefslogtreecommitdiff
path: root/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java')
-rw-r--r--qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java486
1 files changed, 486 insertions, 0 deletions
diff --git a/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
new file mode 100644
index 0000000000..3cf8543656
--- /dev/null
+++ b/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
@@ -0,0 +1,486 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.junit.concurrency;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send
+ * them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch
+ * array is always a square array, allowing one latch from and to every thread. Upon accepting an allow signal from one
+ * sender the latches for all senders for a are cleared. This class is always used in conjunction with
+ * {@link TestRunnable} for writing concurrent test code that coordinates multi-threaded activity in order to reproduce
+ * concurrency bugs.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept test threads to coordinate.
+ * <tr><td> Allow test threads to send 'allow to continue' signals.
+ * <tr><td> Allow test threads to wait on this coordinator for 'allow to continue' signals.
+ * <tr><td> Report error messages from test threads.
+ * <tr><td> Report exceptions from test threads.
+ * <tr><td> Provide method to wait until all test threads have completed.
+ * </table>
+ *
+ * @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has
+ * proved extremely usefull. Some documentation for this needs to be written to explain it better.
+ *
+ * @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for
+ * each other, they are deadlocked and there is something wrong with the test code that put them in that
+ * situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some
+ * external event. A timeout should be used. Timeout is already implemented, just need to sanity check how
+ * this is working and document it.
+ *
+ * @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I
+ * guess a longish timeout is the only thing that can be done for that.
+ *
+ * @todo Only course grained synchronous at the method class level can be obtained. This is because test code can
+ * only insert synchronization points between method calls it makes. So this code will not be usefull for
+ * checking sequences of events within methods, unless the code under test is explicitly instrumented for it.
+ * It might be possible to instrument code by using labels, and then use the debugger/profiler interface to
+ * put breakpoints on the labels and use them as synchronization points. Not perfect, but at the unused labels
+ * can be left in the code, without altering its behaviour.
+ *
+ * @author Rupert Smith
+ */
+public class ThreadTestCoordinator
+{
+ /** Used for logging. */
+ private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class);
+
+ /** Keeps track of the test threads by their ids. */
+ private TestRunnable[] testThreads; // = new TestRunnable[2];
+
+ /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */
+ private final Object coordinatorLock = new Object();
+
+ /** A set of monitors for each test thread. */
+ private Object[] locks;
+
+ /** The binary latch array, this is always a square array allowing one event from and to every thread. */
+ private boolean[][] allowEvents;
+
+ /** Keeps track of the number of threads being coordinated. */
+ private int threadCount = 0;
+
+ /** Accumulates any exceptions resulting from the threads run methods. */
+ private Collection<Exception> exceptions = new ArrayList<Exception>();
+
+ /**
+ * Holds the deadlock timeout after which threads are given a runtime exception to signal that a potential
+ * deadlock may be happening.
+ */
+ private long deadlockTimeout = 1000 * 1000000;
+
+ /** Holds the factory to create test thread with. */
+ private ThreadFactory threadFactory;
+
+ /**
+ * Creates a new test thread coordinator. The number of threads to run must be specified here.
+ *
+ * @param numThreads The number of threads to run.
+ */
+ public ThreadTestCoordinator(int numThreads)
+ {
+ this.threadCount = numThreads;
+
+ // Create an array big enough to hold all the test threads.
+ testThreads = new TestRunnable[threadCount];
+
+ // Use the default thread factory, as none specified.
+ threadFactory = new DefaultThreadFactory();
+ }
+
+ /**
+ * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be
+ * specified here.
+ *
+ * @param numThreads The number of threads to run.
+ * @param threadFactory The factory to use to create the test threads.
+ */
+ public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory)
+ {
+ this.threadCount = numThreads;
+
+ // Create an array big enough to hold all the test threads.
+ testThreads = new TestRunnable[threadCount];
+
+ // Use the specified thread factory.
+ this.threadFactory = threadFactory;
+ }
+
+ /**
+ * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and
+ * it is up to the caller to do this.
+ *
+ * @param runnable The test thread.
+ * @param id The explicit id to assign to the test thread.
+ */
+ public void addTestThread(TestRunnable runnable, int id)
+ {
+ testThreads[id] = runnable;
+ runnable.setCoordinator(this);
+ runnable.setId(id);
+ }
+
+ /**
+ * Starts all the coordinated threads running.
+ */
+ public void run()
+ {
+ // Create the monitors for each thread.
+ locks = new Object[threadCount];
+
+ // Create an appropriately sized event queue to allow one event from and to each thread.
+ allowEvents = new boolean[threadCount][threadCount];
+
+ // Initialize the monitors and clear the event queues.
+ for (int i = 0; i < locks.length; i++)
+ {
+ locks[i] = new Object();
+
+ for (int j = 0; j < locks.length; j++)
+ {
+ allowEvents[i][j] = false;
+ }
+ }
+
+ // Start all the threads running.
+ for (TestRunnable nextRunnable : testThreads)
+ {
+ // Create a Java thread for the test thread.
+ Thread newThread = threadFactory.newThread(nextRunnable);
+ nextRunnable.setThread(newThread);
+
+ // Start it running.
+ newThread.start();
+ }
+ }
+
+ /**
+ * Waits until all the test threads have completed and returns any accumulated error messages from them. Any
+ * exceptions thrown by their run methods are also kept at this point.
+ *
+ * @return The accumulated error messages from all the threads concatenated together.
+ */
+ public String joinAndRetrieveMessages()
+ {
+ // Create an empty error message.
+ String errorMessage = "";
+
+ // Join all the test threads.
+ for (TestRunnable r : testThreads)
+ {
+ Thread t = r.getThread();
+
+ try
+ {
+ t.join();
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Add any accumulated error messages to the return value.
+ errorMessage += r.getErrorMessage();
+
+ // Keep any exceptions resulting from the threads run method.
+ Exception e = r.getException();
+
+ if (e != null)
+ {
+ exceptions.add(e);
+ }
+ }
+
+ return errorMessage;
+ }
+
+ /**
+ * Reports any accumulated exceptions from the test threads run methods. This method must be called after
+ * {@link #joinAndRetrieveMessages}.
+ *
+ * @return Any accumulated exceptions from the test threads run methods. This method must be called after
+ */
+ public Collection<Exception> getExceptions()
+ {
+ return exceptions;
+ }
+
+ /**
+ * Sets a timeout to break out of potential deadlocks. If all threads are waiting for other threads to send
+ * them continue events for longer than this timeout then the threads are all terminated.
+ *
+ * @param millis The minimum time to allow to pass before breaking out of any potential deadlocks.
+ *
+ * @todo This has not been implemented yet. If a potential deadlock happens then the joinAndRetrieveMessages
+ * method should throw a PotentialDeadlockException.
+ */
+ public void setDeadlockTimeout(long millis)
+ {
+ deadlockTimeout = millis * 1000000;
+ }
+
+ /**
+ * Creates a set of 'allow to continue' events on the event queues of the specified threads.
+ *
+ * @param threads The set of threads to allow to continue.
+ * @param callerId The explicit id of the calling test thread.
+ * @param caller The calling test thread.
+ */
+ void produceAllowEvents(int[] threads, int callerId, TestRunnable caller)
+ {
+ // Generate some debugging messages. Very usefull to know how thread synchronization is progressing.
+ String message = "Thread " + callerId + " is allowing threads [ ";
+
+ for (int j = 0; j < threads.length; j++)
+ {
+ message += threads[j] + ((j < (threads.length - 1)) ? ", " : "");
+ }
+
+ message += " ] to continue.";
+ log.debug(message);
+
+ // For each allow event, synchronize on the threads lock then set the event flag to true.
+ for (int id : threads)
+ {
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (locks[id])
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ // Send the allow to continue event to the receiving thread.
+ allowEvents[id][callerId] = true;
+ }
+ }
+
+ // Wake up any threads waiting on the coordinator lock to recheck their event queues.
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (coordinatorLock)
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+ coordinatorLock.notifyAll();
+ }
+ }
+
+ /**
+ * Consumes an 'allow to continue' from one of the specified threads or waits until one is available or in some
+ * cases if one of the specified threads is blocked elsewhere to accept that as an 'allow to continue' event.
+ *
+ * @param threads The set of threads to accept an allow to continue event from.
+ * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue.
+ * @param callerId The explicit id of the calling test thread.
+ * @param caller The calling test thread.
+ *
+ * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited on is found
+ * to be blocked outside of the thread test coordinator. <tt>false</tt> under all other conditions.
+ */
+ boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller)
+ {
+ // Generate some debugging messages. Very usefull to know how thread synchronization is progressing.
+ String message = "Thread " + callerId + " is requesting threads [ ";
+
+ // Record the time at which this method was called. Will be used for breaking out of potential deadlocks.
+ long startTime = System.nanoTime();
+
+ for (int j = 0; j < threads.length; j++)
+ {
+ message += threads[j] + ((j < (threads.length - 1)) ? ", " : "");
+ }
+
+ message += " ] to allow it to continue.";
+ log.debug(message);
+
+ // Loop until an allow to continue event is received.
+ while (true)
+ {
+ // Look at all the allowing thread to see if one has created an event for consumption.
+ for (int allowerId : threads)
+ {
+ // Get the threads lock for the event to consume.
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (locks[callerId])
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ // Check if there is an event on the queue from the allowing thread to this one.
+ if (allowEvents[callerId][allowerId])
+ {
+ log.debug("Found an allow event, thread " + allowerId + ", is allowing thread " + callerId
+ + ", to continue.");
+
+ // Consume all the allow events for this thread.
+ /*for (int i = 0; i < allowEvents[callerId].length; i++)
+ {
+ allowEvents[callerId][i] = false;
+ }*/
+
+ // Consume just the event from the allower to the consumer, leaving other pending allow events alone.
+ allowEvents[callerId][allowerId] = false;
+
+ return false;
+ }
+ }
+ }
+
+ // If waiting elsewhere is to be interpreted as an 'allow to continue' event, then look at the thread status
+ // for the threads being waited on to see if any are blocked on other resources.
+ if (otherWaitIsAllow)
+ {
+ log.debug("Other wait is to be interpreted as an allow event.");
+
+ // Look at all the potential allower threads.
+ for (int allowerId : threads)
+ {
+ // Get the Java thread state for the allowing thread.
+ Thread threadToTest = testThreads[allowerId].getThread();
+ Thread.State state = threadToTest.getState();
+
+ // Check if the thread is blocked and so a potential candidate for releasing this one.
+ if ((state == Thread.State.BLOCKED) || (state == Thread.State.WAITING)
+ || (state == Thread.State.TIMED_WAITING))
+ {
+ log.debug("Found an allower thread, id = " + allowerId + ", that is blocked or wating.");
+
+ // Check that the allower thread is not waiting on the coordinator lock or any of the
+ // individual thread locks. It must be waiting or blocked on another monitor.
+ TestRunnable allowingRunnable = testThreads[allowerId];
+ boolean isWaitingOnCoordinator = allowingRunnable.isWaitingOnCoordinator();
+
+ if (!isWaitingOnCoordinator)
+ {
+ log.debug("The allower thread, id = " + allowerId
+ + ", is blocked or waiting other than on the coordinator.");
+
+ // Get the threads lock for the event to consume.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (locks[callerId])
+ {
+ caller.setWaitingOnCoordinator(false);
+
+ // Consume all the allow events for this thread.
+ for (int i = 0; i < allowEvents[callerId].length; i++)
+ {
+ allowEvents[callerId][i] = false;
+ }
+
+ return true;
+ }
+ }
+ else
+ {
+ log.debug("The waiting allower thread, " + allowerId
+ + ", is waiting on the coordinator so does not allow thread " + callerId + " to continue.");
+ }
+ }
+ }
+ }
+
+ // Keep waiting until an 'allow to continue' event can be consumed.
+ try
+ {
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+
+ synchronized (coordinatorLock)
+ {
+ // Release the wating on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ log.debug("Thread " + callerId + " is waiting on coordinator lock for more allow events.");
+
+ // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
+ // being blocked at this time.
+ caller.setWaitingOnCoordinator(true);
+ coordinatorLock.wait(10);
+ }
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Release the waiting on coordinator flag now that this thread is running again.
+ caller.setWaitingOnCoordinator(false);
+
+ // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible
+ // deadlock exception if so.
+ long waitTime = System.nanoTime() - startTime;
+ log.debug("Thread " + callerId + " has been waiting for " + (waitTime / 1000000) + " milliseconds.");
+
+ if (waitTime > deadlockTimeout)
+ {
+ // Throw a possible deadlock exception.
+ throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this);
+ }
+
+ log.debug("Thread " + callerId + " has woken up, was waiting for more allow events to become available.");
+ }
+ }
+
+ /**
+ * Pretty prints the state of the thread test coordinator, for debugging purposes.
+ *
+ * @return Pretty printed state of the thread test coordinator.
+ */
+ public String toString()
+ {
+ String result = "[";
+
+ for (int i = 0; i < allowEvents.length; i++)
+ {
+ for (int j = 0; j < allowEvents[i].length; j++)
+ {
+ result += allowEvents[i][j];
+
+ result += (j < (allowEvents[i].length - 1)) ? ", " : "";
+ }
+
+ result += (i < (allowEvents.length - 1)) ? ",\n " : "";
+ }
+
+ result += "]";
+
+ for (int i = 0; i < testThreads.length; i++)
+ {
+ result += "thread[" + i + "] = " + testThreads[i].toString();
+ }
+
+ return result;
+ }
+
+}