summaryrefslogtreecommitdiff
path: root/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
blob: 3cf8543656dabfb1aa83c81a50ccbf0273e7b6e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.junit.concurrency;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send
 * them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch
 * array is always a square array, allowing one latch from and to every thread. When a waiting thread accepts an allow
 * signal from one sender, only the latch from that sender is cleared; allow signals from other senders remain pending.
 * (When a thread blocked elsewhere is accepted as permission to continue, all pending latches for the waiting thread
 * are cleared.) This class is always used in conjunction with {@link TestRunnable} for writing concurrent test code
 * that coordinates multi-threaded activity in order to reproduce concurrency bugs.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td> Accept test threads to coordinate.
 * <tr><td> Allow test threads to send 'allow to continue' signals.
 * <tr><td> Allow test threads to wait on this coordinator for 'allow to continue' signals.
 * <tr><td> Report error messages from test threads.
 * <tr><td> Report exceptions from test threads.
 * <tr><td> Provide method to wait until all test threads have completed.
 * </table>
 *
 * @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has
 *       proved extremely useful. Some documentation for this needs to be written to explain it better.
 *
 * @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for
 *       each other, they are deadlocked and there is something wrong with the test code that put them in that
 *       situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some
 *       external event. A timeout should be used. Timeout is already implemented, just need to sanity check how
 *       this is working and document it.
 *
 * @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I
 *       guess a longish timeout is the only thing that can be done for that.
 *
 * @todo Only coarse-grained synchronization, at the level of method calls, can be obtained. This is because test code
 *       can only insert synchronization points between method calls it makes. So this code will not be useful for
 *       checking sequences of events within methods, unless the code under test is explicitly instrumented for it.
 *       It might be possible to instrument code by using labels, and then use the debugger/profiler interface to
 *       put breakpoints on the labels and use them as synchronization points. Not perfect, but at least the unused
 *       labels can be left in the code, without altering its behaviour.
 *
 * @author Rupert Smith
 */
