summaryrefslogtreecommitdiff
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2012-12-18 21:26:36 +0200
committerAndrew Svetlov <andrew.svetlov@gmail.com>2012-12-18 21:26:36 +0200
commit40929f315c0bb0f7feb4a5204bafa9f6dd7160f7 (patch)
tree4d149e48aa45aebc236f60d7580bcae869237e58 /Lib/multiprocessing
parent7338e256fa0a303ebdfe7fe0ef65570636483918 (diff)
parent6608ad78ccc19c44905a3fecd14508c5c6340741 (diff)
downloadcpython-40929f315c0bb0f7feb4a5204bafa9f6dd7160f7.tar.gz
Issue #16714: use 'raise' exceptions, don't 'throw'.
Patch by Serhiy Storchaka.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/__init__.py57
-rw-r--r--Lib/multiprocessing/connection.py659
-rw-r--r--Lib/multiprocessing/dummy/__init__.py17
-rw-r--r--Lib/multiprocessing/dummy/connection.py12
-rw-r--r--Lib/multiprocessing/forking.py190
-rw-r--r--Lib/multiprocessing/heap.py45
-rw-r--r--Lib/multiprocessing/managers.py171
-rw-r--r--Lib/multiprocessing/pool.py116
-rw-r--r--Lib/multiprocessing/process.py47
-rw-r--r--Lib/multiprocessing/queues.py46
-rw-r--r--Lib/multiprocessing/reduction.py407
-rw-r--r--Lib/multiprocessing/sharedctypes.py38
-rw-r--r--Lib/multiprocessing/synchronize.py94
-rw-r--r--Lib/multiprocessing/util.py71
14 files changed, 1169 insertions, 801 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index e6e16c8322..1f3e67c9b8 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -13,32 +13,7 @@
#
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__version__ = '0.70a1'
@@ -48,8 +23,8 @@ __all__ = [
'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
- 'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
- 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+ 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
+ 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
]
__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
@@ -161,7 +136,9 @@ def allow_connection_pickling():
'''
Install support for sending connections and sockets between processes
'''
- from multiprocessing import reduction
+ # This is undocumented. In previous versions of multiprocessing
+ # its only effect was to make socket objects inheritable on Windows.
+ import multiprocessing.connection
#
# Definitions depending on native semaphores
@@ -209,6 +186,13 @@ def Event():
from multiprocessing.synchronize import Event
return Event()
+def Barrier(parties, action=None, timeout=None):
+ '''
+ Returns a barrier object
+ '''
+ from multiprocessing.synchronize import Barrier
+ return Barrier(parties, action, timeout)
+
def Queue(maxsize=0):
'''
Returns a queue object
@@ -223,6 +207,13 @@ def JoinableQueue(maxsize=0):
from multiprocessing.queues import JoinableQueue
return JoinableQueue(maxsize)
+def SimpleQueue():
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import SimpleQueue
+ return SimpleQueue()
+
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
@@ -244,19 +235,19 @@ def RawArray(typecode_or_type, size_or_initializer):
from multiprocessing.sharedctypes import RawArray
return RawArray(typecode_or_type, size_or_initializer)
-def Value(typecode_or_type, *args, **kwds):
+def Value(typecode_or_type, *args, lock=True):
'''
Returns a synchronized shared object
'''
from multiprocessing.sharedctypes import Value
- return Value(typecode_or_type, *args, **kwds)
+ return Value(typecode_or_type, *args, lock=lock)
-def Array(typecode_or_type, size_or_initializer, **kwds):
+def Array(typecode_or_type, size_or_initializer, *, lock=True):
'''
Returns a synchronized shared array
'''
from multiprocessing.sharedctypes import Array
- return Array(typecode_or_type, size_or_initializer, **kwds)
+ return Array(typecode_or_type, size_or_initializer, lock=lock)
#
#
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 4fa6f70bf3..fbbd5d91d3 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -4,49 +4,34 @@
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
-__all__ = [ 'Client', 'Listener', 'Pipe' ]
+__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
+import io
import os
import sys
+import pickle
+import select
import socket
+import struct
import errno
import time
import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
+from multiprocessing.forking import ForkingPickler
+try:
+ import _winapi
+ from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ _winapi = None
#
#
@@ -122,6 +107,309 @@ def address_type(address):
raise ValueError('address type of %r unrecognized' % address)
#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ # XXX should we use util.Finalize instead of a __del__?
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise IOError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise IOError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise IOError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise IOError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+ if m.itemsize > 1:
+ m = memoryview(bytes(m))
+ n = len(m)
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ buf = io.BytesIO()
+ ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+ self._send_bytes(buf.getbuffer())
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable buffer-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes()
+ return pickle.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ return self._poll(timeout)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
+
+if _winapi:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
+ """
+ _got_empty_message = False
+
+ def _close(self, _CloseHandle=_winapi.CloseHandle):
+ _CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
+ try:
+ if err == _winapi.ERROR_IO_PENDING:
+ waitres = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nwritten, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ if self._got_empty_message:
+ self._got_empty_message = False
+ return io.BytesIO()
+ else:
+ bsize = 128 if maxsize is None else min(maxsize, 128)
+ try:
+ ov, err = _winapi.ReadFile(self._handle, bsize,
+ overlapped=True)
+ try:
+ if err == _winapi.ERROR_IO_PENDING:
+ waitres = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nread, err = ov.GetOverlappedResult(True)
+ if err == 0:
+ f = io.BytesIO()
+ f.write(ov.getbuffer())
+ return f
+ elif err == _winapi.ERROR_MORE_DATA:
+ return self._get_more_data(ov, maxsize)
+ except IOError as e:
+ if e.winerror == _winapi.ERROR_BROKEN_PIPE:
+ raise EOFError
+ else:
+ raise
+ raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
+
+ def _poll(self, timeout):
+ if (self._got_empty_message or
+ _winapi.PeekNamedPipe(self._handle)[0] != 0):
+ return True
+ return bool(wait([self], timeout))
+
+ def _get_more_data(self, ov, maxsize):
+ buf = ov.getbuffer()
+ f = io.BytesIO()
+ f.write(buf)
+ left = _winapi.PeekNamedPipe(self._handle)[1]
+ assert left > 0
+ if maxsize is not None and len(buf) + left > maxsize:
+ self._bad_message_length()
+ ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
+ rbytes, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert rbytes == left
+ f.write(ov.getbuffer())
+ return f
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if _winapi:
+ def _close(self, _close=_multiprocessing.closesocket):
+ _close(self._handle)
+ _write = _multiprocessing.send
+ _read = _multiprocessing.recv
+ else:
+ def _close(self, _close=os.close):
+ _close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ n = write(self._handle, buf)
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, read=_read):
+ buf = io.BytesIO()
+ handle = self._handle
+ remaining = size
+ while remaining > 0:
+ chunk = read(handle, remaining)
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise IOError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ # For wire compatibility with 3.2 and lower
+ n = len(buf)
+ self._send(struct.pack("!i", n))
+ # The condition is necessary to avoid "broken pipe" errors
+ # when sending a 0-length buffer if the other end closed the pipe.
+ if n > 0:
+ self._send(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = self._recv(4)
+ size, = struct.unpack("!i", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size)
+
+ def _poll(self, timeout):
+ r = wait([self._handle], timeout)
+ return bool(r)
+
+
+#
# Public functions
#
@@ -154,6 +442,8 @@ class Listener(object):
Returns a `Connection` object.
'''
+ if self._listener is None:
+ raise IOError('listener is closed')
c = self._listener.accept()
if self._authkey:
deliver_challenge(c, self._authkey)
@@ -164,11 +454,19 @@ class Listener(object):
'''
Close the bound socket or named pipe of `self`.
'''
- return self._listener.close()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
address = property(lambda self: self._listener._address)
last_accepted = property(lambda self: self._listener._last_accepted)
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
def Client(address, family=None, authkey=None):
'''
@@ -201,56 +499,52 @@ if sys.platform != 'win32':
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
- s1.close()
- s2.close()
+ c1 = Connection(s1.detach())
+ c2 = Connection(s2.detach())
else:
fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
return c1, c2
else:
- from _multiprocessing import win32
-
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
address = arbitrary_address('AF_PIPE')
if duplex:
- openmode = win32.PIPE_ACCESS_DUPLEX
- access = win32.GENERIC_READ | win32.GENERIC_WRITE
+ openmode = _winapi.PIPE_ACCESS_DUPLEX
+ access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
- openmode = win32.PIPE_ACCESS_INBOUND
- access = win32.GENERIC_WRITE
+ openmode = _winapi.PIPE_ACCESS_INBOUND
+ access = _winapi.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
- h1 = win32.CreateNamedPipe(
- address, openmode,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ h1 = _winapi.CreateNamedPipe(
+ address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
+ _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
- h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ h2 = _winapi.CreateFile(
+ address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
)
- win32.SetNamedPipeHandleState(
- h2, win32.PIPE_READMODE_MESSAGE, None, None
+ _winapi.SetNamedPipeHandleState(
+ h2, _winapi.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
+ _, err = overlapped.GetOverlappedResult(True)
+ assert err == 0
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
return c1, c2
@@ -265,12 +559,15 @@ class SocketListener(object):
def __init__(self, address, family, backlog=1):
self._socket = socket.socket(getattr(socket, family))
try:
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # SO_REUSEADDR has different semantics on Windows (issue #2550).
+ if os.name == 'posix':
+ self._socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
self._socket.setblocking(True)
self._socket.bind(address)
self._socket.listen(backlog)
self._address = self._socket.getsockname()
- except socket.error:
+ except OSError:
self._socket.close()
raise
self._family = family
@@ -286,10 +583,7 @@ class SocketListener(object):
def accept(self):
s, self._last_accepted = self._socket.accept()
s.setblocking(True)
- fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
- s.close()
- return conn
+ return Connection(s.detach())
def close(self):
self._socket.close()
@@ -304,24 +598,8 @@ def SocketClient(address):
family = address_type(address)
with socket.socket( getattr(socket, family) ) as s:
s.setblocking(True)
- t = _init_timeout()
-
- while 1:
- try:
- s.connect(address)
- except socket.error as e:
- if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
- else:
- break
- else:
- raise
-
- fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
- return conn
+ s.connect(address)
+ return Connection(s.detach())
#
# Definitions for connections based on named pipes
@@ -335,48 +613,55 @@ if sys.platform == 'win32':
'''
def __init__(self, address, backlog=None):
self._address = address
- handle = win32.CreateNamedPipe(
- address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- self._handle_queue = [handle]
- self._last_accepted = None
+ self._handle_queue = [self._new_handle(first=True)]
+ self._last_accepted = None
sub_debug('listener created with address=%r', self._address)
-
self.close = Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
- def accept(self):
- newhandle = win32.CreateNamedPipe(
- self._address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ def _new_handle(self, first=False):
+ flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
+ if first:
+ flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+ return _winapi.CreateNamedPipe(
+ self._address, flags,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
+ _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
- self._handle_queue.append(newhandle)
+
+ def accept(self):
+ self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
try:
- win32.ConnectNamedPipe(handle, win32.NULL)
- except WindowsError as e:
+ ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
+ except OSError as e:
+ if e.winerror != _winapi.ERROR_NO_DATA:
+ raise
# ERROR_NO_DATA can occur if a client has already connected,
# written data and then disconnected -- see Issue 14725.
- if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
- win32.ERROR_NO_DATA):
+ else:
+ try:
+ res = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ except:
+ ov.cancel()
+ _winapi.CloseHandle(handle)
raise
- return _multiprocessing.PipeConnection(handle)
+ finally:
+ _, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ return PipeConnection(handle)
@staticmethod
def _finalize_pipe_listener(queue, address):
sub_debug('closing listener with address=%r', address)
for handle in queue:
- close(handle)
+ _winapi.CloseHandle(handle)
def PipeClient(address):
'''
@@ -385,24 +670,25 @@ if sys.platform == 'win32':
t = _init_timeout()
while 1:
try:
- win32.WaitNamedPipe(address, 1000)
- h = win32.CreateFile(
- address, win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ _winapi.WaitNamedPipe(address, 1000)
+ h = _winapi.CreateFile(
+ address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
+ 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
)
except WindowsError as e:
- if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
- win32.ERROR_PIPE_BUSY) or _check_timeout(t):
+ if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
+ _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
else:
break
else:
raise
- win32.SetNamedPipeHandleState(
- h, win32.PIPE_READMODE_MESSAGE, None, None
+ _winapi.SetNamedPipeHandleState(
+ h, _winapi.PIPE_READMODE_MESSAGE, None, None
)
- return _multiprocessing.PipeConnection(h)
+ return PipeConnection(h)
#
# Authentication stuff
@@ -459,10 +745,10 @@ class ConnectionWrapper(object):
return self._loads(s)
def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
return obj
class XmlListener(Listener):
@@ -476,3 +762,140 @@ def XmlClient(*args, **kwds):
global xmlrpclib
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+#
+# Wait
+#
+
+if sys.platform == 'win32':
+
+ def _exhaustive_wait(handles, timeout):
+ # Return ALL handles which are currently signalled. (Only
+ # returning the first signalled might create starvation issues.)
+ L = list(handles)
+ ready = []
+ while L:
+ res = _winapi.WaitForMultipleObjects(L, False, timeout)
+ if res == WAIT_TIMEOUT:
+ break
+ elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
+ res -= WAIT_OBJECT_0
+ elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
+ res -= WAIT_ABANDONED_0
+ else:
+ raise RuntimeError('Should not get here')
+ ready.append(L[res])
+ L = L[res+1:]
+ timeout = 0
+ return ready
+
+ _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is None:
+ timeout = INFINITE
+ elif timeout < 0:
+ timeout = 0
+ else:
+ timeout = int(timeout * 1000 + 0.5)
+
+ object_list = list(object_list)
+ waithandle_to_obj = {}
+ ov_list = []
+ ready_objects = set()
+ ready_handles = set()
+
+ try:
+ for o in object_list:
+ try:
+ fileno = getattr(o, 'fileno')
+ except AttributeError:
+ waithandle_to_obj[o.__index__()] = o
+ else:
+ # start an overlapped read of length zero
+ try:
+ ov, err = _winapi.ReadFile(fileno(), 0, True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err == _winapi.ERROR_IO_PENDING:
+ ov_list.append(ov)
+ waithandle_to_obj[ov.event] = o
+ else:
+ # If o.fileno() is an overlapped pipe handle and
+ # err == 0 then there is a zero length message
+ # in the pipe, but it HAS NOT been consumed.
+ ready_objects.add(o)
+ timeout = 0
+
+ ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
+ finally:
+ # request that overlapped reads stop
+ for ov in ov_list:
+ ov.cancel()
+
+ # wait for all overlapped reads to stop
+ for ov in ov_list:
+ try:
+ _, err = ov.GetOverlappedResult(True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err != _winapi.ERROR_OPERATION_ABORTED:
+ o = waithandle_to_obj[ov.event]
+ ready_objects.add(o)
+ if err == 0:
+ # If o.fileno() is an overlapped pipe handle then
+ # a zero length message HAS been consumed.
+ if hasattr(o, '_got_empty_message'):
+ o._got_empty_message = True
+
+ ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
+ return [o for o in object_list if o in ready_objects]
+
+else:
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is not None:
+ if timeout <= 0:
+ return select.select(object_list, [], [], 0)[0]
+ else:
+ deadline = time.time() + timeout
+ while True:
+ try:
+ return select.select(object_list, [], [], timeout)[0]
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
+ if timeout is not None:
+ timeout = deadline - time.time()
+
+#
+# Make connection and socket objects sharable if possible
+#
+
+if sys.platform == 'win32':
+ from . import reduction
+ ForkingPickler.register(socket.socket, reduction.reduce_socket)
+ ForkingPickler.register(Connection, reduction.reduce_connection)
+ ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+else:
+ try:
+ from . import reduction
+ except ImportError:
+ pass
+ else:
+ ForkingPickler.register(socket.socket, reduction.reduce_socket)
+ ForkingPickler.register(Connection, reduction.reduce_connection)
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 101c3cba4d..e31fc61572 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -35,7 +35,7 @@
__all__ = [
'Process', 'current_process', 'active_children', 'freeze_support',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
- 'Event', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
+ 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
]
#
@@ -46,12 +46,10 @@ import threading
import sys
import weakref
import array
-import itertools
-from multiprocessing import TimeoutError, cpu_count
from multiprocessing.dummy.connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
-from threading import Event
+from threading import Event, Condition, Barrier
from queue import Queue
#
@@ -85,17 +83,6 @@ class DummyProcess(threading.Thread):
#
#
-class Condition(threading._Condition):
- # XXX
- if sys.version_info < (3, 0):
- notify_all = threading._Condition.notify_all.__func__
- else:
- notify_all = threading._Condition.notify_all
-
-#
-#
-#
-
Process = DummyProcess
current_process = threading.current_thread
current_process()._children = weakref.WeakKeyDictionary()
diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py
index af105794f1..874ec8e432 100644
--- a/Lib/multiprocessing/dummy/connection.py
+++ b/Lib/multiprocessing/dummy/connection.py
@@ -53,6 +53,12 @@ class Listener(object):
address = property(lambda self: self._backlog_queue)
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
def Client(address):
_in, _out = Queue(), Queue()
@@ -85,3 +91,9 @@ class Connection(object):
def close(self):
pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index bc8ac44c22..c5501a2f75 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -4,32 +4,7 @@
# multiprocessing/forking.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
import os
@@ -38,7 +13,7 @@ import signal
from multiprocessing import util, process
-__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
+__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
#
# Check that the current thread is spawning a child process
@@ -55,18 +30,18 @@ def assert_spawning(self):
# Try making some callable types picklable
#
-from pickle import _Pickler as Pickler
+from pickle import Pickler
+from copyreg import dispatch_table
+
class ForkingPickler(Pickler):
- dispatch = Pickler.dispatch.copy()
+ _extra_reducers = {}
+ def __init__(self, *args):
+ Pickler.__init__(self, *args)
+ self.dispatch_table = dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
@classmethod
def register(cls, type, reduce):
- def dispatcher(self, obj):
- rv = reduce(obj)
- if isinstance(rv, str):
- self.save_global(obj, rv)
- else:
- self.save_reduce(obj=obj, *rv)
- cls.dispatch[type] = dispatcher
+ cls._extra_reducers[type] = reduce
def _reduce_method(m):
if m.__self__ is None:
@@ -100,9 +75,6 @@ else:
#
if sys.platform != 'win32':
- import time
-
- exit = os._exit
duplicate = os.dup
close = os.close
@@ -118,14 +90,23 @@ if sys.platform != 'win32':
sys.stderr.flush()
self.returncode = None
+ r, w = os.pipe()
+ self.sentinel = r
+
self.pid = os.fork()
if self.pid == 0:
+ os.close(r)
if 'random' in sys.modules:
import random
random.seed()
code = process_obj._bootstrap()
os._exit(code)
+ # `w` will be closed when the child exits, at which point `r`
+ # will become ready for reading (using e.g. select()).
+ os.close(w)
+ util.Finalize(self, os.close, (r,))
+
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
@@ -143,26 +124,20 @@ if sys.platform != 'win32':
return self.returncode
def wait(self, timeout=None):
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
+ if self.returncode is None:
+ if timeout is not None:
+ from .connection import wait
+ if not wait([self.sentinel], timeout):
+ return None
+ # This shouldn't block if wait() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
def terminate(self):
if self.returncode is None:
try:
os.kill(self.pid, signal.SIGTERM)
- except OSError as e:
+ except OSError:
if self.wait(timeout=0.1) is None:
raise
@@ -177,12 +152,9 @@ if sys.platform != 'win32':
else:
import _thread
import msvcrt
- import _subprocess
- import time
+ import _winapi
- from pickle import dump, load, HIGHEST_PROTOCOL
- from _multiprocessing import win32, Connection, PipeConnection
- from .util import Finalize
+ from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
@@ -195,8 +167,7 @@ else:
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
- exit = win32.ExitProcess
- close = win32.CloseHandle
+ close = _winapi.CloseHandle
#
# _python_exe is the assumed path to the python executable.
@@ -218,11 +189,11 @@ else:
def duplicate(handle, target_process=None, inheritable=False):
if target_process is None:
- target_process = _subprocess.GetCurrentProcess()
- return _subprocess.DuplicateHandle(
- _subprocess.GetCurrentProcess(), handle, target_process,
- 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
- ).Detach()
+ target_process = _winapi.GetCurrentProcess()
+ return _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(), handle, target_process,
+ 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
+ )
#
# We define a Popen class similar to the one from subprocess, but
@@ -236,6 +207,9 @@ else:
_tls = _thread._local()
def __init__(self, process_obj):
+ cmd = ' '.join('"%s"' % x for x in get_command_line())
+ prep_data = get_preparation_data(process_obj._name)
+
# create pipe for communication with child
rfd, wfd = os.pipe()
@@ -243,30 +217,31 @@ else:
rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
os.close(rfd)
- # start process
- cmd = get_command_line() + [rhandle]
- cmd = ' '.join('"%s"' % x for x in cmd)
- hp, ht, pid, tid = _subprocess.CreateProcess(
- _python_exe, cmd, None, None, 1, 0, None, None, None
- )
- ht.Close()
- close(rhandle)
-
- # set attributes of self
- self.pid = pid
- self.returncode = None
- self._handle = hp
-
- # send information to child
- prep_data = get_preparation_data(process_obj._name)
- to_child = os.fdopen(wfd, 'wb')
- Popen._tls.process_handle = int(hp)
- try:
- dump(prep_data, to_child, HIGHEST_PROTOCOL)
- dump(process_obj, to_child, HIGHEST_PROTOCOL)
- finally:
- del Popen._tls.process_handle
- to_child.close()
+ with open(wfd, 'wb', closefd=True) as to_child:
+ # start process
+ try:
+ hp, ht, pid, tid = _winapi.CreateProcess(
+ _python_exe, cmd + (' %s' % rhandle),
+ None, None, 1, 0, None, None, None
+ )
+ _winapi.CloseHandle(ht)
+ finally:
+ close(rhandle)
+
+ # set attributes of self
+ self.pid = pid
+ self.returncode = None
+ self._handle = hp
+ self.sentinel = int(hp)
+ util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+
+ # send information to child
+ Popen._tls.process_handle = int(hp)
+ try:
+ dump(prep_data, to_child, HIGHEST_PROTOCOL)
+ dump(process_obj, to_child, HIGHEST_PROTOCOL)
+ finally:
+ del Popen._tls.process_handle
@staticmethod
def thread_is_spawning():
@@ -279,13 +254,13 @@ else:
def wait(self, timeout=None):
if self.returncode is None:
if timeout is None:
- msecs = _subprocess.INFINITE
+ msecs = _winapi.INFINITE
else:
msecs = max(0, int(timeout * 1000 + 0.5))
- res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
- if res == _subprocess.WAIT_OBJECT_0:
- code = _subprocess.GetExitCodeProcess(self._handle)
+ res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+ if res == _winapi.WAIT_OBJECT_0:
+ code = _winapi.GetExitCodeProcess(self._handle)
if code == TERMINATE:
code = -signal.SIGTERM
self.returncode = code
@@ -298,9 +273,9 @@ else:
def terminate(self):
if self.returncode is None:
try:
- _subprocess.TerminateProcess(int(self._handle), TERMINATE)
- except WindowsError:
- if self.wait(timeout=0.1) is None:
+ _winapi.TerminateProcess(int(self._handle), TERMINATE)
+ except OSError:
+ if self.wait(timeout=1.0) is None:
raise
#
@@ -350,7 +325,8 @@ else:
return [sys.executable, '--multiprocessing-fork']
else:
prog = 'from multiprocessing.forking import main; main()'
- return [_python_exe, '-c', prog, '--multiprocessing-fork']
+ opts = util._args_from_interpreter_flags()
+ return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
def main():
@@ -372,7 +348,7 @@ else:
from_parent.close()
exitcode = self._bootstrap()
- exit(exitcode)
+ sys.exit(exitcode)
def get_preparation_data(name):
@@ -405,22 +381,6 @@ else:
return d
- #
- # Make (Pipe)Connection picklable
- #
-
- def reduce_connection(conn):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- 'By default %s objects can only be shared between processes\n'
- 'using inheritance' % type(conn).__name__
- )
- return type(conn), (Popen.duplicate_for_child(conn.fileno()),
- conn.readable, conn.writable)
-
- ForkingPickler.register(Connection, reduce_connection)
- ForkingPickler.register(PipeConnection, reduce_connection)
-
#
# Prepare current process
#
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 0a25ef05c7..4e93c12f81 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -4,37 +4,11 @@
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
import bisect
import mmap
-import tempfile
import os
import sys
import threading
@@ -52,7 +26,7 @@ __all__ = ['BufferWrapper']
if sys.platform == 'win32':
- from _multiprocessing import win32
+ import _winapi
class Arena(object):
@@ -62,7 +36,7 @@ if sys.platform == 'win32':
self.size = size
self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
- assert win32.GetLastError() == 0, 'tagname already in use'
+ assert _winapi.GetLastError() == 0, 'tagname already in use'
self._state = (self.size, self.name)
def __getstate__(self):
@@ -72,7 +46,7 @@ if sys.platform == 'win32':
def __setstate__(self, state):
self.size, self.name = self._state = state
self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
- assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
+ assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS
else:
@@ -231,7 +205,7 @@ class Heap(object):
self._lock.release()
#
-# Class representing a chunk of an mmap -- can be inherited
+# Class representing a chunk of an mmap -- can be inherited by child process
#
class BufferWrapper(object):
@@ -244,11 +218,6 @@ class BufferWrapper(object):
self._state = (block, size)
Finalize(self, BufferWrapper._heap.free, args=(block,))
- def get_address(self):
+ def create_memoryview(self):
(arena, start, stop), size = self._state
- address, length = _multiprocessing.address_of_buffer(arena.buffer)
- assert size <= length
- return address + start
-
- def get_size(self):
- return self._state[1]
+ return memoryview(arena.buffer)[start:start+size]
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 5588ead116..1ab147e29e 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -5,32 +5,7 @@
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
@@ -39,19 +14,16 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
# Imports
#
-import os
import sys
-import weakref
import threading
import array
import queue
from traceback import format_exc
-from pickle import PicklingError
from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
-from multiprocessing.util import Finalize, info
+from multiprocessing.forking import Popen, ForkingPickler
+from time import time as _time
#
# Register some things for pickling
@@ -168,28 +140,38 @@ class Server(object):
self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
self.mutex = threading.RLock()
- self.stop = 0
def serve_forever(self):
'''
Run the server forever
'''
+ self.stop_event = threading.Event()
current_process()._manager_server = self
try:
+ accepter = threading.Thread(target=self.accepter)
+ accepter.daemon = True
+ accepter.start()
try:
- while 1:
- try:
- c = self.listener.accept()
- except (OSError, IOError):
- continue
- t = threading.Thread(target=self.handle_request, args=(c,))
- t.daemon = True
- t.start()
+ while not self.stop_event.is_set():
+ self.stop_event.wait(1)
except (KeyboardInterrupt, SystemExit):
pass
finally:
- self.stop = 999
- self.listener.close()
+ if sys.stdout != sys.__stdout__:
+ util.debug('resetting stdout, stderr')
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+ sys.exit(0)
+
+ def accepter(self):
+ while True:
+ try:
+ c = self.listener.accept()
+ except (OSError, IOError):
+ continue
+ t = threading.Thread(target=self.handle_request, args=(c,))
+ t.daemon = True
+ t.start()
def handle_request(self, c):
'''
@@ -236,7 +218,7 @@ class Server(object):
send = conn.send
id_to_obj = self.id_to_obj
- while not self.stop:
+ while not self.stop_event.is_set():
try:
methodname = obj = None
@@ -346,32 +328,13 @@ class Server(object):
Shutdown this process
'''
try:
- try:
- util.debug('manager received shutdown message')
- c.send(('#RETURN', None))
-
- if sys.stdout != sys.__stdout__:
- util.debug('resetting stdout, stderr')
- sys.stdout = sys.__stdout__
- sys.stderr = sys.__stderr__
-
- util._run_finalizers(0)
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.terminate()
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.join()
-
- util._run_finalizers()
- util.info('manager exiting with exitcode 0')
- except:
- import traceback
- traceback.print_exc()
+ util.debug('manager received shutdown message')
+ c.send(('#RETURN', None))
+ except:
+ import traceback
+ traceback.print_exc()
finally:
- exit(0)
+ self.stop_event.set()
def create(self, c, typeid, *args, **kwds):
'''
@@ -483,10 +446,6 @@ class BaseManager(object):
self._serializer = serializer
self._Listener, self._Client = listener_client[serializer]
- def __reduce__(self):
- return type(self).from_address, \
- (self._address, self._authkey, self._serializer)
-
def get_server(self):
'''
Return server object with serve_forever() method and address attribute
@@ -576,7 +535,10 @@ class BaseManager(object):
'''
Join the manager process (if it has been spawned)
'''
- self._process.join(timeout)
+ if self._process is not None:
+ self._process.join(timeout)
+ if not self._process.is_alive():
+ self._process = None
def _debug_info(self):
'''
@@ -599,6 +561,9 @@ class BaseManager(object):
conn.close()
def __enter__(self):
+ if self._state.value == State.INITIAL:
+ self.start()
+ assert self._state.value == State.STARTED
return self
def __exit__(self, exc_type, exc_val, exc_tb):
@@ -620,7 +585,7 @@ class BaseManager(object):
except Exception:
pass
- process.join(timeout=0.2)
+ process.join(timeout=1.0)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
@@ -982,8 +947,9 @@ class IteratorProxy(BaseProxy):
class AcquirerProxy(BaseProxy):
_exposed_ = ('acquire', 'release')
- def acquire(self, blocking=True):
- return self._callmethod('acquire', (blocking,))
+ def acquire(self, blocking=True, timeout=None):
+ args = (blocking,) if timeout is None else (blocking, timeout)
+ return self._callmethod('acquire', args)
def release(self):
return self._callmethod('release')
def __enter__(self):
@@ -1000,6 +966,24 @@ class ConditionProxy(AcquirerProxy):
return self._callmethod('notify')
def notify_all(self):
return self._callmethod('notify_all')
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = _time() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
+
class EventProxy(BaseProxy):
_exposed_ = ('is_set', 'set', 'clear', 'wait')
@@ -1012,6 +996,26 @@ class EventProxy(BaseProxy):
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
+
+class BarrierProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def abort(self):
+ return self._callmethod('abort')
+ def reset(self):
+ return self._callmethod('reset')
+ @property
+ def parties(self):
+ return self._callmethod('__getattribute__', ('parties',))
+ @property
+ def n_waiting(self):
+ return self._callmethod('__getattribute__', ('n_waiting',))
+ @property
+ def broken(self):
+ return self._callmethod('__getattribute__', ('broken',))
+
+
class NamespaceProxy(BaseProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
def __getattr__(self, key):
@@ -1041,12 +1045,11 @@ class ValueProxy(BaseProxy):
BaseListProxy = MakeProxyType('BaseListProxy', (
- '__add__', '__contains__', '__delitem__', '__delslice__',
- '__getitem__', '__getslice__', '__len__', '__mul__',
- '__reversed__', '__rmul__', '__setitem__', '__setslice__',
+ '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
+ '__mul__', '__reversed__', '__rmul__', '__setitem__',
'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
'reverse', 'sort', '__imul__'
- )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
+ ))
class ListProxy(BaseListProxy):
def __iadd__(self, value):
self._callmethod('extend', (value,))
@@ -1064,17 +1067,18 @@ DictProxy = MakeProxyType('DictProxy', (
ArrayProxy = MakeProxyType('ArrayProxy', (
- '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
- )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
+ '__len__', '__getitem__', '__setitem__'
+ ))
PoolProxy = MakeProxyType('PoolProxy', (
'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
- 'map', 'map_async', 'terminate'
+ 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
))
PoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult',
'map_async': 'AsyncResult',
+ 'starmap_async': 'AsyncResult',
'imap': 'Iterator',
'imap_unordered': 'Iterator'
}
@@ -1103,6 +1107,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
+SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 7502ff89f4..7f73b441c2 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -4,32 +4,7 @@
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = ['Pool']
@@ -64,6 +39,9 @@ job_counter = itertools.count()
def mapstar(args):
return list(map(*args))
+def starmapstar(args):
+ return list(itertools.starmap(args[0], args[1]))
+
#
# Code run by worker processes
#
@@ -247,14 +225,30 @@ class Pool(object):
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
- assert self._state == RUN
- return self.map_async(func, iterable, chunksize).get()
+ return self._map_async(func, iterable, mapstar, chunksize).get()
+
+ def starmap(self, func, iterable, chunksize=None):
+ '''
+ Like `map()` method but the elements of the `iterable` are expected to
+ be iterables as well and will be unpacked as arguments. Hence
+ `func` and (a, b) becomes func(a, b).
+ '''
+ return self._map_async(func, iterable, starmapstar, chunksize).get()
+
+ def starmap_async(self, func, iterable, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Asynchronous version of `starmap()` method.
+ '''
+ return self._map_async(func, iterable, starmapstar, chunksize,
+ callback, error_callback)
def imap(self, func, iterable, chunksize=1):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
@@ -272,7 +266,8 @@ class Pool(object):
'''
Like `imap()` method but ordering of results is arbitrary.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
@@ -291,7 +286,8 @@ class Pool(object):
'''
Asynchronous version of `apply()` method.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
@@ -301,7 +297,16 @@ class Pool(object):
'''
Asynchronous version of `map()` method.
'''
- assert self._state == RUN
+ return self._map_async(func, iterable, mapstar, chunksize, callback,
+ error_callback)
+
+ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Helper function to implement map, starmap and their async counterparts.
+ '''
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
@@ -315,7 +320,7 @@ class Pool(object):
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
- self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+ self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
@@ -519,6 +524,12 @@ class Pool(object):
debug('cleaning up worker %d' % p.pid)
p.join()
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.terminate()
+
#
# Class whose instances are returned by `Pool.apply_async()`
#
@@ -526,32 +537,26 @@ class Pool(object):
class ApplyResult(object):
def __init__(self, cache, callback, error_callback):
- self._cond = threading.Condition(threading.Lock())
+ self._event = threading.Event()
self._job = next(job_counter)
self._cache = cache
- self._ready = False
self._callback = callback
self._error_callback = error_callback
cache[self._job] = self
def ready(self):
- return self._ready
+ return self._event.is_set()
def successful(self):
- assert self._ready
+ assert self.ready()
return self._success
def wait(self, timeout=None):
- self._cond.acquire()
- try:
- if not self._ready:
- self._cond.wait(timeout)
- finally:
- self._cond.release()
+ self._event.wait(timeout)
def get(self, timeout=None):
self.wait(timeout)
- if not self._ready:
+ if not self.ready():
raise TimeoutError
if self._success:
return self._value
@@ -564,12 +569,7 @@ class ApplyResult(object):
self._callback(self._value)
if self._error_callback and not self._success:
self._error_callback(self._value)
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ self._event.set()
del self._cache[self._job]
#
@@ -586,7 +586,7 @@ class MapResult(ApplyResult):
self._chunksize = chunksize
if chunksize <= 0:
self._number_left = 0
- self._ready = True
+ self._event.set()
del cache[self._job]
else:
self._number_left = length//chunksize + bool(length % chunksize)
@@ -600,24 +600,14 @@ class MapResult(ApplyResult):
if self._callback:
self._callback(self._value)
del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ self._event.set()
else:
self._success = False
self._value = result
if self._error_callback:
self._error_callback(self._value)
del self._cache[self._job]
- self._cond.acquire()
- try:
- self._ready = True
- self._cond.notify()
- finally:
- self._cond.release()
+ self._event.set()
#
# Class whose instances are returned by `Pool.imap()`
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 3262b50f50..893507bcd5 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -4,32 +4,7 @@
# multiprocessing/process.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = ['Process', 'current_process', 'active_children']
@@ -92,12 +67,16 @@ class Process(object):
'''
_Popen = None
- def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
+ *, daemon=None):
assert group is None, 'group argument must be None for now'
count = next(_current_process._counter)
self._identity = _current_process._identity + (count,)
self._authkey = _current_process._authkey
- self._daemonic = _current_process._daemonic
+ if daemon is not None:
+ self._daemonic = daemon
+ else:
+ self._daemonic = _current_process._daemonic
self._tempdir = _current_process._tempdir
self._parent_pid = os.getpid()
self._popen = None
@@ -130,6 +109,7 @@ class Process(object):
else:
from .forking import Popen
self._popen = Popen(self)
+ self._sentinel = self._popen.sentinel
_current_process._children.add(self)
def terminate(self):
@@ -216,6 +196,17 @@ class Process(object):
pid = ident
+ @property
+ def sentinel(self):
+ '''
+ Return a file descriptor (Unix) or handle (Windows) suitable for
+ waiting for process termination.
+ '''
+ try:
+ return self._sentinel
+ except AttributeError:
+ raise ValueError("process not started")
+
def __repr__(self):
if self is _current_process:
status = 'started'
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 51d991245c..37271fb4eb 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -4,32 +4,7 @@
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
@@ -39,12 +14,12 @@ import os
import threading
import collections
import time
-import atexit
import weakref
+import errno
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -67,6 +42,8 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
self._after_fork()
@@ -75,11 +52,11 @@ class Queue(object):
def __getstate__(self):
assert_spawning(self)
- return (self._maxsize, self._reader, self._writer,
+ return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state):
- (self._maxsize, self._reader, self._writer,
+ (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
@@ -182,7 +159,7 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
+ self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -233,7 +210,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close):
+ def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -275,6 +252,8 @@ class Queue(object):
except IndexError:
pass
except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
@@ -356,6 +335,7 @@ class SimpleQueue(object):
def __init__(self):
self._reader, self._writer = Pipe(duplex=False)
self._rlock = Lock()
+ self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
@@ -363,7 +343,7 @@ class SimpleQueue(object):
self._make_methods()
def empty(self):
- return not self._reader.poll()
+ return not self._poll()
def __getstate__(self):
assert_spawning(self)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 6e5e5bc9de..656fa8ff6b 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -5,53 +5,29 @@
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
-__all__ = []
+__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
import os
import sys
import socket
import threading
+import struct
+import signal
-import _multiprocessing
from multiprocessing import current_process
-from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.util import is_exiting, sub_warning
#
#
#
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS'))):
raise ImportError('pickling of connections not supported')
#
@@ -59,157 +35,246 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
#
if sys.platform == 'win32':
- import _subprocess
- from _multiprocessing import win32
-
- def send_handle(conn, handle, destination_pid):
- process_handle = win32.OpenProcess(
- win32.PROCESS_ALL_ACCESS, False, destination_pid
- )
- try:
- new_handle = duplicate(handle, process_handle)
- conn.send(new_handle)
- finally:
- close(process_handle)
-
- def recv_handle(conn):
- return conn.recv()
+ # Windows
+ __all__ += ['reduce_pipe_connection']
+ import _winapi
-else:
def send_handle(conn, handle, destination_pid):
- _multiprocessing.sendfd(conn.fileno(), handle)
+ dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+ conn.send(dh)
def recv_handle(conn):
- return _multiprocessing.recvfd(conn.fileno())
-
-#
-# Support for a per-process server thread which caches pickled handles
-#
-
-_cache = set()
-
-def _reset(obj):
- global _lock, _listener, _cache
- for h in _cache:
- close(h)
- _cache.clear()
- _lock = threading.Lock()
- _listener = None
-
-_reset(None)
-register_after_fork(_reset, _reset)
-
-def _get_listener():
- global _listener
-
- if _listener is None:
- _lock.acquire()
- try:
- if _listener is None:
- debug('starting listener and thread for sending handles')
- _listener = Listener(authkey=current_process().authkey)
- t = threading.Thread(target=_serve)
- t.daemon = True
- t.start()
- finally:
- _lock.release()
-
- return _listener
-
-def _serve():
- from .util import is_exiting, sub_warning
-
- while 1:
- try:
- conn = _listener.accept()
- handle_wanted, destination_pid = conn.recv()
- _cache.remove(handle_wanted)
- send_handle(conn, handle_wanted, destination_pid)
- close(handle_wanted)
- conn.close()
- except:
- if not is_exiting():
- import traceback
- sub_warning(
- 'thread for sharing handles raised exception :\n' +
- '-'*79 + '\n' + traceback.format_exc() + '-'*79
- )
-
-#
-# Functions to be used for pickling/unpickling objects with handles
-#
-
-def reduce_handle(handle):
- if Popen.thread_is_spawning():
- return (None, Popen.duplicate_for_child(handle), True)
- dup_handle = duplicate(handle)
- _cache.add(dup_handle)
- sub_debug('reducing handle %d', handle)
- return (_get_listener().address, dup_handle, False)
-
-def rebuild_handle(pickled_data):
- address, handle, inherited = pickled_data
- if inherited:
- return handle
- sub_debug('rebuilding handle %d', handle)
- conn = Client(address, authkey=current_process().authkey)
- conn.send((handle, os.getpid()))
- new_handle = recv_handle(conn)
- conn.close()
- return new_handle
+ return conn.recv().detach()
+
+ class DupHandle(object):
+ def __init__(self, handle, access, pid=None):
+ # duplicate handle for process with given pid
+ if pid is None:
+ pid = os.getpid()
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+ try:
+ self._handle = _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(),
+ handle, proc, access, False, 0)
+ finally:
+ _winapi.CloseHandle(proc)
+ self._access = access
+ self._pid = pid
+
+ def detach(self):
+ # retrieve handle from process which currently owns it
+ if self._pid == os.getpid():
+ return self._handle
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+ self._pid)
+ try:
+ return _winapi.DuplicateHandle(
+ proc, self._handle, _winapi.GetCurrentProcess(),
+ self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(proc)
+
+ class DupSocket(object):
+ def __init__(self, sock):
+ new_sock = sock.dup()
+ def send(conn, pid):
+ share = new_sock.share(pid)
+ conn.send_bytes(share)
+ self._id = resource_sharer.register(send, new_sock.close)
+
+ def detach(self):
+ conn = resource_sharer.get_connection(self._id)
+ try:
+ share = conn.recv_bytes()
+ return socket.fromshare(share)
+ finally:
+ conn.close()
+
+ def reduce_socket(s):
+ return rebuild_socket, (DupSocket(s),)
+
+ def rebuild_socket(ds):
+ return ds.detach()
+
+ def reduce_connection(conn):
+ handle = conn.fileno()
+ with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+ ds = DupSocket(s)
+ return rebuild_connection, (ds, conn.readable, conn.writable)
+
+ def rebuild_connection(ds, readable, writable):
+ from .connection import Connection
+ sock = ds.detach()
+ return Connection(sock.detach(), readable, writable)
-#
-# Register `_multiprocessing.Connection` with `ForkingPickler`
-#
-
-def reduce_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_connection, (rh, conn.readable, conn.writable)
-
-def rebuild_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
- handle, readable=readable, writable=writable
- )
-
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ def reduce_pipe_connection(conn):
+ access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+ (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+ dh = DupHandle(conn.fileno(), access)
+ return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-#
-# Register `socket.socket` with `ForkingPickler`
-#
+ def rebuild_pipe_connection(dh, readable, writable):
+ from .connection import PipeConnection
+ handle = dh.detach()
+ return PipeConnection(handle, readable, writable)
-def fromfd(fd, family, type_, proto=0):
- s = socket.fromfd(fd, family, type_, proto)
- if s.__class__ is not socket.socket:
- s = socket.socket(_sock=s)
- return s
+else:
+ # Unix
-def reduce_socket(s):
- reduced_handle = reduce_handle(s.fileno())
- return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
+ # On MacOSX we should acknowledge receipt of fds -- see Issue14669
+ ACKNOWLEDGE = sys.platform == 'darwin'
-def rebuild_socket(reduced_handle, family, type_, proto):
- fd = rebuild_handle(reduced_handle)
- _sock = fromfd(fd, family, type_, proto)
- close(fd)
- return _sock
+ def send_handle(conn, handle, destination_pid):
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+ struct.pack("@i", handle))])
+ if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
+ raise RuntimeError('did not receive acknowledgement of fd')
-ForkingPickler.register(socket.socket, reduce_socket)
+ def recv_handle(conn):
+ size = struct.calcsize("@i")
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
+ try:
+ if ACKNOWLEDGE:
+ conn.send_bytes(b'ACK')
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ return struct.unpack("@i", cmsg_data[:size])[0]
+ except (ValueError, IndexError, struct.error):
+ pass
+ raise RuntimeError('Invalid data received')
+
+ class DupFd(object):
+ def __init__(self, fd):
+ new_fd = os.dup(fd)
+ def send(conn, pid):
+ send_handle(conn, new_fd, pid)
+ def close():
+ os.close(new_fd)
+ self._id = resource_sharer.register(send, close)
+
+ def detach(self):
+ conn = resource_sharer.get_connection(self._id)
+ try:
+ return recv_handle(conn)
+ finally:
+ conn.close()
+
+ def reduce_socket(s):
+ df = DupFd(s.fileno())
+ return rebuild_socket, (df, s.family, s.type, s.proto)
+
+ def rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ s = socket.fromfd(fd, family, type, proto)
+ os.close(fd)
+ return s
+
+ def reduce_connection(conn):
+ df = DupFd(conn.fileno())
+ return rebuild_connection, (df, conn.readable, conn.writable)
+
+ def rebuild_connection(df, readable, writable):
+ from .connection import Connection
+ fd = df.detach()
+ return Connection(fd, readable, writable)
#
-# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
+# Server which shares registered resources with clients
#
-if sys.platform == 'win32':
-
- def reduce_pipe_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
-
- def rebuild_pipe_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
- handle, readable=readable, writable=writable
- )
-
- ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+class ResourceSharer(object):
+ def __init__(self):
+ self._key = 0
+ self._cache = {}
+ self._old_locks = []
+ self._lock = threading.Lock()
+ self._listener = None
+ self._address = None
+ self._thread = None
+ register_after_fork(self, ResourceSharer._afterfork)
+
+ def register(self, send, close):
+ with self._lock:
+ if self._address is None:
+ self._start()
+ self._key += 1
+ self._cache[self._key] = (send, close)
+ return (self._address, self._key)
+
+ @staticmethod
+ def get_connection(ident):
+ from .connection import Client
+ address, key = ident
+ c = Client(address, authkey=current_process().authkey)
+ c.send((key, os.getpid()))
+ return c
+
+ def stop(self, timeout=None):
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address, authkey=current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ sub_warn('ResourceSharer thread did not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
+ def _afterfork(self):
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+ # If self._lock was locked at the time of the fork, it may be broken
+ # -- see issue 6721. Replace it without letting it be gc'ed.
+ self._old_locks.append(self._lock)
+ self._lock = threading.Lock()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
+ self._address = None
+ self._thread = None
+
+ def _start(self):
+ from .connection import Listener
+ assert self._listener is None
+ debug('starting listener and thread for sending handles')
+ self._listener = Listener(authkey=current_process().authkey)
+ self._address = self._listener.address
+ t = threading.Thread(target=self._serve)
+ t.daemon = True
+ t.start()
+ self._thread = t
+
+ def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
+ while 1:
+ try:
+ conn = self._listener.accept()
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
+ send, close = self._cache.pop(key)
+ send(conn, destination_pid)
+ close()
+ conn.close()
+ except:
+ if not is_exiting():
+ import traceback
+ sub_warning(
+ 'thread for sharing handles raised exception :\n' +
+ '-'*79 + '\n' + traceback.format_exc() + '-'*79
+ )
+
+resource_sharer = ResourceSharer()
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 1e694da49d..a358ed4f12 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -4,35 +4,9 @@
# multiprocessing/sharedctypes.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
-import sys
import ctypes
import weakref
@@ -89,7 +63,7 @@ def RawArray(typecode_or_type, size_or_initializer):
result.__init__(*size_or_initializer)
return result
-def Value(typecode_or_type, *args, lock=None):
+def Value(typecode_or_type, *args, lock=True):
'''
Return a synchronization wrapper for a Value
'''
@@ -102,13 +76,10 @@ def Value(typecode_or_type, *args, lock=None):
raise AttributeError("'%r' has no method 'acquire'" % lock)
return synchronized(obj, lock)
-def Array(typecode_or_type, size_or_initializer, **kwds):
+def Array(typecode_or_type, size_or_initializer, *, lock=True):
'''
Return a synchronization wrapper for a RawArray
'''
- lock = kwds.pop('lock', None)
- if kwds:
- raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
@@ -158,7 +129,8 @@ def rebuild_ctype(type_, wrapper, length):
if length is not None:
type_ = type_ * length
ForkingPickler.register(type_, reduce_ctype)
- obj = type_.from_address(wrapper.get_address())
+ buf = wrapper.create_memoryview()
+ obj = type_.from_buffer(buf)
obj._wrapper = wrapper
return obj
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 70ae82569c..22eabe55b8 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -4,32 +4,7 @@
# multiprocessing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = [
@@ -37,15 +12,13 @@ __all__ = [
]
import threading
-import os
import sys
-from time import time as _time, sleep as _sleep
-
import _multiprocessing
from multiprocessing.process import current_process
-from multiprocessing.util import Finalize, register_after_fork, debug
+from multiprocessing.util import register_after_fork, debug
from multiprocessing.forking import assert_spawning, Popen
+from time import time as _time
# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
@@ -243,7 +216,7 @@ class Condition(object):
try:
# wait for notification or timeout
- ret = self._wait_semaphore.acquire(True, timeout)
+ return self._wait_semaphore.acquire(True, timeout)
finally:
# indicate that this thread has woken
self._woken_count.release()
@@ -251,7 +224,6 @@ class Condition(object):
# reacquire lock
for i in range(count):
self._lock.acquire()
- return ret
def notify(self):
assert self._lock._semlock._is_mine(), 'lock is not owned'
@@ -293,6 +265,24 @@ class Condition(object):
while self._wait_semaphore.acquire(False):
pass
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = _time() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
+
#
# Event
#
@@ -343,3 +333,43 @@ class Event(object):
return False
finally:
self._cond.release()
+
+#
+# Barrier
+#
+
+class Barrier(threading.Barrier):
+
+ def __init__(self, parties, action=None, timeout=None):
+ import struct
+ from multiprocessing.heap import BufferWrapper
+ wrapper = BufferWrapper(struct.calcsize('i') * 2)
+ cond = Condition()
+ self.__setstate__((parties, action, timeout, cond, wrapper))
+ self._state = 0
+ self._count = 0
+
+ def __setstate__(self, state):
+ (self._parties, self._action, self._timeout,
+ self._cond, self._wrapper) = state
+ self._array = self._wrapper.create_memoryview().cast('i')
+
+ def __getstate__(self):
+ return (self._parties, self._action, self._timeout,
+ self._cond, self._wrapper)
+
+ @property
+ def _state(self):
+ return self._array[0]
+
+ @_state.setter
+ def _state(self, value):
+ self._array[0] = value
+
+ @property
+ def _count(self):
+ return self._array[1]
+
+ @_count.setter
+ def _count(self, value):
+ self._array[1] = value
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 61b05335ac..72385a8fa3 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -4,39 +4,18 @@
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
+import sys
+import functools
+import os
import itertools
import weakref
import atexit
import threading # we want threading to install it's
# cleanup function before multiprocessing does
+from subprocess import _args_from_interpreter_flags
from multiprocessing.process import current_process, active_children
@@ -84,7 +63,7 @@ def get_logger():
Returns logger used by multiprocessing
'''
global _logger
- import logging, atexit
+ import logging
logging._acquireLock()
try:
@@ -183,10 +162,15 @@ class Finalize(object):
self._args = args
self._kwargs = kwargs or {}
self._key = (exitpriority, next(_finalizer_counter))
+ self._pid = os.getpid()
_finalizer_registry[self._key] = self
- def __call__(self, wr=None):
+ def __call__(self, wr=None,
+ # Need to bind these locally because the globals can have
+ # been cleared at shutdown
+ _finalizer_registry=_finalizer_registry,
+ sub_debug=sub_debug, getpid=os.getpid):
'''
Run the callback unless it has already been called or cancelled
'''
@@ -195,9 +179,13 @@ class Finalize(object):
except KeyError:
sub_debug('finalizer no longer registered')
else:
- sub_debug('finalizer calling %s with args %s and kwargs %s',
- self._callback, self._args, self._kwargs)
- res = self._callback(*self._args, **self._kwargs)
+ if self._pid != getpid():
+ sub_debug('finalizer ignored because different process')
+ res = None
+ else:
+ sub_debug('finalizer calling %s with args %s and kwargs %s',
+ self._callback, self._args, self._kwargs)
+ res = self._callback(*self._args, **self._kwargs)
self._weakref = self._callback = self._args = \
self._kwargs = self._key = None
return res
@@ -299,16 +287,21 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
info('process shutting down')
debug('running all "atexit" finalizers with priority >= 0')
_run_finalizers(0)
+
if current_process() is not None:
# We check if the current process is None here because if
- # it's None, any call to ``active_children()`` will raise an
- # AttributeError (active_children winds up trying to get
- # attributes from util._current_process). This happens in a
- # variety of shutdown circumstances that are not well-understood
- # because module-scope variables are not apparently supposed to
- # be destroyed until after this function is called. However,
- # they are indeed destroyed before this function is called. See
- # issues #9775 and #15881. Also related: #4106, #9205, and #9207.
+ # it's None, any call to ``active_children()`` will raise
+ # an AttributeError (active_children winds up trying to
+ # get attributes from util._current_process). One
+ # situation where this can happen is if someone has
+ # manipulated sys.modules, causing this module to be
+ # garbage collected. The destructor for the module type
+ # then replaces all values in the module dict with None.
+ # For instance, after setuptools runs a test it replaces
+ # sys.modules with a copy created earlier. See issues
+ # #9775 and #15881. Also related: #4106, #9205, and
+ # #9207.
+
for p in active_children():
if p._daemonic:
info('calling terminate() for daemon %s', p.name)