summaryrefslogtreecommitdiff
path: root/Doc/library/multiprocessing.rst
diff options
context:
space:
mode:
Diffstat (limited to 'Doc/library/multiprocessing.rst')
-rw-r--r--Doc/library/multiprocessing.rst506
1 files changed, 361 insertions, 145 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 3e40faf79d..5f50cf1014 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -29,7 +29,7 @@ Windows.
Functionality within this package requires that the ``__main__`` module be
importable by the children. This is covered in :ref:`multiprocessing-programming`
however it is worth pointing out here. This means that some examples, such
- as the :class:`multiprocessing.Pool` examples will not work in the
+ as the :class:`multiprocessing.pool.Pool` examples will not work in the
interactive interpreter. For example::
>>> from multiprocessing import Pool
@@ -121,9 +121,7 @@ processes:
print(q.get()) # prints "[42, None, 'hello']"
p.join()
- Queues are thread and process safe, but note that they must never
- be instantiated as a side effect of importing a module: this can lead
- to a deadlock! (see :ref:`threaded-imports`)
+ Queues are thread and process safe.
**Pipes**
@@ -229,11 +227,11 @@ However, if you really do need to use some shared data then
holds Python objects and allows other processes to manipulate them using
proxies.
- A manager returned by :func:`Manager` will support types :class:`list`,
- :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`,
- :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
- :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For
- example, ::
+ A manager returned by :func:`Manager` will support types
+ :class:`list`, :class:`dict`, :class:`Namespace`, :class:`Lock`,
+ :class:`RLock`, :class:`Semaphore`, :class:`BoundedSemaphore`,
+ :class:`Condition`, :class:`Event`, :class:`Barrier`,
+ :class:`Queue`, :class:`Value` and :class:`Array`. For example, ::
from multiprocessing import Process, Manager
@@ -244,17 +242,16 @@ However, if you really do need to use some shared data then
l.reverse()
if __name__ == '__main__':
- manager = Manager()
+ with Manager() as manager:
+ d = manager.dict()
+ l = manager.list(range(10))
- d = manager.dict()
- l = manager.list(range(10))
+ p = Process(target=f, args=(d, l))
+ p.start()
+ p.join()
- p = Process(target=f, args=(d, l))
- p.start()
- p.join()
-
- print(d)
- print(l)
+ print(d)
+ print(l)
will print ::
@@ -282,10 +279,13 @@ For example::
return x*x
if __name__ == '__main__':
- pool = Pool(processes=4) # start 4 worker processes
- result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
- print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
- print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+ with Pool(processes=4) as pool: # start 4 worker processes
+ result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
+ print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
+ print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+
+Note that the methods of a pool should only ever be used by the
+process which created it.
Reference
@@ -298,7 +298,8 @@ The :mod:`multiprocessing` package mostly replicates the API of the
:class:`Process` and exceptions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-.. class:: Process(group=None, target=None, name=None, args=(), kwargs={})
+.. class:: Process(group=None, target=None, name=None, args=(), kwargs={}, \
+ *, daemon=None)
Process objects represent activity that is run in a separate process. The
:class:`Process` class has equivalents of all the methods of
@@ -308,18 +309,22 @@ The :mod:`multiprocessing` package mostly replicates the API of the
should always be ``None``; it exists solely for compatibility with
:class:`threading.Thread`. *target* is the callable object to be invoked by
the :meth:`run()` method. It defaults to ``None``, meaning nothing is
- called. *name* is the process name. By default, a unique name is constructed
- of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
- :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
- is determined by the *generation* of the process. *args* is the argument
- tuple for the target invocation. *kwargs* is a dictionary of keyword
- arguments for the target invocation. By default, no arguments are passed to
- *target*.
+ called. *name* is the process name (see :attr:`name` for more details).
+ *args* is the argument tuple for the target invocation. *kwargs* is a
+ dictionary of keyword arguments for the target invocation. If provided,
+ the keyword-only *daemon* argument sets the process :attr:`daemon` flag
+ to ``True`` or ``False``. If ``None`` (the default), this flag will be
+ inherited from the creating process.
+
+ By default, no arguments are passed to *target*.
If a subclass overrides the constructor, it must make sure it invokes the
base class constructor (:meth:`Process.__init__`) before doing anything else
to the process.
+ .. versionchanged:: 3.3
+ Added the *daemon* argument.
+
.. method:: run()
Method representing the process's activity.
@@ -338,10 +343,9 @@ The :mod:`multiprocessing` package mostly replicates the API of the
.. method:: join([timeout])
- Block the calling thread until the process whose :meth:`join` method is
- called terminates or until the optional timeout occurs.
-
- If *timeout* is ``None`` then there is no timeout.
+ If the optional argument *timeout* is ``None`` (the default), the method
+ blocks until the process whose :meth:`join` method is called terminates.
+ If *timeout* is a positive number, it blocks at most *timeout* seconds.
A process can be joined many times.
@@ -350,11 +354,14 @@ The :mod:`multiprocessing` package mostly replicates the API of the
.. attribute:: name
- The process's name.
+ The process's name. The name is a string used for identification purposes
+ only. It has no semantics. Multiple processes may be given the same
+ name.
- The name is a string used for identification purposes only. It has no
- semantics. Multiple processes may be given the same name. The initial
- name is set by the constructor.
+ The initial name is set by the constructor. If no explicit name is
+ provided to the constructor, a name of the form
+ 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' is constructed, where
+ each N\ :sub:`k` is the N-th child of its parent.
.. method:: is_alive
@@ -379,7 +386,7 @@ The :mod:`multiprocessing` package mostly replicates the API of the
Unix daemons or services, they are normal processes that will be
terminated (and not joined) if non-daemonic processes have exited.
- In addition to the :class:`Threading.Thread` API, :class:`Process` objects
+ In addition to the :class:`threading.Thread` API, :class:`Process` objects
also support the following attributes and methods:
.. attribute:: pid
@@ -398,7 +405,7 @@ The :mod:`multiprocessing` package mostly replicates the API of the
The process's authentication key (a byte string).
When :mod:`multiprocessing` is initialized the main process is assigned a
- random string using :func:`os.random`.
+ random string using :func:`os.urandom`.
When a :class:`Process` object is created, it will inherit the
authentication key of its parent process, although this may be changed by
@@ -406,6 +413,21 @@ The :mod:`multiprocessing` package mostly replicates the API of the
See :ref:`multiprocessing-auth-keys`.
+ .. attribute:: sentinel
+
+ A numeric handle of a system object which will become "ready" when
+ the process ends.
+
+ You can use this value if you want to wait on several events at
+ once using :func:`multiprocessing.connection.wait`. Otherwise
+ calling :meth:`join()` is simpler.
+
+ On Windows, this is an OS handle usable with the ``WaitForSingleObject``
+ and ``WaitForMultipleObjects`` family of API calls. On Unix, this is
+ a file descriptor usable with primitives from the :mod:`select` module.
+
+ .. versionadded:: 3.3
+
.. method:: terminate()
Terminate the process. On Unix this is done using the ``SIGTERM`` signal;
@@ -424,7 +446,7 @@ The :mod:`multiprocessing` package mostly replicates the API of the
cause other processes to deadlock.
Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`,
- :meth:`terminate` and :attr:`exit_code` methods should only be called by
+ :meth:`terminate` and :attr:`exitcode` methods should only be called by
the process that created the process object.
Example usage of some of the methods of :class:`Process`:
@@ -445,6 +467,9 @@ The :mod:`multiprocessing` package mostly replicates the API of the
>>> p.exitcode == -signal.SIGTERM
True
+.. exception:: ProcessError
+
+ The base class of all :mod:`multiprocessing` exceptions.
.. exception:: BufferTooShort
@@ -454,6 +479,13 @@ The :mod:`multiprocessing` package mostly replicates the API of the
If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
the message as a byte string.
+.. exception:: AuthenticationError
+
+ Raised when there is an authentication error.
+
+.. exception:: TimeoutError
+
+ Raised by methods with a timeout when the timeout expires.
Pipes and Queues
~~~~~~~~~~~~~~~~
@@ -465,7 +497,7 @@ primitives like locks.
For passing messages one can use :func:`Pipe` (for a connection between two
processes) or a queue (which allows multiple producers and consumers).
-The :class:`Queue`, :class:`multiprocessing.queues.SimpleQueue` and :class:`JoinableQueue` types are multi-producer,
+The :class:`Queue`, :class:`SimpleQueue` and :class:`JoinableQueue` types are multi-producer,
multi-consumer FIFO queues modelled on the :class:`queue.Queue` class in the
standard library. They differ in that :class:`Queue` lacks the
:meth:`~queue.Queue.task_done` and :meth:`~queue.Queue.join` methods introduced
@@ -486,6 +518,24 @@ Note that one can also create a shared queue by using a manager object -- see
the :mod:`multiprocessing` namespace so you need to import them from
:mod:`queue`.
+.. note::
+
+ When an object is put on a queue, the object is pickled and a
+ background thread later flushes the pickled data to an underlying
+ pipe. This has some consequences which are a little surprising,
+ but should not cause any practical difficulties -- if they really
+ bother you then you can instead use a queue created with a
+ :ref:`manager <multiprocessing-managers>`.
+
+ (1) After putting an object on an empty queue there may be an
+ infinitesimal delay before the queue's :meth:`~Queue.empty`
+ method returns :const:`False` and :meth:`~Queue.get_nowait` can
+ return without raising :exc:`queue.Empty`.
+
+ (2) If multiple processes are enqueuing objects, it is possible for
+ the objects to be received at the other end out-of-order.
+ However, objects enqueued by the same process will always be in
+ the expected order with respect to each other.
.. warning::
@@ -497,7 +547,8 @@ Note that one can also create a shared queue by using a manager object -- see
.. warning::
As mentioned above, if a child process has put items on a queue (and it has
- not used :meth:`JoinableQueue.cancel_join_thread`), then that process will
+ not used :meth:`JoinableQueue.cancel_join_thread
+ <multiprocessing.Queue.cancel_join_thread>`), then that process will
not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless
@@ -530,7 +581,7 @@ For an example of the usage of queues for interprocess communication see
thread is started which transfers objects from a buffer into the pipe.
The usual :exc:`queue.Empty` and :exc:`queue.Full` exceptions from the
- standard library's :mod:`Queue` module are raised to signal timeouts.
+ standard library's :mod:`queue` module are raised to signal timeouts.
:class:`Queue` implements all the methods of :class:`queue.Queue` except for
:meth:`~queue.Queue.task_done` and :meth:`~queue.Queue.join`.
@@ -609,8 +660,15 @@ For an example of the usage of queues for interprocess communication see
the background thread from being joined automatically when the process
exits -- see :meth:`join_thread`.
+ A better name for this method might be
+ ``allow_exit_without_flush()``. It is likely to cause enqueued
+ data to lost, and you almost certainly will not need to use it.
+ It is really only there if you need the current process to exit
+ immediately without waiting to flush enqueued data to the
+ underlying pipe, and you don't care about lost data.
+
-.. class:: multiprocessing.queues.SimpleQueue()
+.. class:: SimpleQueue()
It is a simplified :class:`Queue` type, very close to a locked :class:`Pipe`.
@@ -634,12 +692,12 @@ For an example of the usage of queues for interprocess communication see
.. method:: task_done()
- Indicate that a formerly enqueued task is complete. Used by queue consumer
- threads. For each :meth:`~Queue.get` used to fetch a task, a subsequent
+ Indicate that a formerly enqueued task is complete. Used by queue
+ consumers. For each :meth:`~Queue.get` used to fetch a task, a subsequent
call to :meth:`task_done` tells the queue that the processing on the task
is complete.
- If a :meth:`~Queue.join` is currently blocking, it will resume when all
+ If a :meth:`~queue.Queue.join` is currently blocking, it will resume when all
items have been processed (meaning that a :meth:`task_done` call was
received for every item that had been :meth:`~Queue.put` into the queue).
@@ -652,10 +710,10 @@ For an example of the usage of queues for interprocess communication see
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
- queue. The count goes down whenever a consumer thread calls
+ queue. The count goes down whenever a consumer calls
:meth:`task_done` to indicate that the item was retrieved and all work on
it is complete. When the count of unfinished tasks drops to zero,
- :meth:`~Queue.join` unblocks.
+ :meth:`~queue.Queue.join` unblocks.
Miscellaneous
@@ -766,10 +824,12 @@ Connection objects are usually created using :func:`Pipe` -- see also
*timeout* is a number then this specifies the maximum time in seconds to
block. If *timeout* is ``None`` then an infinite timeout is used.
+ Note that multiple connection objects may be polled at once by
+ using :func:`multiprocessing.connection.wait`.
+
.. method:: send_bytes(buffer[, offset[, size]])
- Send byte data from an object supporting the buffer interface as a
- complete message.
+ Send byte data from a :term:`bytes-like object` as a complete message.
If *offset* is given then data is read from that position in *buffer*. If
*size* is given then that many bytes will be read from buffer. Very large
@@ -784,9 +844,14 @@ Connection objects are usually created using :func:`Pipe` -- see also
to receive and the other end has closed.
If *maxlength* is specified and the message is longer than *maxlength*
- then :exc:`IOError` is raised and the connection will no longer be
+ then :exc:`OSError` is raised and the connection will no longer be
readable.
+ .. versionchanged:: 3.3
+ This function used to raise a :exc:`IOError`, which is now an
+ alias of :exc:`OSError`.
+
+
.. method:: recv_bytes_into(buffer[, offset])
Read into *buffer* a complete message of byte data sent from the other end
@@ -795,7 +860,7 @@ Connection objects are usually created using :func:`Pipe` -- see also
:exc:`EOFError` if there is nothing left to receive and the other end was
closed.
- *buffer* must be an object satisfying the writable buffer interface. If
+ *buffer* must be a writable :term:`bytes-like object`. If
*offset* is given then the message will be written into the buffer from
that position. Offset must be a non-negative integer less than the
length of *buffer* (in bytes).
@@ -804,6 +869,14 @@ Connection objects are usually created using :func:`Pipe` -- see also
raised and the complete message is available as ``e.args[0]`` where ``e``
is the exception instance.
+ .. versionchanged:: 3.3
+ Connection objects themselves can now be transferred between processes
+ using :meth:`Connection.send` and :meth:`Connection.recv`.
+
+ .. versionadded:: 3.3
+ Connection objects now support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` returns the
+ connection object, and :meth:`~contextmanager.__exit__` calls :meth:`close`.
For example:
@@ -855,6 +928,12 @@ program as they are in a multithreaded program. See the documentation for
Note that one can also create synchronization primitives by using a manager
object -- see :ref:`multiprocessing-managers`.
+.. class:: Barrier(parties[, action[, timeout]])
+
+ A barrier object: a clone of :class:`threading.Barrier`.
+
+ .. versionadded:: 3.3
+
.. class:: BoundedSemaphore([value])
A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`.
@@ -864,20 +943,17 @@ object -- see :ref:`multiprocessing-managers`.
.. class:: Condition([lock])
- A condition variable: a clone of :class:`threading.Condition`.
+ A condition variable: an alias for :class:`threading.Condition`.
If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
object from :mod:`multiprocessing`.
+ .. versionchanged:: 3.3
+ The :meth:`~threading.Condition.wait_for` method was added.
+
.. class:: Event()
A clone of :class:`threading.Event`.
- This method returns the state of the internal semaphore on exit, so it
- will always return ``True`` except if a timeout is given and the operation
- times out.
-
- .. versionchanged:: 3.1
- Previously, the method always returned ``None``.
.. class:: Lock()
@@ -893,6 +969,12 @@ object -- see :ref:`multiprocessing-managers`.
.. note::
+ The :meth:`acquire` and :meth:`wait` methods of each of these types
+ treat negative timeouts as zero timeouts. This differs from
+ :mod:`threading` where, since version 3.2, the equivalent
+ :meth:`acquire` methods treat negative timeouts as infinite
+ timeouts.
+
On Mac OS X, ``sem_timedwait`` is unsupported, so calling ``acquire()`` with
a timeout will emulate that function's behavior using a sleeping loop.
@@ -914,21 +996,34 @@ Shared :mod:`ctypes` Objects
It is possible to create shared objects using shared memory which can be
inherited by child processes.
-.. function:: Value(typecode_or_type, *args[, lock])
+.. function:: Value(typecode_or_type, *args, lock=True)
Return a :mod:`ctypes` object allocated from shared memory. By default the
- return value is actually a synchronized wrapper for the object.
+ return value is actually a synchronized wrapper for the object. The object
+ itself can be accessed via the *value* attribute of a :class:`Value`.
*typecode_or_type* determines the type of the returned object: it is either a
ctypes type or a one character typecode of the kind used by the :mod:`array`
module. *\*args* is passed on to the constructor for the type.
- If *lock* is ``True`` (the default) then a new lock object is created to
- synchronize access to the value. If *lock* is a :class:`Lock` or
- :class:`RLock` object then that will be used to synchronize access to the
- value. If *lock* is ``False`` then access to the returned object will not be
- automatically protected by a lock, so it will not necessarily be
- "process-safe".
+ If *lock* is ``True`` (the default) then a new recursive lock
+ object is created to synchronize access to the value. If *lock* is
+ a :class:`Lock` or :class:`RLock` object then that will be used to
+ synchronize access to the value. If *lock* is ``False`` then
+ access to the returned object will not be automatically protected
+ by a lock, so it will not necessarily be "process-safe".
+
+ Operations like ``+=`` which involve a read and write are not
+ atomic. So if, for instance, you want to atomically increment a
+ shared value it is insufficient to just do ::
+
+ counter.value += 1
+
+ Assuming the associated lock is recursive (which it is by default)
+ you can instead do ::
+
+ with counter.get_lock():
+ counter.value += 1
Note that *lock* is a keyword-only argument.
@@ -1006,30 +1101,31 @@ processes.
attributes which allow one to use it to store and retrieve strings -- see
documentation for :mod:`ctypes`.
-.. function:: Array(typecode_or_type, size_or_initializer, *args[, lock])
+.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
The same as :func:`RawArray` except that depending on the value of *lock* a
process-safe synchronization wrapper may be returned instead of a raw ctypes
array.
If *lock* is ``True`` (the default) then a new lock object is created to
- synchronize access to the value. If *lock* is a :class:`Lock` or
- :class:`RLock` object then that will be used to synchronize access to the
+ synchronize access to the value. If *lock* is a
+ :class:`~multiprocessing.Lock` or :class:`~multiprocessing.RLock` object
+ then that will be used to synchronize access to the
value. If *lock* is ``False`` then access to the returned object will not be
automatically protected by a lock, so it will not necessarily be
"process-safe".
Note that *lock* is a keyword-only argument.
-.. function:: Value(typecode_or_type, *args[, lock])
+.. function:: Value(typecode_or_type, *args, lock=True)
The same as :func:`RawValue` except that depending on the value of *lock* a
process-safe synchronization wrapper may be returned instead of a raw ctypes
object.
If *lock* is ``True`` (the default) then a new lock object is created to
- synchronize access to the value. If *lock* is a :class:`Lock` or
- :class:`RLock` object then that will be used to synchronize access to the
+ synchronize access to the value. If *lock* is a :class:`~multiprocessing.Lock` or
+ :class:`~multiprocessing.RLock` object then that will be used to synchronize access to the
value. If *lock* is ``False`` then access to the returned object will not be
automatically protected by a lock, so it will not necessarily be
"process-safe".
@@ -1123,8 +1219,10 @@ Managers
~~~~~~~~
Managers provide a way to create data which can be shared between different
-processes. A manager object controls a server process which manages *shared
-objects*. Other processes can access the shared objects by using proxies.
+processes, including sharing over a network between processes running on
+different machines. A manager object controls a server process which manages
+*shared objects*. Other processes can access the shared objects by using
+proxies.
.. function:: multiprocessing.Manager()
@@ -1197,9 +1295,10 @@ their parent process exits. The manager classes are defined in the
type of shared object. This must be a string.
*callable* is a callable used for creating objects for this type
- identifier. If a manager instance will be created using the
- :meth:`from_address` classmethod or if the *create_method* argument is
- ``False`` then this can be left as ``None``.
+ identifier. If a manager instance will be connected to the
+ server using the :meth:`connect` method, or if the
+ *create_method* argument is ``False`` then this can be left as
+ ``None``.
*proxytype* is a subclass of :class:`BaseProxy` which is used to create
proxies for shared objects with this *typeid*. If ``None`` then a proxy
@@ -1207,12 +1306,12 @@ their parent process exits. The manager classes are defined in the
*exposed* is used to specify a sequence of method names which proxies for
this typeid should be allowed to access using
- :meth:`BaseProxy._callMethod`. (If *exposed* is ``None`` then
+ :meth:`BaseProxy._callmethod`. (If *exposed* is ``None`` then
:attr:`proxytype._exposed_` is used instead if it exists.) In the case
where no exposed list is specified, all "public methods" of the shared
object will be accessible. (Here a "public method" means any attribute
- which has a :meth:`__call__` method and whose name does not begin with
- ``'_'``.)
+ which has a :meth:`~object.__call__` method and whose name does not begin
+ with ``'_'``.)
*method_to_typeid* is a mapping used to specify the return type of those
exposed methods which should return a proxy. It maps method names to
@@ -1231,6 +1330,14 @@ their parent process exits. The manager classes are defined in the
The address used by the manager.
+ .. versionchanged:: 3.3
+ Manager objects support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` starts the
+ server process (if it has not already started) and then returns the
+ manager object. :meth:`~contextmanager.__exit__` calls :meth:`shutdown`.
+
+ In previous versions :meth:`~contextmanager.__enter__` did not start the
+ manager's server process if it was not already started.
.. class:: SyncManager
@@ -1240,6 +1347,13 @@ their parent process exits. The manager classes are defined in the
It also supports creation of shared lists and dictionaries.
+ .. method:: Barrier(parties[, action[, timeout]])
+
+ Create a shared :class:`threading.Barrier` object and return a
+ proxy for it.
+
+ .. versionadded:: 3.3
+
.. method:: BoundedSemaphore([value])
Create a shared :class:`threading.BoundedSemaphore` object and return a
@@ -1253,6 +1367,9 @@ their parent process exits. The manager classes are defined in the
If *lock* is supplied then it should be a proxy for a
:class:`threading.Lock` or :class:`threading.RLock` object.
+ .. versionchanged:: 3.3
+ The :meth:`~threading.Condition.wait_for` method was added.
+
.. method:: Event()
Create a shared :class:`threading.Event` object and return a proxy for it.
@@ -1358,11 +1475,10 @@ callables with the manager class. For example::
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
- manager = MyManager()
- manager.start()
- maths = manager.Maths()
- print(maths.add(4, 3)) # prints 7
- print(maths.mul(7, 8)) # prints 56
+ with MyManager() as manager:
+ maths = manager.Maths()
+ print(maths.add(4, 3)) # prints 7
+ print(maths.mul(7, 8)) # prints 56
Using a remote manager
@@ -1562,7 +1678,7 @@ Process Pools
One can create a pool of processes which will carry out tasks submitted to it
with the :class:`Pool` class.
-.. class:: multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
+.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
A process pool object which controls a pool of worker processes to which jobs
can be submitted. It supports asynchronous results with timeouts and
@@ -1573,6 +1689,9 @@ with the :class:`Pool` class.
*initializer* is not ``None`` then each worker process will call
``initializer(*initargs)`` when it starts.
+ Note that the methods of the pool object should only be called by
+ the process which created the pool.
+
.. versionadded:: 3.2
*maxtasksperchild* is the number of tasks a worker process can complete
before it will exit and be replaced with a fresh worker process, to enable
@@ -1657,6 +1776,24 @@ with the :class:`Pool` class.
returned iterator should be considered arbitrary. (Only when there is
only one worker process is the order guaranteed to be "correct".)
+ .. method:: starmap(func, iterable[, chunksize])
+
+ Like :meth:`map` except that the elements of the `iterable` are expected
+ to be iterables that are unpacked as arguments.
+
+ Hence an `iterable` of `[(1,2), (3, 4)]` results in `[func(1,2),
+ func(3,4)]`.
+
+ .. versionadded:: 3.3
+
+ .. method:: starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
+
+ A combination of :meth:`starmap` and :meth:`map_async` that iterates over
+ `iterable` of iterables and calls `func` with the iterables unpacked.
+ Returns a result object.
+
+ .. versionadded:: 3.3
+
.. method:: close()
Prevents any more tasks from being submitted to the pool. Once all the
@@ -1673,6 +1810,11 @@ with the :class:`Pool` class.
Wait for the worker processes to exit. One must call :meth:`close` or
:meth:`terminate` before using :meth:`join`.
+ .. versionadded:: 3.3
+ Pool objects now support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` returns the
+ pool object, and :meth:`~contextmanager.__exit__` calls :meth:`terminate`.
+
.. class:: AsyncResult
@@ -1707,21 +1849,20 @@ The following example demonstrates the use of a pool::
return x*x
if __name__ == '__main__':
- pool = Pool(processes=4) # start 4 worker processes
+ with Pool(processes=4) as pool: # start 4 worker processes
+ result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
+ print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
- result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
- print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
+ print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
- print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+ it = pool.imap(f, range(10))
+ print(next(it)) # prints "0"
+ print(next(it)) # prints "1"
+ print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
- it = pool.imap(f, range(10))
- print(next(it)) # prints "0"
- print(next(it)) # prints "1"
- print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
-
- import time
- result = pool.apply_async(time.sleep, (10,))
- print(result.get(timeout=1)) # raises TimeoutError
+ import time
+ result = pool.apply_async(time.sleep, (10,))
+ print(result.get(timeout=1)) # raises TimeoutError
.. _multiprocessing-listeners-clients:
@@ -1733,12 +1874,14 @@ Listeners and Clients
:synopsis: API for dealing with sockets.
Usually message passing between processes is done using queues or by using
-:class:`Connection` objects returned by :func:`Pipe`.
+:class:`~multiprocessing.Connection` objects returned by
+:func:`~multiprocessing.Pipe`.
However, the :mod:`multiprocessing.connection` module allows some extra
flexibility. It basically gives a high level message oriented API for dealing
-with sockets or Windows named pipes, and also has support for *digest
-authentication* using the :mod:`hmac` module.
+with sockets or Windows named pipes. It also has support for *digest
+authentication* using the :mod:`hmac` module, and for polling
+multiple connections at the same time.
.. function:: deliver_challenge(connection, authkey)
@@ -1748,15 +1891,15 @@ authentication* using the :mod:`hmac` module.
If the reply matches the digest of the message using *authkey* as the key
then a welcome message is sent to the other end of the connection. Otherwise
- :exc:`AuthenticationError` is raised.
+ :exc:`~multiprocessing.AuthenticationError` is raised.
-.. function:: answerChallenge(connection, authkey)
+.. function:: answer_challenge(connection, authkey)
Receive a message, calculate the digest of the message using *authkey* as the
key, and then send the digest back.
- If a welcome message is not received, then :exc:`AuthenticationError` is
- raised.
+ If a welcome message is not received, then
+ :exc:`~multiprocessing.AuthenticationError` is raised.
.. function:: Client(address[, family[, authenticate[, authkey]]])
@@ -1770,7 +1913,8 @@ authentication* using the :mod:`hmac` module.
If *authenticate* is ``True`` or *authkey* is a byte string then digest
authentication is used. The key used for authentication will be either
*authkey* or ``current_process().authkey`` if *authkey* is ``None``.
- If authentication fails then :exc:`AuthenticationError` is raised. See
+ If authentication fails then
+ :exc:`~multiprocessing.AuthenticationError` is raised. See
:ref:`multiprocessing-auth-keys`.
.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
@@ -1799,7 +1943,8 @@ authentication* using the :mod:`hmac` module.
private temporary directory created using :func:`tempfile.mkstemp`.
If the listener object uses a socket then *backlog* (1 by default) is passed
- to the :meth:`listen` method of the socket once it has been bound.
+ to the :meth:`~socket.socket.listen` method of the socket once it has been
+ bound.
If *authenticate* is ``True`` (``False`` by default) or *authkey* is not
``None`` then digest authentication is used.
@@ -1811,13 +1956,15 @@ authentication* using the :mod:`hmac` module.
``current_process().authkey`` is used as the authentication key. If
*authkey* is ``None`` and *authenticate* is ``False`` then no
authentication is done. If authentication fails then
- :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`.
+ :exc:`~multiprocessing.AuthenticationError` is raised.
+ See :ref:`multiprocessing-auth-keys`.
.. method:: accept()
Accept a connection on the bound socket or named pipe of the listener
- object and return a :class:`Connection` object. If authentication is
- attempted and fails, then :exc:`AuthenticationError` is raised.
+ object and return a :class:`~multiprocessing.Connection` object. If
+ authentication is attempted and fails, then
+ :exc:`~multiprocessing.AuthenticationError` is raised.
.. method:: close()
@@ -1836,12 +1983,44 @@ authentication* using the :mod:`hmac` module.
The address from which the last accepted connection came. If this is
unavailable then it is ``None``.
+ .. versionadded:: 3.3
+ Listener objects now support the context manager protocol -- see
+ :ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` returns the
+ listener object, and :meth:`~contextmanager.__exit__` calls :meth:`close`.
-The module defines two exceptions:
+.. function:: wait(object_list, timeout=None)
-.. exception:: AuthenticationError
+ Wait till an object in *object_list* is ready. Returns the list of
+ those objects in *object_list* which are ready. If *timeout* is a
+ float then the call blocks for at most that many seconds. If
+ *timeout* is ``None`` then it will block for an unlimited period.
+ A negative timeout is equivalent to a zero timeout.
+
+ For both Unix and Windows, an object can appear in *object_list* if
+ it is
+
+ * a readable :class:`~multiprocessing.Connection` object;
+ * a connected and readable :class:`socket.socket` object; or
+ * the :attr:`~multiprocessing.Process.sentinel` attribute of a
+ :class:`~multiprocessing.Process` object.
+
+ A connection or socket object is ready when there is data available
+ to be read from it, or the other end has been closed.
- Exception raised when there is an authentication error.
+ **Unix**: ``wait(object_list, timeout)`` almost equivalent
+ ``select.select(object_list, [], [], timeout)``. The difference is
+ that, if :func:`select.select` is interrupted by a signal, it can
+ raise :exc:`OSError` with an error number of ``EINTR``, whereas
+ :func:`wait` will not.
+
+ **Windows**: An item in *object_list* must either be an integer
+ handle which is waitable (according to the definition used by the
+ documentation of the Win32 function ``WaitForMultipleObjects()``)
+ or it can be an object with a :meth:`fileno` method which returns a
+ socket handle or pipe handle. (Note that pipe handles and socket
+ handles are **not** waitable handles.)
+
+ .. versionadded:: 3.3
**Examples**
@@ -1854,19 +2033,16 @@ the client::
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
- listener = Listener(address, authkey=b'secret password')
-
- conn = listener.accept()
- print('connection accepted from', listener.last_accepted)
- conn.send([2.25, None, 'junk', float])
+ with Listener(address, authkey=b'secret password') as listener:
+ with listener.accept() as conn:
+ print('connection accepted from', listener.last_accepted)
- conn.send_bytes(b'hello')
+ conn.send([2.25, None, 'junk', float])
- conn.send_bytes(array('i', [42, 1729]))
+ conn.send_bytes(b'hello')
- conn.close()
- listener.close()
+ conn.send_bytes(array('i', [42, 1729]))
The following code connects to the server and receives some data from the
server::
@@ -1875,17 +2051,50 @@ server::
from array import array
address = ('localhost', 6000)
- conn = Client(address, authkey=b'secret password')
- print(conn.recv()) # => [2.25, None, 'junk', float]
+ with Client(address, authkey=b'secret password') as conn:
+ print(conn.recv()) # => [2.25, None, 'junk', float]
+
+ print(conn.recv_bytes()) # => 'hello'
+
+ arr = array('i', [0, 0, 0, 0, 0])
+ print(conn.recv_bytes_into(arr)) # => 8
+ print(arr) # => array('i', [42, 1729, 0, 0, 0])
- print(conn.recv_bytes()) # => 'hello'
+The following code uses :func:`~multiprocessing.connection.wait` to
+wait for messages from multiple processes at once::
- arr = array('i', [0, 0, 0, 0, 0])
- print(conn.recv_bytes_into(arr)) # => 8
- print(arr) # => array('i', [42, 1729, 0, 0, 0])
+ import time, random
+ from multiprocessing import Process, Pipe, current_process
+ from multiprocessing.connection import wait
- conn.close()
+ def foo(w):
+ for i in range(10):
+ w.send((i, current_process().name))
+ w.close()
+
+ if __name__ == '__main__':
+ readers = []
+
+ for i in range(4):
+ r, w = Pipe(duplex=False)
+ readers.append(r)
+ p = Process(target=foo, args=(w,))
+ p.start()
+ # We close the writable end of the pipe now to be sure that
+ # p is the only process which owns a handle for it. This
+ # ensures that when p closes its handle for the writable end,
+ # wait() will promptly report the readable end as being ready.
+ w.close()
+
+ while readers:
+ for r in wait(readers):
+ try:
+ msg = r.recv()
+ except EOFError:
+ readers.remove(r)
+ else:
+ print(msg)
.. _multiprocessing-address-formats:
@@ -1913,7 +2122,8 @@ an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address.
Authentication keys
~~~~~~~~~~~~~~~~~~~
-When one uses :meth:`Connection.recv`, the data received is automatically
+When one uses :meth:`Connection.recv <multiprocessing.Connection.recv>`, the
+data received is automatically
unpickled. Unfortunately unpickling data from an untrusted source is a security
risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module
to provide digest authentication.
@@ -2046,7 +2256,7 @@ Avoid shared state
It is probably best to stick to using queues or pipes for communication
between processes rather than using the lower level synchronization
- primitives from the :mod:`threading` module.
+ primitives.
Picklability
@@ -2063,9 +2273,10 @@ Joining zombie processes
On Unix when a process finishes but has not been joined it becomes a zombie.
There should never be very many because each time a new process starts (or
- :func:`active_children` is called) all completed processes which have not
- yet been joined will be joined. Also calling a finished process's
- :meth:`Process.is_alive` will join the process. Even so it is probably good
+ :func:`~multiprocessing.active_children` is called) all completed processes
+ which have not yet been joined will be joined. Also calling a finished
+ process's :meth:`Process.is_alive <multiprocessing.Process.is_alive>` will
+ join the process. Even so it is probably good
practice to explicitly join all the processes that you start.
Better to inherit than pickle/unpickle
@@ -2078,20 +2289,23 @@ Better to inherit than pickle/unpickle
Avoid terminating processes
- Using the :meth:`Process.terminate` method to stop a process is liable to
+ Using the :meth:`Process.terminate <multiprocessing.Process.terminate>`
+ method to stop a process is liable to
cause any shared resources (such as locks, semaphores, pipes and queues)
currently being used by the process to become broken or unavailable to other
processes.
Therefore it is probably best to only consider using
- :meth:`Process.terminate` on processes which never use any shared resources.
+ :meth:`Process.terminate <multiprocessing.Process.terminate>` on processes
+ which never use any shared resources.
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before
terminating until all the buffered items are fed by the "feeder" thread to
the underlying pipe. (The child process can call the
- :meth:`Queue.cancel_join_thread` method of the queue to avoid this behaviour.)
+ :meth:`Queue.cancel_join_thread <multiprocessing.Queue.cancel_join_thread>`
+ method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all
items which have been put on the queue will eventually be removed before the
@@ -2168,7 +2382,7 @@ Beware of replacing :data:`sys.stdin` with a "file like object"
resulting in a bad file descriptor error, but introduces a potential danger
to applications which replace :func:`sys.stdin` with a "file-like object"
with output buffering. This danger is that if multiple processes call
- :func:`close()` on this file-like object, it could result in the same
+ :meth:`~io.IOBase.close()` on this file-like object, it could result in the same
data being flushed to the object multiple times, resulting in corruption.
If you write a file-like object and implement your own caching, you can
@@ -2197,14 +2411,16 @@ More picklability
as the ``target`` argument on Windows --- just define a function and use
that instead.
- Also, if you subclass :class:`Process` then make sure that instances will be
- picklable when the :meth:`Process.start` method is called.
+ Also, if you subclass :class:`~multiprocessing.Process` then make sure that
+ instances will be picklable when the :meth:`Process.start
+ <multiprocessing.Process.start>` method is called.
Global variables
Bear in mind that if code run in a child process tries to access a global
variable, then the value it sees (if any) may not be the same as the value
- in the parent process at the time that :meth:`Process.start` was called.
+ in the parent process at the time that :meth:`Process.start
+ <multiprocessing.Process.start>` was called.
However, global variables which are just module level constants cause no
problems.
@@ -2260,7 +2476,7 @@ Demonstration of how to create and use customized managers and proxies:
:language: python3
-Using :class:`Pool`:
+Using :class:`~multiprocessing.pool.Pool`:
.. literalinclude:: ../includes/mp_pool.py
:language: python3