public class ThreadTestCoordinator
{
    /** Used for logging. */
    private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class);

    /** How long, in milliseconds, a waiting thread sleeps on the coordinator lock before re-checking for events. */
    private static final long POLL_INTERVAL_MILLIS = 10;

    /** The default deadlock timeout: one second, expressed in nanoseconds. */
    private static final long DEFAULT_DEADLOCK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);

    /** Keeps track of the test threads by their ids. */
    private final TestRunnable[] testThreads;

    /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */
    private final Object coordinatorLock = new Object();

    /**
     * A set of monitors, one for each test thread. {@code locks[i]} guards row {@code i} of {@link #allowEvents}.
     * Created by {@link #run}.
     */
    private Object[] locks;

    /**
     * The binary latch array, indexed as {@code allowEvents[receiver][sender]}. This is always a square array allowing
     * one event from and to every thread. Created by {@link #run}.
     */
    private boolean[][] allowEvents;

    /** The number of threads being coordinated. */
    private final int threadCount;

    /** Accumulates any exceptions resulting from the threads run methods. */
    private final Collection<Exception> exceptions = new ArrayList<Exception>();

    /**
     * Holds the deadlock timeout, in nanoseconds. A thread that has been waiting in {@link #consumeAllowEvent} for
     * longer than this is given a {@link PossibleDeadlockException} to signal that a deadlock may be happening.
     */
    private long deadlockTimeout = DEFAULT_DEADLOCK_TIMEOUT_NANOS;

    /** Holds the factory to create test threads with. */
    private final ThreadFactory threadFactory;

    /**
     * Creates a new test thread coordinator that uses a {@link DefaultThreadFactory} to create its threads. The number
     * of threads to run must be specified here.
     *
     * @param numThreads The number of threads to run.
     */
    public ThreadTestCoordinator(int numThreads)
    {
        // Use the default thread factory, as none specified.
        this(numThreads, new DefaultThreadFactory());
    }

    /**
     * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be
     * specified here.
     *
     * @param numThreads    The number of threads to run.
     * @param threadFactory The factory to use to create the test threads.
     */
    public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory)
    {
        this.threadCount = numThreads;

        // Create an array big enough to hold all the test threads.
        this.testThreads = new TestRunnable[numThreads];

        this.threadFactory = threadFactory;
    }

    /**
     * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and
     * it is up to the caller to do this. Every id must be added before {@link #run} is called.
     *
     * @param runnable The test thread.
     * @param id       The explicit id to assign to the test thread.
     */
    public void addTestThread(TestRunnable runnable, int id)
    {
        testThreads[id] = runnable;
        runnable.setCoordinator(this);
        runnable.setId(id);
    }

    /**
     * Starts all the coordinated threads running. Creates the per-thread monitors and the latch array, with every
     * latch cleared, then creates and starts one Java thread per test thread using the thread factory.
     *
     * @throws IllegalStateException If any id in {@code 0..numThreads-1} has not had a test thread added. This is
     *                               checked before any thread is started.
     */
    public void run()
    {
        // Fail fast, before starting anything, rather than leaving a partially started set of threads behind.
        for (int i = 0; i < testThreads.length; i++)
        {
            if (testThreads[i] == null)
            {
                throw new IllegalStateException("No test thread has been added with id " + i + ".");
            }
        }

        // Create one monitor per thread, and a square latch array allowing one event from and to each thread.
        // Java zero-initializes boolean arrays, so every latch starts out cleared.
        locks = new Object[threadCount];
        allowEvents = new boolean[threadCount][threadCount];

        for (int i = 0; i < threadCount; i++)
        {
            locks[i] = new Object();
        }

        // Start all the threads running.
        for (TestRunnable nextRunnable : testThreads)
        {
            // Create a Java thread for the test thread.
            Thread newThread = threadFactory.newThread(nextRunnable);
            nextRunnable.setThread(newThread);

            newThread.start();
        }
    }

    /**
     * Waits until all the test threads have completed and returns any accumulated error messages from them. Any
     * exceptions thrown by their run methods are also kept at this point.
     *
     * <p/>If the calling thread is interrupted whilst joining a test thread, this method stops waiting for that
     * particular thread and carries on with the rest. The interrupt status of the calling thread is then restored
     * before returning.
     *
     * @return The accumulated error messages from all the threads concatenated together.
     */
    public String joinAndRetrieveMessages()
    {
        StringBuilder errorMessages = new StringBuilder();
        boolean interrupted = false;

        for (TestRunnable testRunnable : testThreads)
        {
            try
            {
                testRunnable.getThread().join();
            }
            catch (InterruptedException ignored)
            {
                // Do not swallow the interrupt: remember it and restore it for the caller once every thread has been
                // visited. Restoring it immediately would make every remaining join() fail straight away.
                interrupted = true;
            }

            // Add any accumulated error messages to the return value.
            errorMessages.append(testRunnable.getErrorMessage());

            // Keep any exceptions resulting from the threads run method.
            Exception runException = testRunnable.getException();

            if (runException != null)
            {
                exceptions.add(runException);
            }
        }

        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }

        return errorMessages.toString();
    }

    /**
     * Reports any accumulated exceptions from the test threads run methods. This method must be called after
     * {@link #joinAndRetrieveMessages}.
     *
     * @return Any accumulated exceptions from the test threads run methods. The collection is empty if
     *         {@link #joinAndRetrieveMessages} has not yet been called.
     */
    public Collection<Exception> getExceptions()
    {
        return exceptions;
    }

    /**
     * Sets a timeout to break out of potential deadlocks. If a test thread waits on this coordinator for an 'allow to
     * continue' event for longer than this timeout, measured from the start of its wait, its wait is broken by
     * throwing a {@link PossibleDeadlockException} in that thread. The timeout is checked after each short poll of the
     * coordinator lock, so it may be exceeded slightly.
     *
     * <p/>This should be called before {@link #run}. The field is not volatile, so later changes are not guaranteed
     * to be seen by already running test threads.
     *
     * @param millis The minimum time, in milliseconds, to allow to pass before breaking out of any potential
     *               deadlocks. Very large values saturate rather than overflow, so {@link Long#MAX_VALUE} effectively
     *               disables the timeout.
     *
     * @todo joinAndRetrieveMessages does not itself report a potential deadlock; presumably the exception surfaces
     *       through the TestRunnable's recorded exception. Verify this.
     */
    public void setDeadlockTimeout(long millis)
    {
        // TimeUnit saturates at Long.MAX_VALUE/MIN_VALUE instead of silently wrapping like millis * 1000000 did.
        deadlockTimeout = TimeUnit.MILLISECONDS.toNanos(millis);
    }

    /**
     * Creates a set of 'allow to continue' events on the event queues of the specified threads, then wakes up any
     * threads waiting on the coordinator lock so that they re-check their event queues.
     *
     * @param threads  The set of threads to allow to continue.
     * @param callerId The explicit id of the calling test thread.
     * @param caller   The calling test thread.
     */
    void produceAllowEvents(int[] threads, int callerId, TestRunnable caller)
    {
        // Very useful to know how thread synchronization is progressing; only built when debug is enabled.
        if (log.isDebugEnabled())
        {
            log.debug("Thread " + callerId + " is allowing threads [ " + formatThreadIds(threads) + " ] to continue.");
        }

        // For each allow event, synchronize on the receiving thread's lock, then set the event flag to true.
        for (int id : threads)
        {
            // Flag this thread as waiting on the coordinator whilst it may block acquiring a coordinator monitor, so
            // that other threads testing it for being blocked elsewhere do not misinterpret this as an allow.
            caller.setWaitingOnCoordinator(true);

            synchronized (locks[id])
            {
                caller.setWaitingOnCoordinator(false);

                // Send the allow to continue event to the receiving thread.
                allowEvents[id][callerId] = true;
            }
        }

        // Wake up any threads waiting on the coordinator lock to recheck their event queues.
        caller.setWaitingOnCoordinator(true);

        synchronized (coordinatorLock)
        {
            caller.setWaitingOnCoordinator(false);
            coordinatorLock.notifyAll();
        }
    }

    /**
     * Consumes an 'allow to continue' event from one of the specified threads or waits until one is available. If
     * <tt>otherWaitIsAllow</tt> is set, a specified thread being blocked elsewhere is also accepted as an 'allow to
     * continue' event.
     *
     * <p/>If the calling thread is interrupted whilst waiting, it keeps waiting; its interrupt status is restored
     * when this method returns or throws.
     *
     * @param threads          The set of threads to accept an allow to continue event from.
     * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue.
     * @param callerId         The explicit id of the calling test thread.
     * @param caller           The calling test thread.
     *
     * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being waited
     *         on is found to be blocked outside of the thread test coordinator. <tt>false</tt> under all other
     *         conditions.
     *
     * @throws PossibleDeadlockException If no allow event has been obtained within the deadlock timeout.
     */
    boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller)
    {
        // Record the time at which this method was called. Will be used for breaking out of potential deadlocks.
        long startTime = System.nanoTime();

        // Very useful to know how thread synchronization is progressing; only built when debug is enabled.
        if (log.isDebugEnabled())
        {
            log.debug("Thread " + callerId + " is requesting threads [ " + formatThreadIds(threads)
                + " ] to allow it to continue.");
        }

        boolean interrupted = false;

        try
        {
            // Loop until an allow to continue event is received.
            while (true)
            {
                if (consumePendingAllowEvent(threads, callerId, caller))
                {
                    return false;
                }

                if (otherWaitIsAllow && acceptAllowerBlockedElsewhere(threads, callerId, caller))
                {
                    return true;
                }

                // Keep waiting until an 'allow to continue' event can be consumed.
                if (awaitMoreAllowEvents(callerId, caller))
                {
                    interrupted = true;
                }

                // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible
                // deadlock exception if so.
                long waitTime = System.nanoTime() - startTime;
                log.debug("Thread {} has been waiting for {} milliseconds.", callerId,
                    TimeUnit.NANOSECONDS.toMillis(waitTime));

                if (waitTime > deadlockTimeout)
                {
                    throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this);
                }

                log.debug("Thread {} has woken up, was waiting for more allow events to become available.", callerId);
            }
        }
        finally
        {
            // Restore any interrupt that arrived whilst polling, now that this thread is no longer going to wait.
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Looks for, and consumes, a pending allow event from any of the given allowers to the caller. Only the event from
     * the first allower found is consumed; other pending allow events are left alone.
     *
     * @param threads  The set of threads to accept an allow to continue event from.
     * @param callerId The explicit id of the calling test thread.
     * @param caller   The calling test thread.
     *
     * @return <tt>true</tt> if an allow event was consumed, <tt>false</tt> otherwise.
     */
    private boolean consumePendingAllowEvent(int[] threads, int callerId, TestRunnable caller)
    {
        for (int allowerId : threads)
        {
            // Flag this thread as waiting on the coordinator whilst it may block acquiring its own monitor.
            caller.setWaitingOnCoordinator(true);

            synchronized (locks[callerId])
            {
                caller.setWaitingOnCoordinator(false);

                // Check if there is an event on the queue from the allowing thread to this one.
                if (allowEvents[callerId][allowerId])
                {
                    log.debug("Found an allow event, thread {}, is allowing thread {}, to continue.", allowerId,
                        callerId);

                    // Consume just the event from the allower to the consumer.
                    allowEvents[callerId][allowerId] = false;

                    return true;
                }
            }
        }

        return false;
    }

    /**
     * Looks at the Java thread state of each potential allower. The first one found to be BLOCKED, WAITING or
     * TIMED_WAITING, and not flagged as waiting on this coordinator, is accepted as permission to continue. In that
     * case all pending allow events for the caller are cleared.
     *
     * @param threads  The set of threads to accept an allow to continue event from.
     * @param callerId The explicit id of the calling test thread.
     * @param caller   The calling test thread.
     *
     * @return <tt>true</tt> if a thread blocked elsewhere was found and accepted, <tt>false</tt> otherwise.
     */
    private boolean acceptAllowerBlockedElsewhere(int[] threads, int callerId, TestRunnable caller)
    {
        log.debug("Other wait is to be interpreted as an allow event.");

        for (int allowerId : threads)
        {
            TestRunnable allowingRunnable = testThreads[allowerId];
            Thread.State state = allowingRunnable.getThread().getState();

            boolean blockedOrWaiting = (state == Thread.State.BLOCKED) || (state == Thread.State.WAITING)
                || (state == Thread.State.TIMED_WAITING);

            if (!blockedOrWaiting)
            {
                continue;
            }

            log.debug("Found an allower thread, id = {}, that is blocked or wating.", allowerId);

            // It must be waiting or blocked on another monitor, not on the coordinator or its per-thread locks.
            if (allowingRunnable.isWaitingOnCoordinator())
            {
                log.debug("The waiting allower thread, {}, is waiting on the coordinator so does not allow thread {}"
                    + " to continue.", allowerId, callerId);

                continue;
            }

            log.debug("The allower thread, id = {}, is blocked or waiting other than on the coordinator.", allowerId);

            caller.setWaitingOnCoordinator(true);

            synchronized (locks[callerId])
            {
                caller.setWaitingOnCoordinator(false);

                // Consume all the allow events for this thread.
                Arrays.fill(allowEvents[callerId], false);

                return true;
            }
        }

        return false;
    }

    /**
     * Waits briefly on the coordinator lock for more allow events to be produced. The caller is flagged as waiting on
     * the coordinator for the duration of the wait.
     *
     * @param callerId The explicit id of the calling test thread.
     * @param caller   The calling test thread.
     *
     * @return <tt>true</tt> if the wait was interrupted, <tt>false</tt> otherwise.
     */
    private boolean awaitMoreAllowEvents(int callerId, TestRunnable caller)
    {
        boolean interrupted = false;

        try
        {
            caller.setWaitingOnCoordinator(true);

            synchronized (coordinatorLock)
            {
                caller.setWaitingOnCoordinator(false);

                log.debug("Thread {} is waiting on coordinator lock for more allow events.", callerId);

                // Remain flagged as waiting on the coordinator for the whole of the wait itself.
                caller.setWaitingOnCoordinator(true);
                coordinatorLock.wait(POLL_INTERVAL_MILLIS);
            }
        }
        catch (InterruptedException ignored)
        {
            // Reported to the caller, which restores the interrupt status once it stops waiting altogether.
            interrupted = true;
        }

        // Release the waiting on coordinator flag now that this thread is running again.
        caller.setWaitingOnCoordinator(false);

        return interrupted;
    }

    /**
     * Formats thread ids as a comma separated list, for example <tt>1, 2, 3</tt>, for debug logging.
     *
     * @param ids The thread ids to format.
     *
     * @return The comma separated ids; empty if there are none.
     */
    private static String formatThreadIds(int[] ids)
    {
        StringBuilder formatted = new StringBuilder();

        for (int j = 0; j < ids.length; j++)
        {
            if (j > 0)
            {
                formatted.append(", ");
            }

            formatted.append(ids[j]);
        }

        return formatted.toString();
    }

    /**
     * Pretty prints the state of the thread test coordinator, for debugging purposes. This is the latch matrix, one
     * receiving thread per row, followed by one line per test thread. It is safe to call before {@link #run}, in
     * which case the latch matrix is printed as empty.
     *
     * @return Pretty printed state of the thread test coordinator.
     */
    @Override
    public String toString()
    {
        StringBuilder result = new StringBuilder("[");

        // Read once; the array is only created by run().
        boolean[][] events = allowEvents;

        if (events != null)
        {
            for (int i = 0; i < events.length; i++)
            {
                for (int j = 0; j < events[i].length; j++)
                {
                    result.append(events[i][j]);

                    if (j < (events[i].length - 1))
                    {
                        result.append(", ");
                    }
                }

                if (i < (events.length - 1))
                {
                    result.append(",\n ");
                }
            }
        }

        result.append("]");

        for (int i = 0; i < testThreads.length; i++)
        {
            // append(Object) prints "null" for a slot that has not been filled yet instead of throwing.
            result.append("\nthread[").append(i).append("] = ").append(testThreads[i]);
        }

        return result.toString();
    }

}