1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import select
import threading
"""Zmake multiprocessing utility module.
This module is used to aid in zmake's multiprocessing. It contains tools
available to log output from multiple processes on the fly. This means that a
process does not need to finish before the output is available to the developer
on the screen.
"""
# A local pipe use to signal the look that a new file descriptor was added and
# should be included in the select statement.
_logging_interrupt_pipe = os.pipe()
# A condition variable used to synchronize logging operations.
_logging_cv = threading.Condition()
# A map of file descriptors to their logger/logging level tuple.
_logging_map = {}
def _log_fd(fd):
"""Log information from a single file descriptor.
This function is BLOCKING. It will read from the given file descriptor until
either the end of line is read or EOF. Once EOF is read it will remove the
file descriptor from _logging_map so it will no longer be used.
Additionally, in some cases, the file descriptor will be closed (caused by
a call to Popen.wait()). In these cases, the file descriptor will also be
removed from the map as it is no longer valid.
"""
with _logging_cv:
logger, log_level = _logging_map[fd]
if fd.closed:
del _logging_map[fd]
_logging_cv.notify_all()
return
line = fd.readline()
if not line:
# EOF
del _logging_map[fd]
_logging_cv.notify_all()
return
line = line.strip()
if line:
logger.log(log_level, line)
def _prune_logging_fds():
"""Prune the current file descriptors under _logging_map.
This function will iterate over the logging map and check for closed file
descriptors. Every closed file descriptor will be removed.
"""
with _logging_cv:
remove = [fd for fd in _logging_map.keys() if fd.closed]
for fd in remove:
del _logging_map[fd]
if remove:
_logging_cv.notify_all()
def _logging_loop():
"""The primary logging thread loop.
This is the entry point of the logging thread. It will listen for (1) any
new data on the output file descriptors that were added via log_output and
(2) any new file descriptors being added by log_output. Once a file
descriptor is ready to be read, this function will call _log_fd to perform
the actual read and logging.
"""
while True:
with _logging_cv:
_logging_cv.wait_for(lambda: _logging_map)
keys = list(_logging_map.keys()) + [_logging_interrupt_pipe[0]]
try:
fds, _, _ = select.select(keys, [], [])
except ValueError:
# One of the file descriptors must be closed, prune them and try
# again.
_prune_logging_fds()
continue
if _logging_interrupt_pipe[0] in fds:
# We got a dummy byte sent by log_output, this is a signal used to
# break out of the blocking select.select call to tell us that the
# file descriptor set has changed. We just need to read the byte and
# remove this descriptor from the list. If we actually have data
# that should be read it will be read in the for loop below.
os.read(_logging_interrupt_pipe[0], 1)
fds.remove(_logging_interrupt_pipe[0])
for fd in fds:
_log_fd(fd)
_logging_thread = threading.Thread(target=_logging_loop, daemon=True)
def log_output(logger, log_level, file_descriptor):
"""Log the output from the given file descriptor.
Args:
logger: The logger object to use.
log_level: The logging level to use.
file_descriptor: The file descriptor to read from.
"""
with _logging_cv:
if not _logging_thread.is_alive():
_logging_thread.start()
_logging_map[file_descriptor] = (logger, log_level)
# Write a dummy byte to the pipe to break the select so we can add the
# new fd.
os.write(_logging_interrupt_pipe[1], b'x')
# Notify the condition so we can run the select on the current fds.
_logging_cv.notify_all()
def wait_for_log_end():
"""Wait for all the logs to be printed.
This method will block execution until all the logs have been flushed out.
"""
with _logging_cv:
_logging_cv.wait_for(lambda: not _logging_map)
class Executor:
"""Parallel executor helper class.
This class is used to run multiple functions in parallel. The functions MUST
return an integer result code (or throw an exception). This class will start
a thread per operation and wait() for all the threads to resolve. If
fail_fast is set to True, then not all threads must return before wait()
returns. Instead either ALL threads must return 0 OR any thread must return
a non zero result (or throw an exception).
Attributes:
fail_fast: Whether or not the first function's error code should
terminate the executor.
lock: The condition variable used to synchronize across threads.
threads: A list of threading.Thread objects currently under this
Executor.
results: A list of result codes returned by each of the functions called
by this Executor.
"""
def __init__(self, fail_fast):
self.fail_fast = fail_fast
self.lock = threading.Condition()
self.threads = []
self.results = []
self.logger = logging.getLogger(self.__class__.__name__)
def append(self, func):
"""Append the given function to the wait list.
Once added, the function's return value will be used to determine the
Executor's final result value. The function must return an int result
code or throw an exception. For example: If two functions were added
to the Executor, they will both be run in parallel and their results
will determine whether or not the Executor succeeded. If both functions
returned 0, then the Executor's wait function will also return 0.
Args:
func: A function which returns an int result code or throws an
exception.
"""
with self.lock:
thread = threading.Thread(target=lambda: self._run_fn(func),
daemon=True)
thread.start()
self.threads.append(thread)
def wait(self):
"""Wait for a result to be available.
This function waits for the executor to resolve. Being resolved depends on
the initial fail_fast setting.
- If fail_fast is True then the executor is resolved as soon as any thread
throws an exception or returns a non-zero result. Or, all the threads
returned a zero result code.
- If fail_fast is False, then all the threads must have returned a result
code or have thrown.
Returns:
An integer result code of either the first failed function or 0 if
they all succeeded.
"""
with self.lock:
self.lock.wait_for(predicate=lambda: self._is_finished)
return self._result
def _run_fn(self, func):
"""Entry point to each running thread.
This function will run the function provided in the append() function.
The result value of the function will be used to determine the
Executor's result value. If the function throws any exception it will be
caught and -1 will be used as the assumed result value.
Args:
func: The function to run.
"""
try:
result = func()
except Exception as ex:
self.logger.exception(ex)
result = -1
with self.lock:
self.results.append(result)
self.lock.notify_all()
@property
def _is_finished(self):
"""Whether or not the Executor is considered to be done.
Returns:
True if the Executor is considered done.
"""
if len(self.threads) == len(self.results):
return True
return self.fail_fast and any([result for result in self.results])
@property
def _result(self):
"""The result code of the Executor.
Note that _is_finished must be True for this to have any meaning.
Returns:
An int representing the result value of the underlying functions.
"""
return next((result for result in self.results if result), 0)
|