blob: 0648bfb16e7952f09d1ab5461a8072e35880087b (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
"""Utility code to execute code in parallel."""
from __future__ import absolute_import
from __future__ import print_function
import Queue
import threading
import time
from multiprocessing import cpu_count
from typing import Any, Callable, List
def parallel_process(items, func):
    # type: (List[Any], Callable[[Any], bool]) -> bool
    """Run func on every work item using a pool of worker threads and wait.

    One daemon worker thread is started per CPU reported by cpu_count()
    (a single worker if the count is unavailable). Workers pull items from a
    shared queue until it is empty or until one item fails.

    An item fails when func returns a falsy value or raises an exception; a
    raised exception is printed to stderr and otherwise treated like a falsy
    return. After the first failure the other workers stop once they finish
    their current item, so items still queued at that point are not processed.

    :param items: the work items to process.
    :param func: callable invoked once per item; returns True on success.
    :return: True if no processed item failed, False otherwise.
    """
    # Local import: only needed to report exceptions raised by func.
    import traceback

    try:
        cpus = cpu_count()
    except NotImplementedError:
        cpus = 1

    task_queue = Queue.Queue()  # type: Queue.Queue

    # Set when the workers should stop: the queue ran dry or an item failed.
    pp_event = threading.Event()
    # Use a one-element list so the worker closure can mutate the shared result.
    pp_result = [True]
    pp_lock = threading.Lock()

    def worker():
        # type: () -> None
        """Worker thread to process work items in parallel."""
        while not pp_event.is_set():
            try:
                item = task_queue.get_nowait()
            except Queue.Empty:
                # If the queue is empty, exit the worker thread.
                pp_event.set()
                return

            try:
                ret = func(item)
            except Exception:  # pylint: disable=broad-except
                # A crash in func must not kill the worker silently: without
                # this the failure would go unrecorded and, if every worker
                # died, the caller would wait on pp_event forever.
                traceback.print_exc()
                ret = False
            finally:
                # Tell the queue we finished with the item.
                task_queue.task_done()

            # Return early if we fail, and signal we are done.
            if not ret:
                with pp_lock:
                    pp_result[0] = False
                pp_event.set()
                return

    # Enqueue all the work we want to process.
    for item in items:
        task_queue.put(item)

    # Process all the work.
    threads = []
    for _ in range(cpus):
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Wait for the workers to signal completion. Loop with a timeout so that
    # we can process Ctrl-C interrupts; the timed wait already paces the loop,
    # so no extra sleep is needed between attempts.
    while not pp_event.wait(1):
        pass

    # Workers that were mid-item when the event was set still need to finish.
    for thread in threads:
        thread.join()

    return pp_result[0]
|