diff options
Diffstat (limited to 'Lib/threading.py')
-rw-r--r-- | Lib/threading.py | 188 |
1 files changed, 127 insertions, 61 deletions
diff --git a/Lib/threading.py b/Lib/threading.py index 625c9b9d7b..d907c89f3f 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -10,6 +10,11 @@ except ImportError: from time import time as _time from traceback import format_exc as _format_exc from _weakrefset import WeakSet +from itertools import islice as _islice +try: + from _collections import deque as _deque +except ImportError: + from collections import deque as _deque # Note regarding PEP 8 compliant names # This threading model was originally inspired by Java, and inherited @@ -28,6 +33,7 @@ __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event', # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread _allocate_lock = _thread.allocate_lock +_set_sentinel = _thread._set_sentinel get_ident = _thread.get_ident ThreadError = _thread.error try: @@ -132,7 +138,7 @@ class _RLock: """ me = get_ident() if self._owner == me: - self._count = self._count + 1 + self._count += 1 return 1 rc = self._block.acquire(blocking, timeout) if rc: @@ -224,7 +230,7 @@ class Condition: self._is_owned = lock._is_owned except AttributeError: pass - self._waiters = [] + self._waiters = _deque() def __enter__(self): return self._lock.__enter__() @@ -332,14 +338,14 @@ class Condition: """ if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") - __waiters = self._waiters - waiters = __waiters[:n] - if not waiters: + all_waiters = self._waiters + waiters_to_notify = _deque(_islice(all_waiters, n)) + if not waiters_to_notify: return - for waiter in waiters: + for waiter in waiters_to_notify: waiter.release() try: - __waiters.remove(waiter) + all_waiters.remove(waiter) except ValueError: pass @@ -414,7 +420,7 @@ class Semaphore: break self._cond.wait(timeout) else: - self._value = self._value - 1 + self._value -= 1 rc = True return rc @@ -428,7 +434,7 @@ class Semaphore: """ with self._cond: - self._value = self._value + 1 + self._value += 1 self._cond.notify() def __exit__(self, t, v, tb): @@ -723,15 +729,13 @@ class BrokenBarrierError(RuntimeError): _counter = 0 def _newname(template="Thread-%d"): global _counter - _counter = _counter + 1 + _counter += 1 return template % _counter # Active thread administration _active_limbo_lock = _allocate_lock() _active = {} # maps thread id to Thread object _limbo = {} - -# For debug and leak testing _dangling = WeakSet() # Main class for threads @@ -790,28 +794,35 @@ class Thread: else: self._daemonic = current_thread().daemon self._ident = None + self._tstate_lock = None self._started = Event() - self._stopped = False - self._block = Condition(Lock()) + self._is_stopped = False self._initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances self._stderr = _sys.stderr + # For debugging and _after_fork() _dangling.add(self) - def _reset_internal_locks(self): + def _reset_internal_locks(self, is_alive): # private! Called by _after_fork() to reset our internal locks as # they may be in an invalid state leading to a deadlock or crash. - if hasattr(self, '_block'): # DummyThread deletes _block - self._block.__init__() self._started._reset_internal_locks() + if is_alive: + self._set_tstate_lock() + else: + # The thread isn't alive after fork: it doesn't have a tstate + # anymore. + self._is_stopped = True + self._tstate_lock = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" if self._started.is_set(): status = "started" - if self._stopped: + self.is_alive() # easy way to get ._is_stopped set when appropriate + if self._is_stopped: status = "stopped" if self._daemonic: status += " daemon" @@ -884,9 +895,18 @@ class Thread: def _set_ident(self): self._ident = get_ident() + def _set_tstate_lock(self): + """ + Set a lock object which will be released by the interpreter when + the underlying thread state (see pystate.h) gets deleted. + """ + self._tstate_lock = _set_sentinel() + self._tstate_lock.acquire() + def _bootstrap_inner(self): try: self._set_ident() + self._set_tstate_lock() self._started.set() with _active_limbo_lock: _active[self._ident] = self @@ -941,7 +961,6 @@ class Thread: pass finally: with _active_limbo_lock: - self._stop() try: # We don't call self._delete() because it also # grabs _active_limbo_lock. @@ -950,10 +969,27 @@ class Thread: pass def _stop(self): - self._block.acquire() - self._stopped = True - self._block.notify_all() - self._block.release() + # After calling ._stop(), .is_alive() returns False and .join() returns + # immediately. ._tstate_lock must be released before calling ._stop(). + # + # Normal case: C code at the end of the thread's life + # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and + # that's detected by our ._wait_for_tstate_lock(), called by .join() + # and .is_alive(). Any number of threads _may_ call ._stop() + # simultaneously (for example, if multiple threads are blocked in + # .join() calls), and they're not serialized. That's harmless - + # they'll just make redundant rebindings of ._is_stopped and + # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the + # "assert self._is_stopped" in ._wait_for_tstate_lock() always works + # (the assert is executed only if ._tstate_lock is None). + # + # Special case: _main_thread releases ._tstate_lock via this + # module's _shutdown() function. + lock = self._tstate_lock + if lock is not None: + assert not lock.locked() + self._is_stopped = True + self._tstate_lock = None def _delete(self): "Remove current thread from the dict of currently running threads." @@ -1021,20 +1057,26 @@ class Thread: if self is current_thread(): raise RuntimeError("cannot join current thread") - self._block.acquire() - try: - if timeout is None: - while not self._stopped: - self._block.wait() - else: - deadline = _time() + timeout - while not self._stopped: - delay = deadline - _time() - if delay <= 0: - break - self._block.wait(delay) - finally: - self._block.release() + if timeout is None: + self._wait_for_tstate_lock() + else: + # the behavior of a negative timeout isn't documented, but + # historically .join(timeout=x) for x<0 has acted as if timeout=0 + self._wait_for_tstate_lock(timeout=max(timeout, 0)) + + def _wait_for_tstate_lock(self, block=True, timeout=-1): + # Issue #18808: wait for the thread state to be gone. + # At the end of the thread's life, after all knowledge of the thread + # is removed from C data structures, C code releases our _tstate_lock. + # This method passes its arguments to _tstate_lock.aquire(). + # If the lock is acquired, the C code is done, and self._stop() is + # called. That sets ._is_stopped to True, and ._tstate_lock to None. + lock = self._tstate_lock + if lock is None: # already determined that the C code is done + assert self._is_stopped + elif lock.acquire(block, timeout): + lock.release() + self._stop() @property def name(self): @@ -1073,7 +1115,10 @@ class Thread: """ assert self._initialized, "Thread.__init__() not called" - return self._started.is_set() and not self._stopped + if self._is_stopped or not self._started.is_set(): + return False + self._wait_for_tstate_lock(False) + return not self._is_stopped isAlive = is_alive @@ -1149,25 +1194,12 @@ class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread", daemon=False) + self._set_tstate_lock() self._started.set() self._set_ident() with _active_limbo_lock: _active[self._ident] = self - def _exitfunc(self): - self._stop() - t = _pickSomeNonDaemonThread() - while t: - t.join() - t = _pickSomeNonDaemonThread() - self._delete() - -def _pickSomeNonDaemonThread(): - for t in enumerate(): - if not t.daemon and t.is_alive(): - return t - return None - # Dummy thread class to represent threads not started here. # These aren't garbage collected when they die, nor can they be waited for. @@ -1182,11 +1214,6 @@ class _DummyThread(Thread): def __init__(self): Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) - # Thread._block consumes an OS-level locking primitive, which - # can never be used by a _DummyThread. Since a _DummyThread - # instance is immortal, that's bad, so release this resource. - del self._block - self._started.set() self._set_ident() with _active_limbo_lock: @@ -1248,7 +1275,40 @@ from _thread import stack_size # and make it available for the interpreter # (Py_Main) as threading._shutdown. -_shutdown = _MainThread()._exitfunc +_main_thread = _MainThread() + +def _shutdown(): + # Obscure: other threads may be waiting to join _main_thread. That's + # dubious, but some code does it. We can't wait for C code to release + # the main thread's tstate_lock - that won't happen until the interpreter + # is nearly dead. So we release it here. Note that just calling _stop() + # isn't enough: other threads may already be waiting on _tstate_lock. + tlock = _main_thread._tstate_lock + # The main thread isn't finished yet, so its thread state lock can't have + # been released. + assert tlock is not None + assert tlock.locked() + tlock.release() + _main_thread._stop() + t = _pickSomeNonDaemonThread() + while t: + t.join() + t = _pickSomeNonDaemonThread() + _main_thread._delete() + +def _pickSomeNonDaemonThread(): + for t in enumerate(): + if not t.daemon and t.is_alive(): + return t + return None + +def main_thread(): + """Return the main thread object. + + In normal conditions, the main thread is the thread from which the + Python interpreter was started. + """ + return _main_thread # get thread-local implementation, either from the thread # module, or from the python fallback @@ -1266,25 +1326,31 @@ def _after_fork(): # Reset _active_limbo_lock, in case we forked while the lock was held # by another (non-forked) thread. http://bugs.python.org/issue874900 - global _active_limbo_lock + global _active_limbo_lock, _main_thread _active_limbo_lock = _allocate_lock() # fork() only copied the current thread; clear references to others. new_active = {} current = current_thread() + _main_thread = current with _active_limbo_lock: - for thread in _enumerate(): + # Dangling thread instances must still have their locks reset, + # because someone may join() them. + threads = set(_enumerate()) + threads.update(_dangling) + for thread in threads: # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. - thread._reset_internal_locks() if thread is current: # There is only one active thread. We reset the ident to # its new value since it can have changed. + thread._reset_internal_locks(True) ident = get_ident() thread._ident = ident new_active[ident] = thread else: # All the others are already stopped. + thread._reset_internal_locks(False) thread._stop() _limbo.clear() |