summaryrefslogtreecommitdiff
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/connection.py8
-rw-r--r--Lib/multiprocessing/context.py13
-rw-r--r--Lib/multiprocessing/forkserver.py2
-rw-r--r--Lib/multiprocessing/heap.py5
-rw-r--r--Lib/multiprocessing/managers.py100
-rw-r--r--Lib/multiprocessing/pool.py20
-rw-r--r--Lib/multiprocessing/popen_forkserver.py7
-rw-r--r--Lib/multiprocessing/popen_spawn_posix.py7
-rw-r--r--Lib/multiprocessing/popen_spawn_win32.py9
-rw-r--r--Lib/multiprocessing/queues.py10
-rw-r--r--Lib/multiprocessing/reduction.py34
-rw-r--r--Lib/multiprocessing/resource_sharer.py2
-rw-r--r--Lib/multiprocessing/sharedctypes.py6
-rw-r--r--Lib/multiprocessing/spawn.py9
14 files changed, 159 insertions, 73 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index d0a1b86b13..d49e8f0d32 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -20,11 +20,11 @@ import itertools
import _multiprocessing
-from . import reduction
from . import util
from . import AuthenticationError, BufferTooShort
-from .reduction import ForkingPickler
+from .context import reduction
+_ForkingPickler = reduction.ForkingPickler
try:
import _winapi
@@ -203,7 +203,7 @@ class _ConnectionBase:
"""Send a (picklable) object"""
self._check_closed()
self._check_writable()
- self._send_bytes(ForkingPickler.dumps(obj))
+ self._send_bytes(_ForkingPickler.dumps(obj))
def recv_bytes(self, maxlength=None):
"""
@@ -248,7 +248,7 @@ class _ConnectionBase:
self._check_closed()
self._check_readable()
buf = self._recv_bytes()
- return ForkingPickler.loads(buf.getbuffer())
+ return _ForkingPickler.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py
index 115d4bf654..623f6fb733 100644
--- a/Lib/multiprocessing/context.py
+++ b/Lib/multiprocessing/context.py
@@ -3,6 +3,7 @@ import sys
import threading
from . import process
+from . import reduction
__all__ = [] # things are copied from here to __init__.py
@@ -198,6 +199,16 @@ class BaseContext(object):
def set_start_method(self, method, force=False):
raise ValueError('cannot set start method of concrete context')
+ @property
+ def reducer(self):
+ '''Controls how objects will be reduced to a form that can be
+ shared with other processes.'''
+ return globals().get('reduction')
+
+ @reducer.setter
+ def reducer(self, reduction):
+ globals()['reduction'] = reduction
+
def _check_available(self):
pass
@@ -245,7 +256,6 @@ class DefaultContext(BaseContext):
if sys.platform == 'win32':
return ['spawn']
else:
- from . import reduction
if reduction.HAVE_SEND_HANDLE:
return ['fork', 'spawn', 'forkserver']
else:
@@ -292,7 +302,6 @@ if sys.platform != 'win32':
_name = 'forkserver'
Process = ForkServerProcess
def _check_available(self):
- from . import reduction
if not reduction.HAVE_SEND_HANDLE:
raise ValueError('forkserver start method not available')
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index ad01ede0e0..f2c179e4e0 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -9,7 +9,7 @@ import threading
from . import connection
from . import process
-from . import reduction
+from .context import reduction
from . import semaphore_tracker
from . import spawn
from . import util
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 44d9638ff6..443321535e 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -14,8 +14,7 @@ import sys
import tempfile
import threading
-from . import context
-from . import reduction
+from .context import reduction, assert_spawning
from . import util
__all__ = ['BufferWrapper']
@@ -48,7 +47,7 @@ if sys.platform == 'win32':
self._state = (self.size, self.name)
def __getstate__(self):
- context.assert_spawning(self)
+ assert_spawning(self)
return self._state
def __setstate__(self, state):
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index b175470db9..b9ce84b2d8 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -23,10 +23,9 @@ from time import time as _time
from traceback import format_exc
from . import connection
-from . import context
+from .context import reduction, get_spawning_popen
from . import pool
from . import process
-from . import reduction
from . import util
from . import get_context
@@ -142,7 +141,8 @@ class Server(object):
self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
- self.mutex = threading.RLock()
+ self.id_to_local_proxy_obj = {}
+ self.mutex = threading.Lock()
def serve_forever(self):
'''
@@ -227,7 +227,14 @@ class Server(object):
methodname = obj = None
request = recv()
ident, methodname, args, kwds = request
- obj, exposed, gettypeid = id_to_obj[ident]
+ try:
+ obj, exposed, gettypeid = id_to_obj[ident]
+ except KeyError as ke:
+ try:
+ obj, exposed, gettypeid = \
+ self.id_to_local_proxy_obj[ident]
+ except KeyError as second_ke:
+ raise ke
if methodname not in exposed:
raise AttributeError(
@@ -308,7 +315,7 @@ class Server(object):
'''
with self.mutex:
result = []
- keys = list(self.id_to_obj.keys())
+ keys = list(self.id_to_refcount.keys())
keys.sort()
for ident in keys:
if ident != '0':
@@ -321,7 +328,8 @@ class Server(object):
'''
Number of shared objects
'''
- return len(self.id_to_obj) - 1 # don't count ident='0'
+ # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+ return len(self.id_to_refcount)
def shutdown(self, c):
'''
@@ -363,13 +371,9 @@ class Server(object):
self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
if ident not in self.id_to_refcount:
self.id_to_refcount[ident] = 0
- # increment the reference count immediately, to avoid
- # this object being garbage collected before a Proxy
- # object for it can be created. The caller of create()
- # is responsible for doing a decref once the Proxy object
- # has been created.
- self.incref(c, ident)
- return ident, tuple(exposed)
+
+ self.incref(c, ident)
+ return ident, tuple(exposed)
def get_methods(self, c, token):
'''
@@ -387,15 +391,45 @@ class Server(object):
def incref(self, c, ident):
with self.mutex:
- self.id_to_refcount[ident] += 1
+ try:
+ self.id_to_refcount[ident] += 1
+ except KeyError as ke:
+ # If no external references exist but an internal (to the
+ # manager) still does and a new external reference is created
+ # from it, restore the manager's tracking of it from the
+ # previously stashed internal ref.
+ if ident in self.id_to_local_proxy_obj:
+ self.id_to_refcount[ident] = 1
+ self.id_to_obj[ident] = \
+ self.id_to_local_proxy_obj[ident]
+ obj, exposed, gettypeid = self.id_to_obj[ident]
+ util.debug('Server re-enabled tracking & INCREF %r', ident)
+ else:
+ raise ke
def decref(self, c, ident):
+ if ident not in self.id_to_refcount and \
+ ident in self.id_to_local_proxy_obj:
+ util.debug('Server DECREF skipping %r', ident)
+ return
+
with self.mutex:
assert self.id_to_refcount[ident] >= 1
self.id_to_refcount[ident] -= 1
if self.id_to_refcount[ident] == 0:
- del self.id_to_obj[ident], self.id_to_refcount[ident]
- util.debug('disposing of obj with id %r', ident)
+ del self.id_to_refcount[ident]
+
+ if ident not in self.id_to_refcount:
+ # Two-step process in case the object turns out to contain other
+ # proxy objects (e.g. a managed list of managed lists).
+ # Otherwise, deleting self.id_to_obj[ident] would trigger the
+ # deleting of the stored value (another managed object) which would
+ # in turn attempt to acquire the mutex that is already held here.
+ self.id_to_obj[ident] = (None, (), None) # thread-safe
+ util.debug('disposing of obj with id %r', ident)
+ with self.mutex:
+ del self.id_to_obj[ident]
+
#
# Class to represent state of a manager
@@ -658,7 +692,7 @@ class BaseProxy(object):
_mutex = util.ForkAwareThreadLock()
def __init__(self, token, serializer, manager=None,
- authkey=None, exposed=None, incref=True):
+ authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None:
@@ -680,6 +714,12 @@ class BaseProxy(object):
self._serializer = serializer
self._Client = listener_client[serializer][1]
+ # Should be set to True only when a proxy object is being created
+ # on the manager server; primary use case: nested proxy objects.
+ # RebuildProxy detects when a proxy is being created on the manager
+ # and sets this value appropriately.
+ self._owned_by_manager = manager_owned
+
if authkey is not None:
self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None:
@@ -738,6 +778,10 @@ class BaseProxy(object):
return self._callmethod('#GETVALUE')
def _incref(self):
+ if self._owned_by_manager:
+ util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+ return
+
conn = self._Client(self._token.address, authkey=self._authkey)
dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id)
@@ -788,7 +832,7 @@ class BaseProxy(object):
def __reduce__(self):
kwds = {}
- if context.get_spawning_popen() is not None:
+ if get_spawning_popen() is not None:
kwds['authkey'] = self._authkey
if getattr(self, '_isauto', False):
@@ -822,19 +866,19 @@ class BaseProxy(object):
def RebuildProxy(func, token, serializer, kwds):
'''
Function used for unpickling proxy objects.
-
- If possible the shared object is returned, or otherwise a proxy for it.
'''
server = getattr(process.current_process(), '_manager_server', None)
-
if server and server.address == token.address:
- return server.id_to_obj[token.id][0]
- else:
- incref = (
- kwds.pop('incref', True) and
- not getattr(process.current_process(), '_inheriting', False)
- )
- return func(token, serializer, incref=incref, **kwds)
+ util.debug('Rebuild a proxy owned by manager, token=%r', token)
+ kwds['manager_owned'] = True
+ if token.id not in server.id_to_local_proxy_obj:
+ server.id_to_local_proxy_obj[token.id] = \
+ server.id_to_obj[token.id]
+ incref = (
+ kwds.pop('incref', True) and
+ not getattr(process.current_process(), '_inheriting', False)
+ )
+ return func(token, serializer, incref=incref, **kwds)
#
# Functions to create proxies and proxy types
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 6d25469e16..ffdf42614d 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -638,22 +638,26 @@ class MapResult(ApplyResult):
self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result):
+ self._number_left -= 1
success, result = success_result
- if success:
+ if success and self._success:
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
- self._number_left -= 1
if self._number_left == 0:
if self._callback:
self._callback(self._value)
del self._cache[self._job]
self._event.set()
else:
- self._success = False
- self._value = result
- if self._error_callback:
- self._error_callback(self._value)
- del self._cache[self._job]
- self._event.set()
+ if not success and self._success:
+ # only store first exception
+ self._success = False
+ self._value = result
+ if self._number_left == 0:
+ # only consider the result ready once all jobs are done
+ if self._error_callback:
+ self._error_callback(self._value)
+ del self._cache[self._job]
+ self._event.set()
#
# Class whose instances are returned by `Pool.imap()`
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
index e792194f44..222db2d90a 100644
--- a/Lib/multiprocessing/popen_forkserver.py
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -1,10 +1,9 @@
import io
import os
-from . import reduction
+from .context import reduction, set_spawning_popen
if not reduction.HAVE_SEND_HANDLE:
raise ImportError('No support for sending fds between processes')
-from . import context
from . import forkserver
from . import popen_fork
from . import spawn
@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
def _launch(self, process_obj):
prep_data = spawn.get_preparation_data(process_obj._name)
buf = io.BytesIO()
- context.set_spawning_popen(self)
+ set_spawning_popen(self)
try:
reduction.dump(prep_data, buf)
reduction.dump(process_obj, buf)
finally:
- context.set_spawning_popen(None)
+ set_spawning_popen(None)
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
util.Finalize(self, os.close, (self.sentinel,))
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index 6b0a8d635f..98f8f0ab33 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -1,9 +1,8 @@
import io
import os
-from . import context
+from .context import reduction, set_spawning_popen
from . import popen_fork
-from . import reduction
from . import spawn
from . import util
@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
- context.set_spawning_popen(self)
+ set_spawning_popen(self)
try:
reduction.dump(prep_data, fp)
reduction.dump(process_obj, fp)
finally:
- context.set_spawning_popen(None)
+ set_spawning_popen(None)
parent_r = child_w = child_r = parent_w = None
try:
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
index 3b53068be4..6fd588f542 100644
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -4,9 +4,8 @@ import signal
import sys
import _winapi
-from . import context
+from .context import reduction, get_spawning_popen, set_spawning_popen
from . import spawn
-from . import reduction
from . import util
__all__ = ['Popen']
@@ -60,15 +59,15 @@ class Popen(object):
util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
# send information to child
- context.set_spawning_popen(self)
+ set_spawning_popen(self)
try:
reduction.dump(prep_data, to_child)
reduction.dump(process_obj, to_child)
finally:
- context.set_spawning_popen(None)
+ set_spawning_popen(None)
def duplicate_for_child(self, handle):
- assert self is context.get_spawning_popen()
+ assert self is get_spawning_popen()
return reduction.duplicate(handle, self.sentinel)
def wait(self, timeout=None):
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 786a303b33..dda03ddf54 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -23,9 +23,9 @@ import _multiprocessing
from . import connection
from . import context
+_ForkingPickler = context.reduction.ForkingPickler
from .util import debug, info, Finalize, register_after_fork, is_exiting
-from .reduction import ForkingPickler
#
# Queue type using a pipe, buffer and thread
@@ -110,7 +110,7 @@ class Queue(object):
finally:
self._rlock.release()
# unserialize the data after having released the lock
- return ForkingPickler.loads(res)
+ return _ForkingPickler.loads(res)
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -238,7 +238,7 @@ class Queue(object):
return
# serialize the data before acquiring the lock
- obj = ForkingPickler.dumps(obj)
+ obj = _ForkingPickler.dumps(obj)
if wacquire is None:
send_bytes(obj)
else:
@@ -342,11 +342,11 @@ class SimpleQueue(object):
with self._rlock:
res = self._reader.recv_bytes()
# unserialize the data after having released the lock
- return ForkingPickler.loads(res)
+ return _ForkingPickler.loads(res)
def put(self, obj):
# serialize the data before acquiring the lock
- obj = ForkingPickler.dumps(obj)
+ obj = _ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
self._writer.send_bytes(obj)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 8f209b47da..c043c9a0dc 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -7,6 +7,7 @@
# Licensed to PSF under a Contributor Agreement.
#
+from abc import ABCMeta, abstractmethod
import copyreg
import functools
import io
@@ -238,3 +239,36 @@ else:
fd = df.detach()
return socket.socket(family, type, proto, fileno=fd)
register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+ '''Abstract base class for use in implementing a Reduction class
+ suitable for use in replacing the standard reduction mechanism
+ used in multiprocessing.'''
+ ForkingPickler = ForkingPickler
+ register = register
+ dump = dump
+ send_handle = send_handle
+ recv_handle = recv_handle
+
+ if sys.platform == 'win32':
+ steal_handle = steal_handle
+ duplicate = duplicate
+ DupHandle = DupHandle
+ else:
+ sendfds = sendfds
+ recvfds = recvfds
+ DupFd = DupFd
+
+ _reduce_method = _reduce_method
+ _reduce_method_descriptor = _reduce_method_descriptor
+ _rebuild_partial = _rebuild_partial
+ _reduce_socket = _reduce_socket
+ _rebuild_socket = _rebuild_socket
+
+ def __init__(self, *args):
+ register(type(_C().f), _reduce_method)
+ register(type(list.append), _reduce_method_descriptor)
+ register(type(int.__add__), _reduce_method_descriptor)
+ register(functools.partial, _reduce_partial)
+ register(socket.socket, _reduce_socket)
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
index 5e46fc65b4..e44a728fa9 100644
--- a/Lib/multiprocessing/resource_sharer.py
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -15,7 +15,7 @@ import sys
import threading
from . import process
-from . import reduction
+from .context import reduction
from . import util
__all__ = ['stop']
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 4258f591c4..25cbcf2ae4 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -13,8 +13,8 @@ import weakref
from . import heap
from . import get_context
-from .context import assert_spawning
-from .reduction import ForkingPickler
+from .context import reduction, assert_spawning
+_ForkingPickler = reduction.ForkingPickler
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
@@ -134,7 +134,7 @@ def reduce_ctype(obj):
def rebuild_ctype(type_, wrapper, length):
if length is not None:
type_ = type_ * length
- ForkingPickler.register(type_, reduce_ctype)
+ _ForkingPickler.register(type_, reduce_ctype)
buf = wrapper.create_memoryview()
obj = type_.from_buffer(buf)
obj._wrapper = wrapper
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
index 392c1599cc..4aba372e48 100644
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -9,13 +9,13 @@
#
import os
-import pickle
import sys
import runpy
import types
from . import get_start_method, set_start_method
from . import process
+from .context import reduction
from . import util
__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
@@ -96,8 +96,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
assert is_forking(sys.argv)
if sys.platform == 'win32':
import msvcrt
- from .reduction import steal_handle
- new_handle = steal_handle(parent_pid, pipe_handle)
+ new_handle = reduction.steal_handle(parent_pid, pipe_handle)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
from . import semaphore_tracker
@@ -111,9 +110,9 @@ def _main(fd):
with os.fdopen(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True
try:
- preparation_data = pickle.load(from_parent)
+ preparation_data = reduction.pickle.load(from_parent)
prepare(preparation_data)
- self = pickle.load(from_parent)
+ self = reduction.pickle.load(from_parent)
finally:
del process.current_process()._inheriting
return self._bootstrap()