Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--   Lib/multiprocessing/connection.py        |   8
-rw-r--r--   Lib/multiprocessing/context.py           |  13
-rw-r--r--   Lib/multiprocessing/forkserver.py        |   2
-rw-r--r--   Lib/multiprocessing/heap.py              |   5
-rw-r--r--   Lib/multiprocessing/managers.py          | 100
-rw-r--r--   Lib/multiprocessing/pool.py              |  20
-rw-r--r--   Lib/multiprocessing/popen_forkserver.py  |   7
-rw-r--r--   Lib/multiprocessing/popen_spawn_posix.py |   7
-rw-r--r--   Lib/multiprocessing/popen_spawn_win32.py |   9
-rw-r--r--   Lib/multiprocessing/queues.py            |  10
-rw-r--r--   Lib/multiprocessing/reduction.py         |  34
-rw-r--r--   Lib/multiprocessing/resource_sharer.py   |   2
-rw-r--r--   Lib/multiprocessing/sharedctypes.py      |   6
-rw-r--r--   Lib/multiprocessing/spawn.py             |   9
14 files changed, 159 insertions, 73 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index d0a1b86b13..d49e8f0d32 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -20,11 +20,11 @@ import itertools
 
 import _multiprocessing
 
-from . import reduction
 from . import util
 
 from . import AuthenticationError, BufferTooShort
-from .reduction import ForkingPickler
+from .context import reduction
+_ForkingPickler = reduction.ForkingPickler
 
 try:
     import _winapi
@@ -203,7 +203,7 @@ class _ConnectionBase:
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        self._send_bytes(ForkingPickler.dumps(obj))
+        self._send_bytes(_ForkingPickler.dumps(obj))
 
     def recv_bytes(self, maxlength=None):
         """
@@ -248,7 +248,7 @@ class _ConnectionBase:
         self._check_closed()
         self._check_readable()
         buf = self._recv_bytes()
-        return ForkingPickler.loads(buf.getbuffer())
+        return _ForkingPickler.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py
index 115d4bf654..623f6fb733 100644
--- a/Lib/multiprocessing/context.py
+++ b/Lib/multiprocessing/context.py
@@ -3,6 +3,7 @@ import sys
 import threading
 
 from . import process
+from . import reduction
 
 __all__ = []            # things are copied from here to __init__.py
 
@@ -198,6 +199,16 @@ class BaseContext(object):
     def set_start_method(self, method, force=False):
         raise ValueError('cannot set start method of concrete context')
 
+    @property
+    def reducer(self):
+        '''Controls how objects will be reduced to a form that can be
+        shared with other processes.'''
+        return globals().get('reduction')
+
+    @reducer.setter
+    def reducer(self, reduction):
+        globals()['reduction'] = reduction
+
     def _check_available(self):
         pass
 
@@ -245,7 +256,6 @@ class DefaultContext(BaseContext):
         if sys.platform == 'win32':
             return ['spawn']
         else:
-            from . import reduction
             if reduction.HAVE_SEND_HANDLE:
                 return ['fork', 'spawn', 'forkserver']
             else:
@@ -292,7 +302,6 @@ if sys.platform != 'win32':
         _name = 'forkserver'
         Process = ForkServerProcess
         def _check_available(self):
-            from . import reduction
             if not reduction.HAVE_SEND_HANDLE:
                 raise ValueError('forkserver start method not available')
 
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index ad01ede0e0..f2c179e4e0 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -9,7 +9,7 @@ import threading
 
 from . import connection
 from . import process
-from . import reduction
+from .context import reduction
 from . import semaphore_tracker
 from . import spawn
 from . import util
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 44d9638ff6..443321535e 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -14,8 +14,7 @@ import sys
 import tempfile
 import threading
 
-from . import context
-from . import reduction
+from .context import reduction, assert_spawning
 from . import util
 
 __all__ = ['BufferWrapper']
@@ -48,7 +47,7 @@ if sys.platform == 'win32':
             self._state = (self.size, self.name)
 
         def __getstate__(self):
-            context.assert_spawning(self)
+            assert_spawning(self)
             return self._state
 
         def __setstate__(self, state):
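The reducer property added to BaseContext above is this patch's public entry point: reading it returns the module currently used for pickling, and assigning to it replaces the reduction binding inside multiprocessing.context, which every other module in this patch now imports from there. A minimal sketch of how a caller might use it; since dependent modules bind the name when they are first imported, a replacement should be installed early, and the replacement object named here is hypothetical:

    import multiprocessing
    from multiprocessing import reduction

    ctx = multiprocessing.get_context('spawn')

    # By default the property hands back the stock reduction module.
    assert ctx.reducer is reduction

    # Assigning installs a replacement for queues, connections, the popen
    # helpers and spawn alike; anything exposing the same surface works
    # (see AbstractReducer later in this patch):
    # ctx.reducer = my_custom_reducer   # hypothetical replacement object

diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index b175470db9..b9ce84b2d8 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -23,10 +23,9 @@ from time import time as _time
 from traceback import format_exc
 
 from . import connection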
-from . import context
+from .context import reduction, get_spawning_popen
 from . import pool
 from . import process
-from . import reduction
 from . import util
 from . import get_context
 
@@ -142,7 +141,8 @@ class Server(object):
         self.id_to_obj = {'0': (None, ())}
         self.id_to_refcount = {}
-        self.mutex = threading.RLock()
+        self.id_to_local_proxy_obj = {}
+        self.mutex = threading.Lock()
 
     def serve_forever(self):
         '''
@@ -227,7 +227,14 @@
                 methodname = obj = None
                 request = recv()
                 ident, methodname, args, kwds = request
-                obj, exposed, gettypeid = id_to_obj[ident]
+                try:
+                    obj, exposed, gettypeid = id_to_obj[ident]
+                except KeyError as ke:
+                    try:
+                        obj, exposed, gettypeid = \
+                            self.id_to_local_proxy_obj[ident]
+                    except KeyError as second_ke:
+                        raise ke
 
                 if methodname not in exposed:
                     raise AttributeError(
@@ -308,7 +315,7 @@ class Server(object):
         '''
         with self.mutex:
             result = []
-            keys = list(self.id_to_obj.keys())
+            keys = list(self.id_to_refcount.keys())
             keys.sort()
             for ident in keys:
                 if ident != '0':
@@ -321,7 +328,8 @@ class Server(object):
         '''
         Number of shared objects
         '''
-        return len(self.id_to_obj) - 1      # don't count ident='0'
+        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+        return len(self.id_to_refcount)
 
     def shutdown(self, c):
         '''
@@ -363,13 +371,9 @@
             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
             if ident not in self.id_to_refcount:
                 self.id_to_refcount[ident] = 0
-            # increment the reference count immediately, to avoid
-            # this object being garbage collected before a Proxy
-            # object for it can be created.  The caller of create()
-            # is responsible for doing a decref once the Proxy object
-            # has been created.
-            self.incref(c, ident)
-            return ident, tuple(exposed)
+
+        self.incref(c, ident)
+        return ident, tuple(exposed)
 
     def get_methods(self, c, token):
         '''
@@ -387,15 +391,45 @@ def incref(self, c, ident):
         with self.mutex:
-            self.id_to_refcount[ident] += 1
+            try:
+                self.id_to_refcount[ident] += 1
+            except KeyError as ke:
+                # If no external references exist but an internal (to the
+                # manager) still does and a new external reference is created
+                # from it, restore the manager's tracking of it from the
+                # previously stashed internal ref.
+                if ident in self.id_to_local_proxy_obj:
+                    self.id_to_refcount[ident] = 1
+                    self.id_to_obj[ident] = \
+                        self.id_to_local_proxy_obj[ident]
+                    obj, exposed, gettypeid = self.id_to_obj[ident]
+                    util.debug('Server re-enabled tracking & INCREF %r', ident)
+                else:
+                    raise ke
 
     def decref(self, c, ident):
+        if ident not in self.id_to_refcount and \
+           ident in self.id_to_local_proxy_obj:
+            util.debug('Server DECREF skipping %r', ident)
+            return
+
         with self.mutex:
             assert self.id_to_refcount[ident] >= 1
             self.id_to_refcount[ident] -= 1
             if self.id_to_refcount[ident] == 0:
-                del self.id_to_obj[ident], self.id_to_refcount[ident]
-                util.debug('disposing of obj with id %r', ident)
+                del self.id_to_refcount[ident]
+
+        if ident not in self.id_to_refcount:
+            # Two-step process in case the object turns out to contain other
+            # proxy objects (e.g. a managed list of managed lists).
+            # Otherwise, deleting self.id_to_obj[ident] would trigger the
+            # deleting of the stored value (another managed object) which would
+            # in turn attempt to acquire the mutex that is already held here.
+            self.id_to_obj[ident] = (None, (), None)  # thread-safe
+            util.debug('disposing of obj with id %r', ident)
+            with self.mutex:
+                del self.id_to_obj[ident]
 
 #
 # Class to represent state of a manager
@@ -658,7 +692,7 @@ class BaseProxy(object):
     _mutex = util.ForkAwareThreadLock()
 
     def __init__(self, token, serializer, manager=None,
-                 authkey=None, exposed=None, incref=True):
+                 authkey=None, exposed=None, incref=True, manager_owned=False):
         with BaseProxy._mutex:
             tls_idset = BaseProxy._address_to_local.get(token.address, None)
             if tls_idset is None:
@@ -680,6 +714,12 @@ class BaseProxy(object):
         self._serializer = serializer
         self._Client = listener_client[serializer][1]
 
+        # Should be set to True only when a proxy object is being created
+        # on the manager server; primary use case: nested proxy objects.
+        # RebuildProxy detects when a proxy is being created on the manager
+        # and sets this value appropriately.
+        self._owned_by_manager = manager_owned
+
         if authkey is not None:
             self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
@@ -738,6 +778,10 @@ class BaseProxy(object):
         return self._callmethod('#GETVALUE')
 
     def _incref(self):
+        if self._owned_by_manager:
+            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+            return
+
         conn = self._Client(self._token.address, authkey=self._authkey)
         dispatch(conn, None, 'incref', (self._id,))
         util.debug('INCREF %r', self._token.id)
@@ -788,7 +832,7 @@ class BaseProxy(object):
     def __reduce__(self):
         kwds = {}
-        if context.get_spawning_popen() is not None:
+        if get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey
 
         if getattr(self, '_isauto', False):
@@ -822,19 +866,19 @@
 def RebuildProxy(func, token, serializer, kwds):
     '''
     Function used for unpickling proxy objects.
-
-    If possible the shared object is returned, or otherwise a proxy for it.
     '''
     server = getattr(process.current_process(), '_manager_server', None)
-
     if server and server.address == token.address:
-        return server.id_to_obj[token.id][0]
-    else:
-        incref = (
-            kwds.pop('incref', True) and
-            not getattr(process.current_process(), '_inheriting', False)
-            )
-        return func(token, serializer, incref=incref, **kwds)
+        util.debug('Rebuild a proxy owned by manager, token=%r', token)
+        kwds['manager_owned'] = True
+        if token.id not in server.id_to_local_proxy_obj:
+            server.id_to_local_proxy_obj[token.id] = \
+                server.id_to_obj[token.id]
+    incref = (
+        kwds.pop('incref', True) and
+        not getattr(process.current_process(), '_inheriting', False)
+        )
+    return func(token, serializer, incref=incref, **kwds)
 
 #
 # Functions to create proxies and proxy types
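Taken together, id_to_local_proxy_obj, the manager_owned flag, and the rewritten RebuildProxy allow a proxy to be unpickled inside the manager server itself without corrupting reference counting, which is what makes nested managed objects workable. A short sketch of the pattern these changes enable, behavior as described by the new comments in the diff:

    from multiprocessing import Manager

    if __name__ == '__main__':
        with Manager() as m:
            inner = m.list([1, 2, 3])
            outer = m.list()
            outer.append(inner)   # pickles a proxy into the manager process
            # The server stashes its local proxy in id_to_local_proxy_obj,
            # so the inner list stays reachable through the outer one:
            print(outer[0][0])    # -> 1
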
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 6d25469e16..ffdf42614d 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -638,22 +638,26 @@ class MapResult(ApplyResult):
             self._number_left = length//chunksize + bool(length % chunksize)
 
     def _set(self, i, success_result):
+        self._number_left -= 1
         success, result = success_result
-        if success:
+        if success and self._success:
             self._value[i*self._chunksize:(i+1)*self._chunksize] = result
-            self._number_left -= 1
             if self._number_left == 0:
                 if self._callback:
                     self._callback(self._value)
                 del self._cache[self._job]
                 self._event.set()
         else:
-            self._success = False
-            self._value = result
-            if self._error_callback:
-                self._error_callback(self._value)
-            del self._cache[self._job]
-            self._event.set()
+            if not success and self._success:
+                # only store first exception
+                self._success = False
+                self._value = result
+            if self._number_left == 0:
+                # only consider the result ready once all jobs are done
+                if self._error_callback:
+                    self._error_callback(self._value)
+                del self._cache[self._job]
+                self._event.set()
 
 #
 # Class whose instances are returned by `Pool.imap()`
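The reworked MapResult._set() decrements _number_left for every chunk, successful or not, so the result only becomes ready (and error_callback only fires) once every chunk has reported in, and only the first exception is kept as the result's value. A sketch of the resulting behavior; the worker function and callback here are illustrative, not part of the patch:

    import multiprocessing

    def square(x):
        if x == 3:
            raise ValueError('boom')   # one failing chunk among many
        return x * x

    if __name__ == '__main__':
        with multiprocessing.Pool(2) as pool:
            res = pool.map_async(square, range(10),
                                 error_callback=lambda e: print('map failed:', e))
            try:
                res.get()              # raises the first stored exception
            except ValueError as e:
                print('got:', e)

diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
index e792194f44..222db2d90a 100644
--- a/Lib/multiprocessing/popen_forkserver.py
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -1,10 +1,9 @@
 import io
 import os
 
-from . import reduction
+from .context import reduction, set_spawning_popen
 if not reduction.HAVE_SEND_HANDLE:
     raise ImportError('No support for sending fds between processes')
-from . import context
 from . import forkserver
 from . import popen_fork
 from . import spawn
@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
     def _launch(self, process_obj):
         prep_data = spawn.get_preparation_data(process_obj._name)
         buf = io.BytesIO()
-        context.set_spawning_popen(self)
+        set_spawning_popen(self)
         try:
             reduction.dump(prep_data, buf)
             reduction.dump(process_obj, buf)
         finally:
-            context.set_spawning_popen(None)
+            set_spawning_popen(None)
 
         self.sentinel, w = forkserver.connect_to_new_process(self._fds)
         util.Finalize(self, os.close, (self.sentinel,))
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index 6b0a8d635f..98f8f0ab33 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -1,9 +1,8 @@
 import io
 import os
 
-from . import context
+from .context import reduction, set_spawning_popen
 from . import popen_fork
-from . import reduction
 from . import spawn
 from . import util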
@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
             self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
-        context.set_spawning_popen(self)
+        set_spawning_popen(self)
         try:
             reduction.dump(prep_data, fp)
             reduction.dump(process_obj, fp)
         finally:
-            context.set_spawning_popen(None)
+            set_spawning_popen(None)
 
         parent_r = child_w = child_r = parent_w = None
         try:
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
index 3b53068be4..6fd588f542 100644
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -4,9 +4,8 @@ import signal
 import sys
 import _winapi
 
-from . import context
+from .context import reduction, get_spawning_popen, set_spawning_popen
 from . import spawn
-from . import reduction
 from . import util
 
 __all__ = ['Popen']
@@ -60,15 +59,15 @@ class Popen(object):
             util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
             # send information to child
-            context.set_spawning_popen(self)
+            set_spawning_popen(self)
             try:
                 reduction.dump(prep_data, to_child)
                 reduction.dump(process_obj, to_child)
             finally:
-                context.set_spawning_popen(None)
+                set_spawning_popen(None)
 
     def duplicate_for_child(self, handle):
-        assert self is context.get_spawning_popen()
+        assert self is get_spawning_popen()
         return reduction.duplicate(handle, self.sentinel)
 
     def wait(self, timeout=None):
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 786a303b33..dda03ddf54 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -23,9 +23,9 @@ import _multiprocessing
 
 from . import connection
 from . import context
+_ForkingPickler = context.reduction.ForkingPickler
 
 from .util import debug, info, Finalize, register_after_fork, is_exiting
-from .reduction import ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -110,7 +110,7 @@ class Queue(object):
             finally:
                 self._rlock.release()
         # unserialize the data after having released the lock
-        return ForkingPickler.loads(res)
+        return _ForkingPickler.loads(res)
 
     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -238,7 +238,7 @@ class Queue(object):
                     return
 
                 # serialize the data before acquiring the lock
-                obj = ForkingPickler.dumps(obj)
+                obj = _ForkingPickler.dumps(obj)
                 if wacquire is None:
                     send_bytes(obj)
                 else:
@@ -342,11 +342,11 @@ class SimpleQueue(object):
         with self._rlock:
             res = self._reader.recv_bytes()
         # unserialize the data after having released the lock
-        return ForkingPickler.loads(res)
+        return _ForkingPickler.loads(res)
 
     def put(self, obj):
         # serialize the data before acquiring the lock
-        obj = ForkingPickler.dumps(obj)
+        obj = _ForkingPickler.dumps(obj)
         if self._wlock is None:
             # writes to a message oriented win32 pipe are atomic
             self._writer.send_bytes(obj)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 8f209b47da..c043c9a0dc 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -7,6 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
+from abc import ABCMeta, abstractmethod
 import copyreg
 import functools
 import io
@@ -238,3 +239,36 @@ else:
             fd = df.detach()
             return socket.socket(family, type, proto, fileno=fd)
     register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+    '''Abstract base class for use in implementing a Reduction class
+    suitable for use in replacing the standard reduction mechanism
+    used in multiprocessing.'''
+    ForkingPickler = ForkingPickler
+    register = register
+    dump = dump
+    send_handle = send_handle
+    recv_handle = recv_handle
+
+    if sys.platform == 'win32':
+        steal_handle = steal_handle
+        duplicate = duplicate
+        DupHandle = DupHandle
+    else:
+        sendfds = sendfds
+        recvfds = recvfds
+        DupFd = DupFd
+
+    _reduce_method = _reduce_method
+    _reduce_method_descriptor = _reduce_method_descriptor
+    _rebuild_partial = _rebuild_partial
+    _reduce_socket = _reduce_socket
+    _rebuild_socket = _rebuild_socket
+
+    def __init__(self, *args):
+        register(type(_C().f), _reduce_method)
+        register(type(list.append), _reduce_method_descriptor)
+        register(type(int.__add__), _reduce_method_descriptor)
+        register(functools.partial, _reduce_partial)
+        register(socket.socket, _reduce_socket)
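AbstractReducer collects the module's whole reduction surface in one class, so a replacement only needs to subclass it, override the pieces it cares about, and be installed through the reducer property from earlier in this patch. A minimal sketch under those assumptions; every name except AbstractReducer and ForkingPickler is hypothetical:

    import pickle
    import multiprocessing
    from multiprocessing.reduction import AbstractReducer, ForkingPickler

    class HighestProtocolPickler(ForkingPickler):
        # Pickle with the highest protocol regardless of what callers ask for.
        def __init__(self, file, protocol=None):
            super().__init__(file, pickle.HIGHEST_PROTOCOL)

    class HighestProtocolReducer(AbstractReducer):
        ForkingPickler = HighestProtocolPickler

    if __name__ == '__main__':
        # Install the custom reducer as the context's reduction mechanism.
        multiprocessing.get_context().reducer = HighestProtocolReducer()
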
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
index 5e46fc65b4..e44a728fa9 100644
--- a/Lib/multiprocessing/resource_sharer.py
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -15,7 +15,7 @@ import sys
 import threading
 
 from . import process
-from . import reduction
+from .context import reduction
 from . import util
 
 __all__ = ['stop']
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 4258f591c4..25cbcf2ae4 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -13,8 +13,8 @@ import weakref
 
 from . import heap
 from . import get_context
-from .context import assert_spawning
-from .reduction import ForkingPickler
+from .context import reduction, assert_spawning
+_ForkingPickler = reduction.ForkingPickler
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
@@ -134,7 +134,7 @@ def reduce_ctype(obj):
 def rebuild_ctype(type_, wrapper, length):
     if length is not None:
         type_ = type_ * length
-    ForkingPickler.register(type_, reduce_ctype)
+    _ForkingPickler.register(type_, reduce_ctype)
     buf = wrapper.create_memoryview()
     obj = type_.from_buffer(buf)
     obj._wrapper = wrapper
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
index 392c1599cc..4aba372e48 100644
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -9,13 +9,13 @@
 #
 
 import os
-import pickle
 import sys
 import runpy
 import types
 
 from . import get_start_method, set_start_method
 from . import process
+from .context import reduction
 from . import util
 
 __all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
@@ -96,8 +96,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
     assert is_forking(sys.argv)
     if sys.platform == 'win32':
         import msvcrt
-        from .reduction import steal_handle
-        new_handle = steal_handle(parent_pid, pipe_handle)
+        new_handle = reduction.steal_handle(parent_pid, pipe_handle)
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
         from . import semaphore_tracker
@@ -111,9 +110,9 @@ def _main(fd):
     with os.fdopen(fd, 'rb', closefd=True) as from_parent:
         process.current_process()._inheriting = True
        try:
-            preparation_data = pickle.load(from_parent)
+            preparation_data = reduction.pickle.load(from_parent)
             prepare(preparation_data)
-            self = pickle.load(from_parent)
+            self = reduction.pickle.load(from_parent)
         finally:
             del process.current_process()._inheriting
     return self._bootstrap()
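The sharedctypes and spawn changes mirror the rest of the patch: rebuild_ctype registers its reducer on whatever ForkingPickler the active reduction module provides, and the spawned child unpickles its startup data through that same module. User-visible behavior is unchanged; a plain sketch of the unaffected end-to-end path:

    import multiprocessing

    def bump(v):
        with v.get_lock():
            v.value += 1

    if __name__ == '__main__':
        counter = multiprocessing.Value('i', 0)  # travels to the child via
        p = multiprocessing.Process(target=bump, args=(counter,))
        p.start()                                # the registered reduce_ctype
        p.join()
        print(counter.value)                     # -> 1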