summaryrefslogtreecommitdiff
path: root/Lib/threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/threading.py')
-rw-r--r--Lib/threading.py188
1 files changed, 127 insertions, 61 deletions
diff --git a/Lib/threading.py b/Lib/threading.py
index 625c9b9d7b..d907c89f3f 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -10,6 +10,11 @@ except ImportError:
from time import time as _time
from traceback import format_exc as _format_exc
from _weakrefset import WeakSet
+from itertools import islice as _islice
+try:
+ from _collections import deque as _deque
+except ImportError:
+ from collections import deque as _deque
# Note regarding PEP 8 compliant names
# This threading model was originally inspired by Java, and inherited
@@ -28,6 +33,7 @@ __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event',
# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
+_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error
try:
@@ -132,7 +138,7 @@ class _RLock:
"""
me = get_ident()
if self._owner == me:
- self._count = self._count + 1
+ self._count += 1
return 1
rc = self._block.acquire(blocking, timeout)
if rc:
@@ -224,7 +230,7 @@ class Condition:
self._is_owned = lock._is_owned
except AttributeError:
pass
- self._waiters = []
+ self._waiters = _deque()
def __enter__(self):
return self._lock.__enter__()
@@ -332,14 +338,14 @@ class Condition:
"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
- __waiters = self._waiters
- waiters = __waiters[:n]
- if not waiters:
+ all_waiters = self._waiters
+ waiters_to_notify = _deque(_islice(all_waiters, n))
+ if not waiters_to_notify:
return
- for waiter in waiters:
+ for waiter in waiters_to_notify:
waiter.release()
try:
- __waiters.remove(waiter)
+ all_waiters.remove(waiter)
except ValueError:
pass
@@ -414,7 +420,7 @@ class Semaphore:
break
self._cond.wait(timeout)
else:
- self._value = self._value - 1
+ self._value -= 1
rc = True
return rc
@@ -428,7 +434,7 @@ class Semaphore:
"""
with self._cond:
- self._value = self._value + 1
+ self._value += 1
self._cond.notify()
def __exit__(self, t, v, tb):
@@ -723,15 +729,13 @@ class BrokenBarrierError(RuntimeError):
_counter = 0
def _newname(template="Thread-%d"):
global _counter
- _counter = _counter + 1
+ _counter += 1
return template % _counter
# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {} # maps thread id to Thread object
_limbo = {}
-
-# For debug and leak testing
_dangling = WeakSet()
# Main class for threads
@@ -790,28 +794,35 @@ class Thread:
else:
self._daemonic = current_thread().daemon
self._ident = None
+ self._tstate_lock = None
self._started = Event()
- self._stopped = False
- self._block = Condition(Lock())
+ self._is_stopped = False
self._initialized = True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
self._stderr = _sys.stderr
+ # For debugging and _after_fork()
_dangling.add(self)
- def _reset_internal_locks(self):
+ def _reset_internal_locks(self, is_alive):
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
- if hasattr(self, '_block'): # DummyThread deletes _block
- self._block.__init__()
self._started._reset_internal_locks()
+ if is_alive:
+ self._set_tstate_lock()
+ else:
+ # The thread isn't alive after fork: it doesn't have a tstate
+ # anymore.
+ self._is_stopped = True
+ self._tstate_lock = None
def __repr__(self):
assert self._initialized, "Thread.__init__() was not called"
status = "initial"
if self._started.is_set():
status = "started"
- if self._stopped:
+ self.is_alive() # easy way to get ._is_stopped set when appropriate
+ if self._is_stopped:
status = "stopped"
if self._daemonic:
status += " daemon"
@@ -884,9 +895,18 @@ class Thread:
def _set_ident(self):
self._ident = get_ident()
+ def _set_tstate_lock(self):
+ """
+ Set a lock object which will be released by the interpreter when
+ the underlying thread state (see pystate.h) gets deleted.
+ """
+ self._tstate_lock = _set_sentinel()
+ self._tstate_lock.acquire()
+
def _bootstrap_inner(self):
try:
self._set_ident()
+ self._set_tstate_lock()
self._started.set()
with _active_limbo_lock:
_active[self._ident] = self
@@ -941,7 +961,6 @@ class Thread:
pass
finally:
with _active_limbo_lock:
- self._stop()
try:
# We don't call self._delete() because it also
# grabs _active_limbo_lock.
@@ -950,10 +969,27 @@ class Thread:
pass
def _stop(self):
- self._block.acquire()
- self._stopped = True
- self._block.notify_all()
- self._block.release()
+ # After calling ._stop(), .is_alive() returns False and .join() returns
+ # immediately. ._tstate_lock must be released before calling ._stop().
+ #
+ # Normal case: C code at the end of the thread's life
+ # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
+ # that's detected by our ._wait_for_tstate_lock(), called by .join()
+ # and .is_alive(). Any number of threads _may_ call ._stop()
+ # simultaneously (for example, if multiple threads are blocked in
+ # .join() calls), and they're not serialized. That's harmless -
+ # they'll just make redundant rebindings of ._is_stopped and
+ # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
+ # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
+ # (the assert is executed only if ._tstate_lock is None).
+ #
+ # Special case: _main_thread releases ._tstate_lock via this
+ # module's _shutdown() function.
+ lock = self._tstate_lock
+ if lock is not None:
+ assert not lock.locked()
+ self._is_stopped = True
+ self._tstate_lock = None
def _delete(self):
"Remove current thread from the dict of currently running threads."
@@ -1021,20 +1057,26 @@ class Thread:
if self is current_thread():
raise RuntimeError("cannot join current thread")
- self._block.acquire()
- try:
- if timeout is None:
- while not self._stopped:
- self._block.wait()
- else:
- deadline = _time() + timeout
- while not self._stopped:
- delay = deadline - _time()
- if delay <= 0:
- break
- self._block.wait(delay)
- finally:
- self._block.release()
+ if timeout is None:
+ self._wait_for_tstate_lock()
+ else:
+ # the behavior of a negative timeout isn't documented, but
+ # historically .join(timeout=x) for x<0 has acted as if timeout=0
+ self._wait_for_tstate_lock(timeout=max(timeout, 0))
+
+ def _wait_for_tstate_lock(self, block=True, timeout=-1):
+ # Issue #18808: wait for the thread state to be gone.
+ # At the end of the thread's life, after all knowledge of the thread
+ # is removed from C data structures, C code releases our _tstate_lock.
+ # This method passes its arguments to _tstate_lock.aquire().
+ # If the lock is acquired, the C code is done, and self._stop() is
+ # called. That sets ._is_stopped to True, and ._tstate_lock to None.
+ lock = self._tstate_lock
+ if lock is None: # already determined that the C code is done
+ assert self._is_stopped
+ elif lock.acquire(block, timeout):
+ lock.release()
+ self._stop()
@property
def name(self):
@@ -1073,7 +1115,10 @@ class Thread:
"""
assert self._initialized, "Thread.__init__() not called"
- return self._started.is_set() and not self._stopped
+ if self._is_stopped or not self._started.is_set():
+ return False
+ self._wait_for_tstate_lock(False)
+ return not self._is_stopped
isAlive = is_alive
@@ -1149,25 +1194,12 @@ class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread", daemon=False)
+ self._set_tstate_lock()
self._started.set()
self._set_ident()
with _active_limbo_lock:
_active[self._ident] = self
- def _exitfunc(self):
- self._stop()
- t = _pickSomeNonDaemonThread()
- while t:
- t.join()
- t = _pickSomeNonDaemonThread()
- self._delete()
-
-def _pickSomeNonDaemonThread():
- for t in enumerate():
- if not t.daemon and t.is_alive():
- return t
- return None
-
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
@@ -1182,11 +1214,6 @@ class _DummyThread(Thread):
def __init__(self):
Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
- # Thread._block consumes an OS-level locking primitive, which
- # can never be used by a _DummyThread. Since a _DummyThread
- # instance is immortal, that's bad, so release this resource.
- del self._block
-
self._started.set()
self._set_ident()
with _active_limbo_lock:
@@ -1248,7 +1275,40 @@ from _thread import stack_size
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.
-_shutdown = _MainThread()._exitfunc
+_main_thread = _MainThread()
+
+def _shutdown():
+ # Obscure: other threads may be waiting to join _main_thread. That's
+ # dubious, but some code does it. We can't wait for C code to release
+ # the main thread's tstate_lock - that won't happen until the interpreter
+ # is nearly dead. So we release it here. Note that just calling _stop()
+ # isn't enough: other threads may already be waiting on _tstate_lock.
+ tlock = _main_thread._tstate_lock
+ # The main thread isn't finished yet, so its thread state lock can't have
+ # been released.
+ assert tlock is not None
+ assert tlock.locked()
+ tlock.release()
+ _main_thread._stop()
+ t = _pickSomeNonDaemonThread()
+ while t:
+ t.join()
+ t = _pickSomeNonDaemonThread()
+ _main_thread._delete()
+
+def _pickSomeNonDaemonThread():
+ for t in enumerate():
+ if not t.daemon and t.is_alive():
+ return t
+ return None
+
+def main_thread():
+ """Return the main thread object.
+
+ In normal conditions, the main thread is the thread from which the
+ Python interpreter was started.
+ """
+ return _main_thread
# get thread-local implementation, either from the thread
# module, or from the python fallback
@@ -1266,25 +1326,31 @@ def _after_fork():
# Reset _active_limbo_lock, in case we forked while the lock was held
# by another (non-forked) thread. http://bugs.python.org/issue874900
- global _active_limbo_lock
+ global _active_limbo_lock, _main_thread
_active_limbo_lock = _allocate_lock()
# fork() only copied the current thread; clear references to others.
new_active = {}
current = current_thread()
+ _main_thread = current
with _active_limbo_lock:
- for thread in _enumerate():
+ # Dangling thread instances must still have their locks reset,
+ # because someone may join() them.
+ threads = set(_enumerate())
+ threads.update(_dangling)
+ for thread in threads:
# Any lock/condition variable may be currently locked or in an
# invalid state, so we reinitialize them.
- thread._reset_internal_locks()
if thread is current:
# There is only one active thread. We reset the ident to
# its new value since it can have changed.
+ thread._reset_internal_locks(True)
ident = get_ident()
thread._ident = ident
new_active[ident] = thread
else:
# All the others are already stopped.
+ thread._reset_internal_locks(False)
thread._stop()
_limbo.clear()