diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2012-12-18 21:26:36 +0200 |
---|---|---|
committer | Andrew Svetlov <andrew.svetlov@gmail.com> | 2012-12-18 21:26:36 +0200 |
commit | 40929f315c0bb0f7feb4a5204bafa9f6dd7160f7 (patch) | |
tree | 4d149e48aa45aebc236f60d7580bcae869237e58 /Lib/multiprocessing | |
parent | 7338e256fa0a303ebdfe7fe0ef65570636483918 (diff) | |
parent | 6608ad78ccc19c44905a3fecd14508c5c6340741 (diff) | |
download | cpython-40929f315c0bb0f7feb4a5204bafa9f6dd7160f7.tar.gz |
Issue #16714: use 'raise' exceptions, don't 'throw'.
Patch by Serhiy Storchaka.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/__init__.py | 57 | ||||
-rw-r--r-- | Lib/multiprocessing/connection.py | 659 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 17 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/connection.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/forking.py | 190 | ||||
-rw-r--r-- | Lib/multiprocessing/heap.py | 45 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 171 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 116 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 47 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 46 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 407 | ||||
-rw-r--r-- | Lib/multiprocessing/sharedctypes.py | 38 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 94 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 71 |
14 files changed, 1169 insertions, 801 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index e6e16c8322..1f3e67c9b8 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -13,32 +13,7 @@ # # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __version__ = '0.70a1' @@ -48,8 +23,8 @@ __all__ = [ 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger', 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array', - 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', + 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', + 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', ] __author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)' @@ -161,7 +136,9 @@ def allow_connection_pickling(): ''' Install support for sending connections and sockets between processes ''' - from multiprocessing import reduction + # This is undocumented. In previous versions of multiprocessing + # its only effect was to make socket objects inheritable on Windows. + import multiprocessing.connection # # Definitions depending on native semaphores @@ -209,6 +186,13 @@ def Event(): from multiprocessing.synchronize import Event return Event() +def Barrier(parties, action=None, timeout=None): + ''' + Returns a barrier object + ''' + from multiprocessing.synchronize import Barrier + return Barrier(parties, action, timeout) + def Queue(maxsize=0): ''' Returns a queue object @@ -223,6 +207,13 @@ def JoinableQueue(maxsize=0): from multiprocessing.queues import JoinableQueue return JoinableQueue(maxsize) +def SimpleQueue(): + ''' + Returns a queue object + ''' + from multiprocessing.queues import SimpleQueue + return SimpleQueue() + def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None): ''' Returns a process pool object @@ -244,19 +235,19 @@ def RawArray(typecode_or_type, size_or_initializer): from multiprocessing.sharedctypes import RawArray return RawArray(typecode_or_type, size_or_initializer) -def Value(typecode_or_type, *args, **kwds): +def Value(typecode_or_type, *args, lock=True): ''' Returns a synchronized shared object ''' from multiprocessing.sharedctypes import Value - return Value(typecode_or_type, *args, **kwds) + return Value(typecode_or_type, *args, lock=lock) -def Array(typecode_or_type, size_or_initializer, **kwds): +def Array(typecode_or_type, size_or_initializer, *, lock=True): ''' Returns a synchronized shared array ''' from multiprocessing.sharedctypes import Array - return Array(typecode_or_type, size_or_initializer, **kwds) + return Array(typecode_or_type, size_or_initializer, lock=lock) # # diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 4fa6f70bf3..fbbd5d91d3 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -4,49 +4,34 @@ # multiprocessing/connection.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # -__all__ = [ 'Client', 'Listener', 'Pipe' ] +__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] +import io import os import sys +import pickle +import select import socket +import struct import errno import time import tempfile import itertools import _multiprocessing -from multiprocessing import current_process, AuthenticationError +from multiprocessing import current_process, AuthenticationError, BufferTooShort from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close - +from multiprocessing.forking import ForkingPickler +try: + import _winapi + from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE +except ImportError: + if sys.platform == 'win32': + raise + _winapi = None # # @@ -122,6 +107,309 @@ def address_type(address): raise ValueError('address type of %r unrecognized' % address) # +# Connection classes +# + +class _ConnectionBase: + _handle = None + + def __init__(self, handle, readable=True, writable=True): + handle = handle.__index__() + if handle < 0: + raise ValueError("invalid handle") + if not readable and not writable: + raise ValueError( + "at least one of `readable` and `writable` must be True") + self._handle = handle + self._readable = readable + self._writable = writable + + # XXX should we use util.Finalize instead of a __del__? + + def __del__(self): + if self._handle is not None: + self._close() + + def _check_closed(self): + if self._handle is None: + raise IOError("handle is closed") + + def _check_readable(self): + if not self._readable: + raise IOError("connection is write-only") + + def _check_writable(self): + if not self._writable: + raise IOError("connection is read-only") + + def _bad_message_length(self): + if self._writable: + self._readable = False + else: + self.close() + raise IOError("bad message length") + + @property + def closed(self): + """True if the connection is closed""" + return self._handle is None + + @property + def readable(self): + """True if the connection is readable""" + return self._readable + + @property + def writable(self): + """True if the connection is writable""" + return self._writable + + def fileno(self): + """File descriptor or handle of the connection""" + self._check_closed() + return self._handle + + def close(self): + """Close the connection""" + if self._handle is not None: + try: + self._close() + finally: + self._handle = None + + def send_bytes(self, buf, offset=0, size=None): + """Send the bytes data from a bytes-like object""" + self._check_closed() + self._check_writable() + m = memoryview(buf) + # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) + if m.itemsize > 1: + m = memoryview(bytes(m)) + n = len(m) + if offset < 0: + raise ValueError("offset is negative") + if n < offset: + raise ValueError("buffer length < offset") + if size is None: + size = n - offset + elif size < 0: + raise ValueError("size is negative") + elif offset + size > n: + raise ValueError("buffer length < offset + size") + self._send_bytes(m[offset:offset + size]) + + def send(self, obj): + """Send a (picklable) object""" + self._check_closed() + self._check_writable() + buf = io.BytesIO() + ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) + self._send_bytes(buf.getbuffer()) + + def recv_bytes(self, maxlength=None): + """ + Receive bytes data as a bytes object. + """ + self._check_closed() + self._check_readable() + if maxlength is not None and maxlength < 0: + raise ValueError("negative maxlength") + buf = self._recv_bytes(maxlength) + if buf is None: + self._bad_message_length() + return buf.getvalue() + + def recv_bytes_into(self, buf, offset=0): + """ + Receive bytes data into a writeable buffer-like object. + Return the number of bytes read. + """ + self._check_closed() + self._check_readable() + with memoryview(buf) as m: + # Get bytesize of arbitrary buffer + itemsize = m.itemsize + bytesize = itemsize * len(m) + if offset < 0: + raise ValueError("negative offset") + elif offset > bytesize: + raise ValueError("offset too large") + result = self._recv_bytes() + size = result.tell() + if bytesize < offset + size: + raise BufferTooShort(result.getvalue()) + # Message can fit in dest + result.seek(0) + result.readinto(m[offset // itemsize : + (offset + size) // itemsize]) + return size + + def recv(self): + """Receive a (picklable) object""" + self._check_closed() + self._check_readable() + buf = self._recv_bytes() + return pickle.loads(buf.getbuffer()) + + def poll(self, timeout=0.0): + """Whether there is any input available to be read""" + self._check_closed() + self._check_readable() + return self._poll(timeout) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + + +if _winapi: + + class PipeConnection(_ConnectionBase): + """ + Connection class based on a Windows named pipe. + Overlapped I/O is used, so the handles must have been created + with FILE_FLAG_OVERLAPPED. + """ + _got_empty_message = False + + def _close(self, _CloseHandle=_winapi.CloseHandle): + _CloseHandle(self._handle) + + def _send_bytes(self, buf): + ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) + try: + if err == _winapi.ERROR_IO_PENDING: + waitres = _winapi.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nwritten, err = ov.GetOverlappedResult(True) + assert err == 0 + assert nwritten == len(buf) + + def _recv_bytes(self, maxsize=None): + if self._got_empty_message: + self._got_empty_message = False + return io.BytesIO() + else: + bsize = 128 if maxsize is None else min(maxsize, 128) + try: + ov, err = _winapi.ReadFile(self._handle, bsize, + overlapped=True) + try: + if err == _winapi.ERROR_IO_PENDING: + waitres = _winapi.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nread, err = ov.GetOverlappedResult(True) + if err == 0: + f = io.BytesIO() + f.write(ov.getbuffer()) + return f + elif err == _winapi.ERROR_MORE_DATA: + return self._get_more_data(ov, maxsize) + except IOError as e: + if e.winerror == _winapi.ERROR_BROKEN_PIPE: + raise EOFError + else: + raise + raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") + + def _poll(self, timeout): + if (self._got_empty_message or + _winapi.PeekNamedPipe(self._handle)[0] != 0): + return True + return bool(wait([self], timeout)) + + def _get_more_data(self, ov, maxsize): + buf = ov.getbuffer() + f = io.BytesIO() + f.write(buf) + left = _winapi.PeekNamedPipe(self._handle)[1] + assert left > 0 + if maxsize is not None and len(buf) + left > maxsize: + self._bad_message_length() + ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) + rbytes, err = ov.GetOverlappedResult(True) + assert err == 0 + assert rbytes == left + f.write(ov.getbuffer()) + return f + + +class Connection(_ConnectionBase): + """ + Connection class based on an arbitrary file descriptor (Unix only), or + a socket handle (Windows). + """ + + if _winapi: + def _close(self, _close=_multiprocessing.closesocket): + _close(self._handle) + _write = _multiprocessing.send + _read = _multiprocessing.recv + else: + def _close(self, _close=os.close): + _close(self._handle) + _write = os.write + _read = os.read + + def _send(self, buf, write=_write): + remaining = len(buf) + while True: + n = write(self._handle, buf) + remaining -= n + if remaining == 0: + break + buf = buf[n:] + + def _recv(self, size, read=_read): + buf = io.BytesIO() + handle = self._handle + remaining = size + while remaining > 0: + chunk = read(handle, remaining) + n = len(chunk) + if n == 0: + if remaining == size: + raise EOFError + else: + raise IOError("got end of file during message") + buf.write(chunk) + remaining -= n + return buf + + def _send_bytes(self, buf): + # For wire compatibility with 3.2 and lower + n = len(buf) + self._send(struct.pack("!i", n)) + # The condition is necessary to avoid "broken pipe" errors + # when sending a 0-length buffer if the other end closed the pipe. + if n > 0: + self._send(buf) + + def _recv_bytes(self, maxsize=None): + buf = self._recv(4) + size, = struct.unpack("!i", buf.getvalue()) + if maxsize is not None and size > maxsize: + return None + return self._recv(size) + + def _poll(self, timeout): + r = wait([self._handle], timeout) + return bool(r) + + +# # Public functions # @@ -154,6 +442,8 @@ class Listener(object): Returns a `Connection` object. ''' + if self._listener is None: + raise IOError('listener is closed') c = self._listener.accept() if self._authkey: deliver_challenge(c, self._authkey) @@ -164,11 +454,19 @@ class Listener(object): ''' Close the bound socket or named pipe of `self`. ''' - return self._listener.close() + if self._listener is not None: + self._listener.close() + self._listener = None address = property(lambda self: self._listener._address) last_accepted = property(lambda self: self._listener._last_accepted) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + def Client(address, family=None, authkey=None): ''' @@ -201,56 +499,52 @@ if sys.platform != 'win32': s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) - c1 = _multiprocessing.Connection(os.dup(s1.fileno())) - c2 = _multiprocessing.Connection(os.dup(s2.fileno())) - s1.close() - s2.close() + c1 = Connection(s1.detach()) + c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() - c1 = _multiprocessing.Connection(fd1, writable=False) - c2 = _multiprocessing.Connection(fd2, readable=False) + c1 = Connection(fd1, writable=False) + c2 = Connection(fd2, readable=False) return c1, c2 else: - from _multiprocessing import win32 - def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' address = arbitrary_address('AF_PIPE') if duplex: - openmode = win32.PIPE_ACCESS_DUPLEX - access = win32.GENERIC_READ | win32.GENERIC_WRITE + openmode = _winapi.PIPE_ACCESS_DUPLEX + access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: - openmode = win32.PIPE_ACCESS_INBOUND - access = win32.GENERIC_WRITE + openmode = _winapi.PIPE_ACCESS_INBOUND + access = _winapi.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE - h1 = win32.CreateNamedPipe( - address, openmode, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL + h1 = _winapi.CreateNamedPipe( + address, openmode | _winapi.FILE_FLAG_OVERLAPPED | + _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, + _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | + _winapi.PIPE_WAIT, + 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) - h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + h2 = _winapi.CreateFile( + address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, + _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) - win32.SetNamedPipeHandleState( - h2, win32.PIPE_READMODE_MESSAGE, None, None + _winapi.SetNamedPipeHandleState( + h2, _winapi.PIPE_READMODE_MESSAGE, None, None ) - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise + overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) + _, err = overlapped.GetOverlappedResult(True) + assert err == 0 - c1 = _multiprocessing.PipeConnection(h1, writable=duplex) - c2 = _multiprocessing.PipeConnection(h2, readable=duplex) + c1 = PipeConnection(h1, writable=duplex) + c2 = PipeConnection(h2, readable=duplex) return c1, c2 @@ -265,12 +559,15 @@ class SocketListener(object): def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) try: - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # SO_REUSEADDR has different semantics on Windows (issue #2550). + if os.name == 'posix': + self._socket.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) self._socket.setblocking(True) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() - except socket.error: + except OSError: self._socket.close() raise self._family = family @@ -286,10 +583,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() s.setblocking(True) - fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) - s.close() - return conn + return Connection(s.detach()) def close(self): self._socket.close() @@ -304,24 +598,8 @@ def SocketClient(address): family = address_type(address) with socket.socket( getattr(socket, family) ) as s: s.setblocking(True) - t = _init_timeout() - - while 1: - try: - s.connect(address) - except socket.error as e: - if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): - debug('failed to connect to address %s', address) - raise - time.sleep(0.01) - else: - break - else: - raise - - fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) - return conn + s.connect(address) + return Connection(s.detach()) # # Definitions for connections based on named pipes @@ -335,48 +613,55 @@ if sys.platform == 'win32': ''' def __init__(self, address, backlog=None): self._address = address - handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue = [handle] - self._last_accepted = None + self._handle_queue = [self._new_handle(first=True)] + self._last_accepted = None sub_debug('listener created with address=%r', self._address) - self.close = Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) - def accept(self): - newhandle = win32.CreateNamedPipe( - self._address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL + def _new_handle(self, first=False): + flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED + if first: + flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE + return _winapi.CreateNamedPipe( + self._address, flags, + _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | + _winapi.PIPE_WAIT, + _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, + _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) - self._handle_queue.append(newhandle) + + def accept(self): + self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) try: - win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError as e: + ov = _winapi.ConnectNamedPipe(handle, overlapped=True) + except OSError as e: + if e.winerror != _winapi.ERROR_NO_DATA: + raise # ERROR_NO_DATA can occur if a client has already connected, # written data and then disconnected -- see Issue 14725. - if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, - win32.ERROR_NO_DATA): + else: + try: + res = _winapi.WaitForMultipleObjects( + [ov.event], False, INFINITE) + except: + ov.cancel() + _winapi.CloseHandle(handle) raise - return _multiprocessing.PipeConnection(handle) + finally: + _, err = ov.GetOverlappedResult(True) + assert err == 0 + return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): sub_debug('closing listener with address=%r', address) for handle in queue: - close(handle) + _winapi.CloseHandle(handle) def PipeClient(address): ''' @@ -385,24 +670,25 @@ if sys.platform == 'win32': t = _init_timeout() while 1: try: - win32.WaitNamedPipe(address, 1000) - h = win32.CreateFile( - address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + _winapi.WaitNamedPipe(address, 1000) + h = _winapi.CreateFile( + address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, + 0, _winapi.NULL, _winapi.OPEN_EXISTING, + _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) except WindowsError as e: - if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, - win32.ERROR_PIPE_BUSY) or _check_timeout(t): + if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, + _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break else: raise - win32.SetNamedPipeHandleState( - h, win32.PIPE_READMODE_MESSAGE, None, None + _winapi.SetNamedPipeHandleState( + h, _winapi.PIPE_READMODE_MESSAGE, None, None ) - return _multiprocessing.PipeConnection(h) + return PipeConnection(h) # # Authentication stuff @@ -459,10 +745,10 @@ class ConnectionWrapper(object): return self._loads(s) def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) + (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): @@ -476,3 +762,140 @@ def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) + +# +# Wait +# + +if sys.platform == 'win32': + + def _exhaustive_wait(handles, timeout): + # Return ALL handles which are currently signalled. (Only + # returning the first signalled might create starvation issues.) + L = list(handles) + ready = [] + while L: + res = _winapi.WaitForMultipleObjects(L, False, timeout) + if res == WAIT_TIMEOUT: + break + elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): + res -= WAIT_OBJECT_0 + elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): + res -= WAIT_ABANDONED_0 + else: + raise RuntimeError('Should not get here') + ready.append(L[res]) + L = L[res+1:] + timeout = 0 + return ready + + _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is None: + timeout = INFINITE + elif timeout < 0: + timeout = 0 + else: + timeout = int(timeout * 1000 + 0.5) + + object_list = list(object_list) + waithandle_to_obj = {} + ov_list = [] + ready_objects = set() + ready_handles = set() + + try: + for o in object_list: + try: + fileno = getattr(o, 'fileno') + except AttributeError: + waithandle_to_obj[o.__index__()] = o + else: + # start an overlapped read of length zero + try: + ov, err = _winapi.ReadFile(fileno(), 0, True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err == _winapi.ERROR_IO_PENDING: + ov_list.append(ov) + waithandle_to_obj[ov.event] = o + else: + # If o.fileno() is an overlapped pipe handle and + # err == 0 then there is a zero length message + # in the pipe, but it HAS NOT been consumed. + ready_objects.add(o) + timeout = 0 + + ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) + finally: + # request that overlapped reads stop + for ov in ov_list: + ov.cancel() + + # wait for all overlapped reads to stop + for ov in ov_list: + try: + _, err = ov.GetOverlappedResult(True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err != _winapi.ERROR_OPERATION_ABORTED: + o = waithandle_to_obj[ov.event] + ready_objects.add(o) + if err == 0: + # If o.fileno() is an overlapped pipe handle then + # a zero length message HAS been consumed. + if hasattr(o, '_got_empty_message'): + o._got_empty_message = True + + ready_objects.update(waithandle_to_obj[h] for h in ready_handles) + return [o for o in object_list if o in ready_objects] + +else: + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is not None: + if timeout <= 0: + return select.select(object_list, [], [], 0)[0] + else: + deadline = time.time() + timeout + while True: + try: + return select.select(object_list, [], [], timeout)[0] + except OSError as e: + if e.errno != errno.EINTR: + raise + if timeout is not None: + timeout = deadline - time.time() + +# +# Make connection and socket objects sharable if possible +# + +if sys.platform == 'win32': + from . import reduction + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) + ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection) +else: + try: + from . import reduction + except ImportError: + pass + else: + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index 101c3cba4d..e31fc61572 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -35,7 +35,7 @@ __all__ = [ 'Process', 'current_process', 'active_children', 'freeze_support', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' + 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' ] # @@ -46,12 +46,10 @@ import threading import sys import weakref import array -import itertools -from multiprocessing import TimeoutError, cpu_count from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore -from threading import Event +from threading import Event, Condition, Barrier from queue import Queue # @@ -85,17 +83,6 @@ class DummyProcess(threading.Thread): # # -class Condition(threading._Condition): - # XXX - if sys.version_info < (3, 0): - notify_all = threading._Condition.notify_all.__func__ - else: - notify_all = threading._Condition.notify_all - -# -# -# - Process = DummyProcess current_process = threading.current_thread current_process()._children = weakref.WeakKeyDictionary() diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py index af105794f1..874ec8e432 100644 --- a/Lib/multiprocessing/dummy/connection.py +++ b/Lib/multiprocessing/dummy/connection.py @@ -53,6 +53,12 @@ class Listener(object): address = property(lambda self: self._backlog_queue) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + def Client(address): _in, _out = Queue(), Queue() @@ -85,3 +91,9 @@ class Connection(object): def close(self): pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index bc8ac44c22..c5501a2f75 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -4,32 +4,7 @@ # multiprocessing/forking.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import os @@ -38,7 +13,7 @@ import signal from multiprocessing import util, process -__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] +__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler'] # # Check that the current thread is spawning a child process @@ -55,18 +30,18 @@ def assert_spawning(self): # Try making some callable types picklable # -from pickle import _Pickler as Pickler +from pickle import Pickler +from copyreg import dispatch_table + class ForkingPickler(Pickler): - dispatch = Pickler.dispatch.copy() + _extra_reducers = {} + def __init__(self, *args): + Pickler.__init__(self, *args) + self.dispatch_table = dispatch_table.copy() + self.dispatch_table.update(self._extra_reducers) @classmethod def register(cls, type, reduce): - def dispatcher(self, obj): - rv = reduce(obj) - if isinstance(rv, str): - self.save_global(obj, rv) - else: - self.save_reduce(obj=obj, *rv) - cls.dispatch[type] = dispatcher + cls._extra_reducers[type] = reduce def _reduce_method(m): if m.__self__ is None: @@ -100,9 +75,6 @@ else: # if sys.platform != 'win32': - import time - - exit = os._exit duplicate = os.dup close = os.close @@ -118,14 +90,23 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() code = process_obj._bootstrap() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -143,26 +124,20 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + from .connection import wait + if not wait([self.sentinel], timeout): + return None + # This shouldn't block if wait() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) - except OSError as e: + except OSError: if self.wait(timeout=0.1) is None: raise @@ -177,12 +152,9 @@ if sys.platform != 'win32': else: import _thread import msvcrt - import _subprocess - import time + import _winapi - from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection - from .util import Finalize + from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) @@ -195,8 +167,7 @@ else: WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - exit = win32.ExitProcess - close = win32.CloseHandle + close = _winapi.CloseHandle # # _python_exe is the assumed path to the python executable. @@ -218,11 +189,11 @@ else: def duplicate(handle, target_process=None, inheritable=False): if target_process is None: - target_process = _subprocess.GetCurrentProcess() - return _subprocess.DuplicateHandle( - _subprocess.GetCurrentProcess(), handle, target_process, - 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS - ).Detach() + target_process = _winapi.GetCurrentProcess() + return _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), handle, target_process, + 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS + ) # # We define a Popen class similar to the one from subprocess, but @@ -236,6 +207,9 @@ else: _tls = _thread._local() def __init__(self, process_obj): + cmd = ' '.join('"%s"' % x for x in get_command_line()) + prep_data = get_preparation_data(process_obj._name) + # create pipe for communication with child rfd, wfd = os.pipe() @@ -243,30 +217,31 @@ else: rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) os.close(rfd) - # start process - cmd = get_command_line() + [rhandle] - cmd = ' '.join('"%s"' % x for x in cmd) - hp, ht, pid, tid = _subprocess.CreateProcess( - _python_exe, cmd, None, None, 1, 0, None, None, None - ) - ht.Close() - close(rhandle) - - # set attributes of self - self.pid = pid - self.returncode = None - self._handle = hp - - # send information to child - prep_data = get_preparation_data(process_obj._name) - to_child = os.fdopen(wfd, 'wb') - Popen._tls.process_handle = int(hp) - try: - dump(prep_data, to_child, HIGHEST_PROTOCOL) - dump(process_obj, to_child, HIGHEST_PROTOCOL) - finally: - del Popen._tls.process_handle - to_child.close() + with open(wfd, 'wb', closefd=True) as to_child: + # start process + try: + hp, ht, pid, tid = _winapi.CreateProcess( + _python_exe, cmd + (' %s' % rhandle), + None, None, 1, 0, None, None, None + ) + _winapi.CloseHandle(ht) + finally: + close(rhandle) + + # set attributes of self + self.pid = pid + self.returncode = None + self._handle = hp + self.sentinel = int(hp) + util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) + + # send information to child + Popen._tls.process_handle = int(hp) + try: + dump(prep_data, to_child, HIGHEST_PROTOCOL) + dump(process_obj, to_child, HIGHEST_PROTOCOL) + finally: + del Popen._tls.process_handle @staticmethod def thread_is_spawning(): @@ -279,13 +254,13 @@ else: def wait(self, timeout=None): if self.returncode is None: if timeout is None: - msecs = _subprocess.INFINITE + msecs = _winapi.INFINITE else: msecs = max(0, int(timeout * 1000 + 0.5)) - res = _subprocess.WaitForSingleObject(int(self._handle), msecs) - if res == _subprocess.WAIT_OBJECT_0: - code = _subprocess.GetExitCodeProcess(self._handle) + res = _winapi.WaitForSingleObject(int(self._handle), msecs) + if res == _winapi.WAIT_OBJECT_0: + code = _winapi.GetExitCodeProcess(self._handle) if code == TERMINATE: code = -signal.SIGTERM self.returncode = code @@ -298,9 +273,9 @@ else: def terminate(self): if self.returncode is None: try: - _subprocess.TerminateProcess(int(self._handle), TERMINATE) - except WindowsError: - if self.wait(timeout=0.1) is None: + _winapi.TerminateProcess(int(self._handle), TERMINATE) + except OSError: + if self.wait(timeout=1.0) is None: raise # @@ -350,7 +325,8 @@ else: return [sys.executable, '--multiprocessing-fork'] else: prog = 'from multiprocessing.forking import main; main()' - return [_python_exe, '-c', prog, '--multiprocessing-fork'] + opts = util._args_from_interpreter_flags() + return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] def main(): @@ -372,7 +348,7 @@ else: from_parent.close() exitcode = self._bootstrap() - exit(exitcode) + sys.exit(exitcode) def get_preparation_data(name): @@ -405,22 +381,6 @@ else: return d - # - # Make (Pipe)Connection picklable - # - - def reduce_connection(conn): - if not Popen.thread_is_spawning(): - raise RuntimeError( - 'By default %s objects can only be shared between processes\n' - 'using inheritance' % type(conn).__name__ - ) - return type(conn), (Popen.duplicate_for_child(conn.fileno()), - conn.readable, conn.writable) - - ForkingPickler.register(Connection, reduce_connection) - ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process # diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 0a25ef05c7..4e93c12f81 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -4,37 +4,11 @@ # multiprocessing/heap.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import bisect import mmap -import tempfile import os import sys import threading @@ -52,7 +26,7 @@ __all__ = ['BufferWrapper'] if sys.platform == 'win32': - from _multiprocessing import win32 + import _winapi class Arena(object): @@ -62,7 +36,7 @@ if sys.platform == 'win32': self.size = size self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter)) self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert win32.GetLastError() == 0, 'tagname already in use' + assert _winapi.GetLastError() == 0, 'tagname already in use' self._state = (self.size, self.name) def __getstate__(self): @@ -72,7 +46,7 @@ if sys.platform == 'win32': def __setstate__(self, state): self.size, self.name = self._state = state self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS + assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS else: @@ -231,7 +205,7 @@ class Heap(object): self._lock.release() # -# Class representing a chunk of an mmap -- can be inherited +# Class representing a chunk of an mmap -- can be inherited by child process # class BufferWrapper(object): @@ -244,11 +218,6 @@ class BufferWrapper(object): self._state = (block, size) Finalize(self, BufferWrapper._heap.free, args=(block,)) - def get_address(self): + def create_memoryview(self): (arena, start, stop), size = self._state - address, length = _multiprocessing.address_of_buffer(arena.buffer) - assert size <= length - return address + start - - def get_size(self): - return self._state[1] + return memoryview(arena.buffer)[start:start+size] diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 5588ead116..1ab147e29e 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -5,32 +5,7 @@ # multiprocessing/managers.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] @@ -39,19 +14,16 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] # Imports # -import os import sys -import weakref import threading import array import queue from traceback import format_exc -from pickle import PicklingError from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler -from multiprocessing.util import Finalize, info +from multiprocessing.forking import Popen, ForkingPickler +from time import time as _time # # Register some things for pickling @@ -168,28 +140,38 @@ class Server(object): self.id_to_obj = {'0': (None, ())} self.id_to_refcount = {} self.mutex = threading.RLock() - self.stop = 0 def serve_forever(self): ''' Run the server forever ''' + self.stop_event = threading.Event() current_process()._manager_server = self try: + accepter = threading.Thread(target=self.accepter) + accepter.daemon = True + accepter.start() try: - while 1: - try: - c = self.listener.accept() - except (OSError, IOError): - continue - t = threading.Thread(target=self.handle_request, args=(c,)) - t.daemon = True - t.start() + while not self.stop_event.is_set(): + self.stop_event.wait(1) except (KeyboardInterrupt, SystemExit): pass finally: - self.stop = 999 - self.listener.close() + if sys.stdout != sys.__stdout__: + util.debug('resetting stdout, stderr') + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + sys.exit(0) + + def accepter(self): + while True: + try: + c = self.listener.accept() + except (OSError, IOError): + continue + t = threading.Thread(target=self.handle_request, args=(c,)) + t.daemon = True + t.start() def handle_request(self, c): ''' @@ -236,7 +218,7 @@ class Server(object): send = conn.send id_to_obj = self.id_to_obj - while not self.stop: + while not self.stop_event.is_set(): try: methodname = obj = None @@ -346,32 +328,13 @@ class Server(object): Shutdown this process ''' try: - try: - util.debug('manager received shutdown message') - c.send(('#RETURN', None)) - - if sys.stdout != sys.__stdout__: - util.debug('resetting stdout, stderr') - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - - util._run_finalizers(0) - - for p in active_children(): - util.debug('terminating a child process of manager') - p.terminate() - - for p in active_children(): - util.debug('terminating a child process of manager') - p.join() - - util._run_finalizers() - util.info('manager exiting with exitcode 0') - except: - import traceback - traceback.print_exc() + util.debug('manager received shutdown message') + c.send(('#RETURN', None)) + except: + import traceback + traceback.print_exc() finally: - exit(0) + self.stop_event.set() def create(self, c, typeid, *args, **kwds): ''' @@ -483,10 +446,6 @@ class BaseManager(object): self._serializer = serializer self._Listener, self._Client = listener_client[serializer] - def __reduce__(self): - return type(self).from_address, \ - (self._address, self._authkey, self._serializer) - def get_server(self): ''' Return server object with serve_forever() method and address attribute @@ -576,7 +535,10 @@ class BaseManager(object): ''' Join the manager process (if it has been spawned) ''' - self._process.join(timeout) + if self._process is not None: + self._process.join(timeout) + if not self._process.is_alive(): + self._process = None def _debug_info(self): ''' @@ -599,6 +561,9 @@ class BaseManager(object): conn.close() def __enter__(self): + if self._state.value == State.INITIAL: + self.start() + assert self._state.value == State.STARTED return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -620,7 +585,7 @@ class BaseManager(object): except Exception: pass - process.join(timeout=0.2) + process.join(timeout=1.0) if process.is_alive(): util.info('manager still alive') if hasattr(process, 'terminate'): @@ -982,8 +947,9 @@ class IteratorProxy(BaseProxy): class AcquirerProxy(BaseProxy): _exposed_ = ('acquire', 'release') - def acquire(self, blocking=True): - return self._callmethod('acquire', (blocking,)) + def acquire(self, blocking=True, timeout=None): + args = (blocking,) if timeout is None else (blocking, timeout) + return self._callmethod('acquire', args) def release(self): return self._callmethod('release') def __enter__(self): @@ -1000,6 +966,24 @@ class ConditionProxy(AcquirerProxy): return self._callmethod('notify') def notify_all(self): return self._callmethod('notify_all') + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + class EventProxy(BaseProxy): _exposed_ = ('is_set', 'set', 'clear', 'wait') @@ -1012,6 +996,26 @@ class EventProxy(BaseProxy): def wait(self, timeout=None): return self._callmethod('wait', (timeout,)) + +class BarrierProxy(BaseProxy): + _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def abort(self): + return self._callmethod('abort') + def reset(self): + return self._callmethod('reset') + @property + def parties(self): + return self._callmethod('__getattribute__', ('parties',)) + @property + def n_waiting(self): + return self._callmethod('__getattribute__', ('n_waiting',)) + @property + def broken(self): + return self._callmethod('__getattribute__', ('broken',)) + + class NamespaceProxy(BaseProxy): _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') def __getattr__(self, key): @@ -1041,12 +1045,11 @@ class ValueProxy(BaseProxy): BaseListProxy = MakeProxyType('BaseListProxy', ( - '__add__', '__contains__', '__delitem__', '__delslice__', - '__getitem__', '__getslice__', '__len__', '__mul__', - '__reversed__', '__rmul__', '__setitem__', '__setslice__', + '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', + '__mul__', '__reversed__', '__rmul__', '__setitem__', 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort', '__imul__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + )) class ListProxy(BaseListProxy): def __iadd__(self, value): self._callmethod('extend', (value,)) @@ -1064,17 +1067,18 @@ DictProxy = MakeProxyType('DictProxy', ( ArrayProxy = MakeProxyType('ArrayProxy', ( - '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + '__len__', '__getitem__', '__setitem__' + )) PoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'terminate' + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate' )) PoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', 'map_async': 'AsyncResult', + 'starmap_async': 'AsyncResult', 'imap': 'Iterator', 'imap_unordered': 'Iterator' } @@ -1103,6 +1107,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy) SyncManager.register('Condition', threading.Condition, ConditionProxy) +SyncManager.register('Barrier', threading.Barrier, BarrierProxy) SyncManager.register('Pool', Pool, PoolProxy) SyncManager.register('list', list, ListProxy) SyncManager.register('dict', dict, DictProxy) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 7502ff89f4..7f73b441c2 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -4,32 +4,7 @@ # multiprocessing/pool.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Pool'] @@ -64,6 +39,9 @@ job_counter = itertools.count() def mapstar(args): return list(map(*args)) +def starmapstar(args): + return list(itertools.starmap(args[0], args[1])) + # # Code run by worker processes # @@ -247,14 +225,30 @@ class Pool(object): Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' - assert self._state == RUN - return self.map_async(func, iterable, chunksize).get() + return self._map_async(func, iterable, mapstar, chunksize).get() + + def starmap(self, func, iterable, chunksize=None): + ''' + Like `map()` method but the elements of the `iterable` are expected to + be iterables as well and will be unpacked as arguments. Hence + `func` and (a, b) becomes func(a, b). + ''' + return self._map_async(func, iterable, starmapstar, chunksize).get() + + def starmap_async(self, func, iterable, chunksize=None, callback=None, + error_callback=None): + ''' + Asynchronous version of `starmap()` method. + ''' + return self._map_async(func, iterable, starmapstar, chunksize, + callback, error_callback) def imap(self, func, iterable, chunksize=1): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -272,7 +266,8 @@ class Pool(object): ''' Like `imap()` method but ordering of results is arbitrary. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -291,7 +286,8 @@ class Pool(object): ''' Asynchronous version of `apply()` method. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @@ -301,7 +297,16 @@ class Pool(object): ''' Asynchronous version of `map()` method. ''' - assert self._state == RUN + return self._map_async(func, iterable, mapstar, chunksize, callback, + error_callback) + + def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, + error_callback=None): + ''' + Helper function to implement map, starmap and their async counterparts. + ''' + if self._state != RUN: + raise ValueError("Pool not running") if not hasattr(iterable, '__len__'): iterable = list(iterable) @@ -315,7 +320,7 @@ class Pool(object): task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback, error_callback=error_callback) - self._taskqueue.put((((result._job, i, mapstar, (x,), {}) + self._taskqueue.put((((result._job, i, mapper, (x,), {}) for i, x in enumerate(task_batches)), None)) return result @@ -519,6 +524,12 @@ class Pool(object): debug('cleaning up worker %d' % p.pid) p.join() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + # # Class whose instances are returned by `Pool.apply_async()` # @@ -526,32 +537,26 @@ class Pool(object): class ApplyResult(object): def __init__(self, cache, callback, error_callback): - self._cond = threading.Condition(threading.Lock()) + self._event = threading.Event() self._job = next(job_counter) self._cache = cache - self._ready = False self._callback = callback self._error_callback = error_callback cache[self._job] = self def ready(self): - return self._ready + return self._event.is_set() def successful(self): - assert self._ready + assert self.ready() return self._success def wait(self, timeout=None): - self._cond.acquire() - try: - if not self._ready: - self._cond.wait(timeout) - finally: - self._cond.release() + self._event.wait(timeout) def get(self, timeout=None): self.wait(timeout) - if not self._ready: + if not self.ready(): raise TimeoutError if self._success: return self._value @@ -564,12 +569,7 @@ class ApplyResult(object): self._callback(self._value) if self._error_callback and not self._success: self._error_callback(self._value) - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() del self._cache[self._job] # @@ -586,7 +586,7 @@ class MapResult(ApplyResult): self._chunksize = chunksize if chunksize <= 0: self._number_left = 0 - self._ready = True + self._event.set() del cache[self._job] else: self._number_left = length//chunksize + bool(length % chunksize) @@ -600,24 +600,14 @@ class MapResult(ApplyResult): if self._callback: self._callback(self._value) del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() else: self._success = False self._value = result if self._error_callback: self._error_callback(self._value) del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() # # Class whose instances are returned by `Pool.imap()` diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 3262b50f50..893507bcd5 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -4,32 +4,7 @@ # multiprocessing/process.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Process', 'current_process', 'active_children'] @@ -92,12 +67,16 @@ class Process(object): ''' _Popen = None - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + *, daemon=None): assert group is None, 'group argument must be None for now' count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey - self._daemonic = _current_process._daemonic + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None @@ -130,6 +109,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): @@ -216,6 +196,17 @@ class Process(object): pid = ident + @property + def sentinel(self): + ''' + Return a file descriptor (Unix) or handle (Windows) suitable for + waiting for process termination. + ''' + try: + return self._sentinel + except AttributeError: + raise ValueError("process not started") + def __repr__(self): if self is _current_process: status = 'started' diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 51d991245c..37271fb4eb 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -4,32 +4,7 @@ # multiprocessing/queues.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] @@ -39,12 +14,12 @@ import os import threading import collections import time -import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -67,6 +42,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -75,11 +52,11 @@ class Queue(object): def __getstate__(self): assert_spawning(self) - return (self._maxsize, self._reader, self._writer, + return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) def __setstate__(self, state): - (self._maxsize, self._reader, self._writer, + (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state self._after_fork() @@ -182,7 +159,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -233,7 +210,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -275,6 +252,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has @@ -356,6 +335,7 @@ class SimpleQueue(object): def __init__(self): self._reader, self._writer = Pipe(duplex=False) self._rlock = Lock() + self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: @@ -363,7 +343,7 @@ class SimpleQueue(object): self._make_methods() def empty(self): - return not self._reader.poll() + return not self._poll() def __getstate__(self): assert_spawning(self) diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 6e5e5bc9de..656fa8ff6b 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -5,53 +5,29 @@ # multiprocessing/reduction.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # -__all__ = [] +__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] import os import sys import socket import threading +import struct +import signal -import _multiprocessing from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener +from multiprocessing.util import is_exiting, sub_warning # # # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and + hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # @@ -59,157 +35,246 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): # if sys.platform == 'win32': - import _subprocess - from _multiprocessing import win32 - - def send_handle(conn, handle, destination_pid): - process_handle = win32.OpenProcess( - win32.PROCESS_ALL_ACCESS, False, destination_pid - ) - try: - new_handle = duplicate(handle, process_handle) - conn.send(new_handle) - finally: - close(process_handle) - - def recv_handle(conn): - return conn.recv() + # Windows + __all__ += ['reduce_pipe_connection'] + import _winapi -else: def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) + dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) + conn.send(dh) def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) - -# -# Support for a per-process server thread which caches pickled handles -# - -_cache = set() - -def _reset(obj): - global _lock, _listener, _cache - for h in _cache: - close(h) - _cache.clear() - _lock = threading.Lock() - _listener = None - -_reset(None) -register_after_fork(_reset, _reset) - -def _get_listener(): - global _listener - - if _listener is None: - _lock.acquire() - try: - if _listener is None: - debug('starting listener and thread for sending handles') - _listener = Listener(authkey=current_process().authkey) - t = threading.Thread(target=_serve) - t.daemon = True - t.start() - finally: - _lock.release() - - return _listener - -def _serve(): - from .util import is_exiting, sub_warning - - while 1: - try: - conn = _listener.accept() - handle_wanted, destination_pid = conn.recv() - _cache.remove(handle_wanted) - send_handle(conn, handle_wanted, destination_pid) - close(handle_wanted) - conn.close() - except: - if not is_exiting(): - import traceback - sub_warning( - 'thread for sharing handles raised exception :\n' + - '-'*79 + '\n' + traceback.format_exc() + '-'*79 - ) - -# -# Functions to be used for pickling/unpickling objects with handles -# - -def reduce_handle(handle): - if Popen.thread_is_spawning(): - return (None, Popen.duplicate_for_child(handle), True) - dup_handle = duplicate(handle) - _cache.add(dup_handle) - sub_debug('reducing handle %d', handle) - return (_get_listener().address, dup_handle, False) - -def rebuild_handle(pickled_data): - address, handle, inherited = pickled_data - if inherited: - return handle - sub_debug('rebuilding handle %d', handle) - conn = Client(address, authkey=current_process().authkey) - conn.send((handle, os.getpid())) - new_handle = recv_handle(conn) - conn.close() - return new_handle + return conn.recv().detach() + + class DupHandle(object): + def __init__(self, handle, access, pid=None): + # duplicate handle for process with given pid + if pid is None: + pid = os.getpid() + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) + try: + self._handle = _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), + handle, proc, access, False, 0) + finally: + _winapi.CloseHandle(proc) + self._access = access + self._pid = pid + + def detach(self): + # retrieve handle from process which currently owns it + if self._pid == os.getpid(): + return self._handle + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, + self._pid) + try: + return _winapi.DuplicateHandle( + proc, self._handle, _winapi.GetCurrentProcess(), + self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) + finally: + _winapi.CloseHandle(proc) + + class DupSocket(object): + def __init__(self, sock): + new_sock = sock.dup() + def send(conn, pid): + share = new_sock.share(pid) + conn.send_bytes(share) + self._id = resource_sharer.register(send, new_sock.close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + share = conn.recv_bytes() + return socket.fromshare(share) + finally: + conn.close() + + def reduce_socket(s): + return rebuild_socket, (DupSocket(s),) + + def rebuild_socket(ds): + return ds.detach() + + def reduce_connection(conn): + handle = conn.fileno() + with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: + ds = DupSocket(s) + return rebuild_connection, (ds, conn.readable, conn.writable) + + def rebuild_connection(ds, readable, writable): + from .connection import Connection + sock = ds.detach() + return Connection(sock.detach(), readable, writable) -# -# Register `_multiprocessing.Connection` with `ForkingPickler` -# - -def reduce_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_connection, (rh, conn.readable, conn.writable) - -def rebuild_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return _multiprocessing.Connection( - handle, readable=readable, writable=writable - ) - -ForkingPickler.register(_multiprocessing.Connection, reduce_connection) + def reduce_pipe_connection(conn): + access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | + (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) + dh = DupHandle(conn.fileno(), access) + return rebuild_pipe_connection, (dh, conn.readable, conn.writable) -# -# Register `socket.socket` with `ForkingPickler` -# + def rebuild_pipe_connection(dh, readable, writable): + from .connection import PipeConnection + handle = dh.detach() + return PipeConnection(handle, readable, writable) -def fromfd(fd, family, type_, proto=0): - s = socket.fromfd(fd, family, type_, proto) - if s.__class__ is not socket.socket: - s = socket.socket(_sock=s) - return s +else: + # Unix -def reduce_socket(s): - reduced_handle = reduce_handle(s.fileno()) - return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) + # On MacOSX we should acknowledge receipt of fds -- see Issue14669 + ACKNOWLEDGE = sys.platform == 'darwin' -def rebuild_socket(reduced_handle, family, type_, proto): - fd = rebuild_handle(reduced_handle) - _sock = fromfd(fd, family, type_, proto) - close(fd) - return _sock + def send_handle(conn, handle, destination_pid): + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + struct.pack("@i", handle))]) + if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': + raise RuntimeError('did not receive acknowledgement of fd') -ForkingPickler.register(socket.socket, reduce_socket) + def recv_handle(conn): + size = struct.calcsize("@i") + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) + try: + if ACKNOWLEDGE: + conn.send_bytes(b'ACK') + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + return struct.unpack("@i", cmsg_data[:size])[0] + except (ValueError, IndexError, struct.error): + pass + raise RuntimeError('Invalid data received') + + class DupFd(object): + def __init__(self, fd): + new_fd = os.dup(fd) + def send(conn, pid): + send_handle(conn, new_fd, pid) + def close(): + os.close(new_fd) + self._id = resource_sharer.register(send, close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + return recv_handle(conn) + finally: + conn.close() + + def reduce_socket(s): + df = DupFd(s.fileno()) + return rebuild_socket, (df, s.family, s.type, s.proto) + + def rebuild_socket(df, family, type, proto): + fd = df.detach() + s = socket.fromfd(fd, family, type, proto) + os.close(fd) + return s + + def reduce_connection(conn): + df = DupFd(conn.fileno()) + return rebuild_connection, (df, conn.readable, conn.writable) + + def rebuild_connection(df, readable, writable): + from .connection import Connection + fd = df.detach() + return Connection(fd, readable, writable) # -# Register `_multiprocessing.PipeConnection` with `ForkingPickler` +# Server which shares registered resources with clients # -if sys.platform == 'win32': - - def reduce_pipe_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_pipe_connection, (rh, conn.readable, conn.writable) - - def rebuild_pipe_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return _multiprocessing.PipeConnection( - handle, readable=readable, writable=writable - ) - - ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) +class ResourceSharer(object): + def __init__(self): + self._key = 0 + self._cache = {} + self._old_locks = [] + self._lock = threading.Lock() + self._listener = None + self._address = None + self._thread = None + register_after_fork(self, ResourceSharer._afterfork) + + def register(self, send, close): + with self._lock: + if self._address is None: + self._start() + self._key += 1 + self._cache[self._key] = (send, close) + return (self._address, self._key) + + @staticmethod + def get_connection(ident): + from .connection import Client + address, key = ident + c = Client(address, authkey=current_process().authkey) + c.send((key, os.getpid())) + return c + + def stop(self, timeout=None): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(timeout) + if self._thread.is_alive(): + sub_warn('ResourceSharer thread did not stop when asked') + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + + def _afterfork(self): + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + # If self._lock was locked at the time of the fork, it may be broken + # -- see issue 6721. Replace it without letting it be gc'ed. + self._old_locks.append(self._lock) + self._lock = threading.Lock() + if self._listener is not None: + self._listener.close() + self._listener = None + self._address = None + self._thread = None + + def _start(self): + from .connection import Listener + assert self._listener is None + debug('starting listener and thread for sending handles') + self._listener = Listener(authkey=current_process().authkey) + self._address = self._listener.address + t = threading.Thread(target=self._serve) + t.daemon = True + t.start() + self._thread = t + + def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) + while 1: + try: + conn = self._listener.accept() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg + send, close = self._cache.pop(key) + send(conn, destination_pid) + close() + conn.close() + except: + if not is_exiting(): + import traceback + sub_warning( + 'thread for sharing handles raised exception :\n' + + '-'*79 + '\n' + traceback.format_exc() + '-'*79 + ) + +resource_sharer = ResourceSharer() diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 1e694da49d..a358ed4f12 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -4,35 +4,9 @@ # multiprocessing/sharedctypes.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # -import sys import ctypes import weakref @@ -89,7 +63,7 @@ def RawArray(typecode_or_type, size_or_initializer): result.__init__(*size_or_initializer) return result -def Value(typecode_or_type, *args, lock=None): +def Value(typecode_or_type, *args, lock=True): ''' Return a synchronization wrapper for a Value ''' @@ -102,13 +76,10 @@ def Value(typecode_or_type, *args, lock=None): raise AttributeError("'%r' has no method 'acquire'" % lock) return synchronized(obj, lock) -def Array(typecode_or_type, size_or_initializer, **kwds): +def Array(typecode_or_type, size_or_initializer, *, lock=True): ''' Return a synchronization wrapper for a RawArray ''' - lock = kwds.pop('lock', None) - if kwds: - raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys())) obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj @@ -158,7 +129,8 @@ def rebuild_ctype(type_, wrapper, length): if length is not None: type_ = type_ * length ForkingPickler.register(type_, reduce_ctype) - obj = type_.from_address(wrapper.get_address()) + buf = wrapper.create_memoryview() + obj = type_.from_buffer(buf) obj._wrapper = wrapper return obj diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 70ae82569c..22eabe55b8 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -4,32 +4,7 @@ # multiprocessing/synchronize.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = [ @@ -37,15 +12,13 @@ __all__ = [ ] import threading -import os import sys -from time import time as _time, sleep as _sleep - import _multiprocessing from multiprocessing.process import current_process -from multiprocessing.util import Finalize, register_after_fork, debug +from multiprocessing.util import register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen +from time import time as _time # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. @@ -243,7 +216,7 @@ class Condition(object): try: # wait for notification or timeout - ret = self._wait_semaphore.acquire(True, timeout) + return self._wait_semaphore.acquire(True, timeout) finally: # indicate that this thread has woken self._woken_count.release() @@ -251,7 +224,6 @@ class Condition(object): # reacquire lock for i in range(count): self._lock.acquire() - return ret def notify(self): assert self._lock._semlock._is_mine(), 'lock is not owned' @@ -293,6 +265,24 @@ class Condition(object): while self._wait_semaphore.acquire(False): pass + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + # # Event # @@ -343,3 +333,43 @@ class Event(object): return False finally: self._cond.release() + +# +# Barrier +# + +class Barrier(threading.Barrier): + + def __init__(self, parties, action=None, timeout=None): + import struct + from multiprocessing.heap import BufferWrapper + wrapper = BufferWrapper(struct.calcsize('i') * 2) + cond = Condition() + self.__setstate__((parties, action, timeout, cond, wrapper)) + self._state = 0 + self._count = 0 + + def __setstate__(self, state): + (self._parties, self._action, self._timeout, + self._cond, self._wrapper) = state + self._array = self._wrapper.create_memoryview().cast('i') + + def __getstate__(self): + return (self._parties, self._action, self._timeout, + self._cond, self._wrapper) + + @property + def _state(self): + return self._array[0] + + @_state.setter + def _state(self, value): + self._array[0] = value + + @property + def _count(self): + return self._array[1] + + @_count.setter + def _count(self, value): + self._array[1] = value diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 61b05335ac..72385a8fa3 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -4,39 +4,18 @@ # multiprocessing/util.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # +import sys +import functools +import os import itertools import weakref import atexit import threading # we want threading to install it's # cleanup function before multiprocessing does +from subprocess import _args_from_interpreter_flags from multiprocessing.process import current_process, active_children @@ -84,7 +63,7 @@ def get_logger(): Returns logger used by multiprocessing ''' global _logger - import logging, atexit + import logging logging._acquireLock() try: @@ -183,10 +162,15 @@ class Finalize(object): self._args = args self._kwargs = kwargs or {} self._key = (exitpriority, next(_finalizer_counter)) + self._pid = os.getpid() _finalizer_registry[self._key] = self - def __call__(self, wr=None): + def __call__(self, wr=None, + # Need to bind these locally because the globals can have + # been cleared at shutdown + _finalizer_registry=_finalizer_registry, + sub_debug=sub_debug, getpid=os.getpid): ''' Run the callback unless it has already been called or cancelled ''' @@ -195,9 +179,13 @@ class Finalize(object): except KeyError: sub_debug('finalizer no longer registered') else: - sub_debug('finalizer calling %s with args %s and kwargs %s', - self._callback, self._args, self._kwargs) - res = self._callback(*self._args, **self._kwargs) + if self._pid != getpid(): + sub_debug('finalizer ignored because different process') + res = None + else: + sub_debug('finalizer calling %s with args %s and kwargs %s', + self._callback, self._args, self._kwargs) + res = self._callback(*self._args, **self._kwargs) self._weakref = self._callback = self._args = \ self._kwargs = self._key = None return res @@ -299,16 +287,21 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, info('process shutting down') debug('running all "atexit" finalizers with priority >= 0') _run_finalizers(0) + if current_process() is not None: # We check if the current process is None here because if - # it's None, any call to ``active_children()`` will raise an - # AttributeError (active_children winds up trying to get - # attributes from util._current_process). This happens in a - # variety of shutdown circumstances that are not well-understood - # because module-scope variables are not apparently supposed to - # be destroyed until after this function is called. However, - # they are indeed destroyed before this function is called. See - # issues #9775 and #15881. Also related: #4106, #9205, and #9207. + # it's None, any call to ``active_children()`` will raise + # an AttributeError (active_children winds up trying to + # get attributes from util._current_process). One + # situation where this can happen is if someone has + # manipulated sys.modules, causing this module to be + # garbage collected. The destructor for the module type + # then replaces all values in the module dict with None. + # For instance, after setuptools runs a test it replaces + # sys.modules with a copy created earlier. See issues + # #9775 and #15881. Also related: #4106, #9205, and + # #9207. + for p in active_children(): if p._daemonic: info('calling terminate() for daemon %s', p.name) |