# Source path: root/buildscripts/linter/parallel.py
# Blob: 25a6f0d7e75a443fbb18fa50dc2504b87d14e386
"""Utility code to execute code in parallel."""

import queue
import threading
import time
from multiprocessing import cpu_count
from typing import Any, Callable, List


def parallel_process(items, func):
    # type: (List[Any], Callable[[Any], bool]) -> bool
    """
    Run a set of work items to completion and wait.

    Each item is handed to `func` on one of a pool of daemon worker threads
    (at most one thread per CPU, and never more threads than items). An item
    counts as failed when `func` returns a falsy value or raises an exception;
    in the latter case the traceback is printed to stderr and the remaining
    items are still processed.

    :param items: the work items; each one is passed to `func` once.
    :param func: callable invoked with a single item, returning True on success.
    :returns whether all tasks were successful.
    """
    # Imported locally so this function does not depend on a module-level import.
    import traceback

    try:
        cpus = cpu_count()
    except NotImplementedError:
        cpus = 1

    task_queue = queue.Queue()  # type: queue.Queue

    has_failure_event = threading.Event()

    def worker():
        # type: () -> None
        """Worker thread to process work items in parallel."""
        while True:
            try:
                item = task_queue.get_nowait()
            except queue.Empty:
                # if the queue is empty, exit the worker thread
                return

            try:
                succeeded = func(item)
            except Exception:  # pylint: disable=broad-except
                # A crashing work item must be reported as a failure instead of
                # silently killing this worker and leaving the result "successful".
                traceback.print_exc()
                succeeded = False
            finally:
                # Tell the queue we finished with the item
                task_queue.task_done()

            if not succeeded:
                has_failure_event.set()

    # Enqueue all the work we want to process
    for item in items:
        task_queue.put(item)

    # No consumer is running yet, so qsize() is exact here; there is no point in
    # starting more workers than there are items.
    num_workers = min(cpus, task_queue.qsize())

    # Process all the work
    threads = []
    for _ in range(num_workers):
        thread = threading.Thread(target=worker)

        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for the threads to finish
    # Join with a timeout in a loop so that we can process Ctrl-C interrupts
    for thread in threads:
        while thread.is_alive():
            thread.join(timeout=1)

    return not has_failure_event.is_set()