/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.junit.concurrency; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ThreadFactory; /** * ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send * them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch * array is always a square array, allowing one latch from and to every thread. Upon accepting an allow signal from one * sender the latches for all senders for a are cleared. This class is always used in conjunction with * {@link TestRunnable} for writing concurrent test code that coordinates multi-threaded activity in order to reproduce * concurrency bugs. * *

*
CRC Card
Responsibilities Collaborations *
Accept test threads to coordinate. *
Allow test threads to send 'allow to continue' signals. *
Allow test threads to wait on this coordinator for 'allow to continue' signals. *
Report error messages from test threads. *
Report exceptions from test threads. *
Provide method to wait until all test threads have completed. *
* * @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has * proved extremely usefull. Some documentation for this needs to be written to explain it better. * * @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for * each other, they are deadlocked and there is something wrong with the test code that put them in that * situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some * external event. A timeout should be used. Timeout is already implemented, just need to sanity check how * this is working and document it. * * @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I * guess a longish timeout is the only thing that can be done for that. * * @todo Only course grained synchronous at the method class level can be obtained. This is because test code can * only insert synchronization points between method calls it makes. So this code will not be usefull for * checking sequences of events within methods, unless the code under test is explicitly instrumented for it. * It might be possible to instrument code by using labels, and then use the debugger/profiler interface to * put breakpoints on the labels and use them as synchronization points. Not perfect, but at the unused labels * can be left in the code, without altering its behaviour. * * @author Rupert Smith */ public class ThreadTestCoordinator { /** Used for logging. */ private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class); /** Keeps track of the test threads by their ids. */ private TestRunnable[] testThreads; // = new TestRunnable[2]; /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */ private final Object coordinatorLock = new Object(); /** A set of monitors for each test thread. */ private Object[] locks; /** The binary latch array, this is always a square array allowing one event from and to every thread. */ private boolean[][] allowEvents; /** Keeps track of the number of threads being coordinated. */ private int threadCount = 0; /** Accumulates any exceptions resulting from the threads run methods. */ private Collection exceptions = new ArrayList(); /** * Holds the deadlock timeout after which threads are given a runtime exception to signal that a potential * deadlock may be happening. */ private long deadlockTimeout = 1000 * 1000000; /** Holds the factory to create test thread with. */ private ThreadFactory threadFactory; /** * Creates a new test thread coordinator. The number of threads to run must be specified here. * * @param numThreads The number of threads to run. */ public ThreadTestCoordinator(int numThreads) { this.threadCount = numThreads; // Create an array big enough to hold all the test threads. testThreads = new TestRunnable[threadCount]; // Use the default thread factory, as none specified. threadFactory = new DefaultThreadFactory(); } /** * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be * specified here. * * @param numThreads The number of threads to run. * @param threadFactory The factory to use to create the test threads. */ public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory) { this.threadCount = numThreads; // Create an array big enough to hold all the test threads. testThreads = new TestRunnable[threadCount]; // Use the specified thread factory. this.threadFactory = threadFactory; } /** * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and * it is up to the caller to do this. * * @param runnable The test thread. * @param id The explicit id to assign to the test thread. */ public void addTestThread(TestRunnable runnable, int id) { testThreads[id] = runnable; runnable.setCoordinator(this); runnable.setId(id); } /** * Starts all the coordinated threads running. */ public void run() { // Create the monitors for each thread. locks = new Object[threadCount]; // Create an appropriately sized event queue to allow one event from and to each thread. allowEvents = new boolean[threadCount][threadCount]; // Initialize the monitors and clear the event queues. for (int i = 0; i < locks.length; i++) { locks[i] = new Object(); for (int j = 0; j < locks.length; j++) { allowEvents[i][j] = false; } } // Start all the threads running. for (TestRunnable nextRunnable : testThreads) { // Create a Java thread for the test thread. Thread newThread = threadFactory.newThread(nextRunnable); nextRunnable.setThread(newThread); // Start it running. newThread.start(); } } /** * Waits until all the test threads have completed and returns any accumulated error messages from them. Any * exceptions thrown by their run methods are also kept at this point. * * @return The accumulated error messages from all the threads concatenated together. */ public String joinAndRetrieveMessages() { // Create an empty error message. String errorMessage = ""; // Join all the test threads. for (TestRunnable r : testThreads) { Thread t = r.getThread(); try { t.join(); } catch (InterruptedException e) { } // Add any accumulated error messages to the return value. errorMessage += r.getErrorMessage(); // Keep any exceptions resulting from the threads run method. Exception e = r.getException(); if (e != null) { exceptions.add(e); } } return errorMessage; } /** * Reports any accumulated exceptions from the test threads run methods. This method must be called after * {@link #joinAndRetrieveMessages}. * * @return Any accumulated exceptions from the test threads run methods. This method must be called after */ public Collection getExceptions() { return exceptions; } /** * Sets a timeout to break out of potential deadlocks. If all threads are waiting for other threads to send * them continue events for longer than this timeout then the threads are all terminated. * * @param millis The minimum time to allow to pass before breaking out of any potential deadlocks. * * @todo This has not been implemented yet. If a potential deadlock happens then the joinAndRetrieveMessages * method should throw a PotentialDeadlockException. */ public void setDeadlockTimeout(long millis) { deadlockTimeout = millis * 1000000; } /** * Creates a set of 'allow to continue' events on the event queues of the specified threads. * * @param threads The set of threads to allow to continue. * @param callerId The explicit id of the calling test thread. * @param caller The calling test thread. */ void produceAllowEvents(int[] threads, int callerId, TestRunnable caller) { // Generate some debugging messages. Very usefull to know how thread synchronization is progressing. String message = "Thread " + callerId + " is allowing threads [ "; for (int j = 0; j < threads.length; j++) { message += threads[j] + ((j < (threads.length - 1)) ? ", " : ""); } message += " ] to continue."; log.debug(message); // For each allow event, synchronize on the threads lock then set the event flag to true. for (int id : threads) { // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for // being blocked at this time. caller.setWaitingOnCoordinator(true); synchronized (locks[id]) { // Release the wating on coordinator flag now that this thread is running again. caller.setWaitingOnCoordinator(false); // Send the allow to continue event to the receiving thread. allowEvents[id][callerId] = true; } } // Wake up any threads waiting on the coordinator lock to recheck their event queues. // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for // being blocked at this time. caller.setWaitingOnCoordinator(true); synchronized (coordinatorLock) { // Release the wating on coordinator flag now that this thread is running again. caller.setWaitingOnCoordinator(false); coordinatorLock.notifyAll(); } } /** * Consumes an 'allow to continue' from one of the specified threads or waits until one is available or in some * cases if one of the specified threads is blocked elsewhere to accept that as an 'allow to continue' event. * * @param threads The set of threads to accept an allow to continue event from. * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue. * @param callerId The explicit id of the calling test thread. * @param caller The calling test thread. * * @return If the otherWaitIsAllow flag is set, then true is returned when the thread being waited on is found * to be blocked outside of the thread test coordinator. false under all other conditions. */ boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller) { // Generate some debugging messages. Very usefull to know how thread synchronization is progressing. String message = "Thread " + callerId + " is requesting threads [ "; // Record the time at which this method was called. Will be used for breaking out of potential deadlocks. long startTime = System.nanoTime(); for (int j = 0; j < threads.length; j++) { message += threads[j] + ((j < (threads.length - 1)) ? ", " : ""); } message += " ] to allow it to continue."; log.debug(message); // Loop until an allow to continue event is received. while (true) { // Look at all the allowing thread to see if one has created an event for consumption. for (int allowerId : threads) { // Get the threads lock for the event to consume. // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for // being blocked at this time. caller.setWaitingOnCoordinator(true); synchronized (locks[callerId]) { // Release the wating on coordinator flag now that this thread is running again. caller.setWaitingOnCoordinator(false); // Check if there is an event on the queue from the allowing thread to this one. if (allowEvents[callerId][allowerId]) { log.debug("Found an allow event, thread " + allowerId + ", is allowing thread " + callerId + ", to continue."); // Consume all the allow events for this thread. /*for (int i = 0; i < allowEvents[callerId].length; i++) { allowEvents[callerId][i] = false; }*/ // Consume just the event from the allower to the consumer, leaving other pending allow events alone. allowEvents[callerId][allowerId] = false; return false; } } } // If waiting elsewhere is to be interpreted as an 'allow to continue' event, then look at the thread status // for the threads being waited on to see if any are blocked on other resources. if (otherWaitIsAllow) { log.debug("Other wait is to be interpreted as an allow event."); // Look at all the potential allower threads. for (int allowerId : threads) { // Get the Java thread state for the allowing thread. Thread threadToTest = testThreads[allowerId].getThread(); Thread.State state = threadToTest.getState(); // Check if the thread is blocked and so a potential candidate for releasing this one. if ((state == Thread.State.BLOCKED) || (state == Thread.State.WAITING) || (state == Thread.State.TIMED_WAITING)) { log.debug("Found an allower thread, id = " + allowerId + ", that is blocked or wating."); // Check that the allower thread is not waiting on the coordinator lock or any of the // individual thread locks. It must be waiting or blocked on another monitor. TestRunnable allowingRunnable = testThreads[allowerId]; boolean isWaitingOnCoordinator = allowingRunnable.isWaitingOnCoordinator(); if (!isWaitingOnCoordinator) { log.debug("The allower thread, id = " + allowerId + ", is blocked or waiting other than on the coordinator."); // Get the threads lock for the event to consume. caller.setWaitingOnCoordinator(true); synchronized (locks[callerId]) { caller.setWaitingOnCoordinator(false); // Consume all the allow events for this thread. for (int i = 0; i < allowEvents[callerId].length; i++) { allowEvents[callerId][i] = false; } return true; } } else { log.debug("The waiting allower thread, " + allowerId + ", is waiting on the coordinator so does not allow thread " + callerId + " to continue."); } } } } // Keep waiting until an 'allow to continue' event can be consumed. try { // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for // being blocked at this time. caller.setWaitingOnCoordinator(true); synchronized (coordinatorLock) { // Release the wating on coordinator flag now that this thread is running again. caller.setWaitingOnCoordinator(false); log.debug("Thread " + callerId + " is waiting on coordinator lock for more allow events."); // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for // being blocked at this time. caller.setWaitingOnCoordinator(true); coordinatorLock.wait(10); } } catch (InterruptedException e) { } // Release the waiting on coordinator flag now that this thread is running again. caller.setWaitingOnCoordinator(false); // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible // deadlock exception if so. long waitTime = System.nanoTime() - startTime; log.debug("Thread " + callerId + " has been waiting for " + (waitTime / 1000000) + " milliseconds."); if (waitTime > deadlockTimeout) { // Throw a possible deadlock exception. throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this); } log.debug("Thread " + callerId + " has woken up, was waiting for more allow events to become available."); } } /** * Pretty prints the state of the thread test coordinator, for debugging purposes. * * @return Pretty printed state of the thread test coordinator. */ public String toString() { String result = "["; for (int i = 0; i < allowEvents.length; i++) { for (int j = 0; j < allowEvents[i].length; j++) { result += allowEvents[i][j]; result += (j < (allowEvents[i].length - 1)) ? ", " : ""; } result += (i < (allowEvents.length - 1)) ? ",\n " : ""; } result += "]"; for (int i = 0; i < testThreads.length; i++) { result += "thread[" + i + "] = " + testThreads[i].toString(); } return result; } }