diff options
author | R David Murray <rdmurray@bitdance.com> | 2012-04-13 21:27:19 -0400 |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2012-04-13 21:27:19 -0400 |
commit | ea7bc055b08fb442986e0735b1b5b325b0b51ecc (patch) | |
tree | 20c2b837c787daf7e95ece075e6544627b17625a /Lib/multiprocessing/queues.py | |
parent | c32365d2ce552d1d17279e30a564509235ffb1db (diff) | |
parent | c2c9c905706160bf34aea12f2348210aac3e0da2 (diff) | |
download | cpython-ea7bc055b08fb442986e0735b1b5b325b0b51ecc.tar.gz |
Merge #14399: corrected news item
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 51d991245c..262fd85733 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -39,12 +39,12 @@ import os import threading import collections import time -import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -67,6 +67,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -75,11 +77,11 @@ class Queue(object): def __getstate__(self): assert_spawning(self) - return (self._maxsize, self._reader, self._writer, + return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) def __setstate__(self, state): - (self._maxsize, self._reader, self._writer, + (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state self._after_fork() @@ -182,7 +184,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -233,7 +235,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -275,6 +277,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has @@ -356,6 +360,7 @@ class SimpleQueue(object): def __init__(self): self._reader, self._writer = Pipe(duplex=False) self._rlock = Lock() + self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: @@ -363,7 +368,7 @@ class SimpleQueue(object): self._make_methods() def empty(self): - return not self._reader.poll() + return not self._poll() def __getstate__(self): assert_spawning(self) |