summaryrefslogtreecommitdiff
path: root/Lib/multiprocessing/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r--Lib/multiprocessing/connection.py705
1 files changed, 587 insertions, 118 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index a5dc2a8424..22589d0422 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -4,49 +4,34 @@
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
-__all__ = [ 'Client', 'Listener', 'Pipe' ]
+__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
+import io
import os
import sys
+import pickle
+import select
import socket
+import struct
import errno
import time
import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
+from multiprocessing.forking import ForkingPickler
+try:
+ import _winapi
+ from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ _winapi = None
#
#
@@ -122,6 +107,325 @@ def address_type(address):
raise ValueError('address type of %r unrecognized' % address)
#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ # XXX should we use util.Finalize instead of a __del__?
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise IOError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise IOError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise IOError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise IOError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+ if m.itemsize > 1:
+ m = memoryview(bytes(m))
+ n = len(m)
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ buf = io.BytesIO()
+ ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+ self._send_bytes(buf.getbuffer())
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable buffer-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes()
+ return pickle.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ return self._poll(timeout)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
+
+if _winapi:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
+ """
+ _got_empty_message = False
+
+ def _close(self, _CloseHandle=_winapi.CloseHandle):
+ _CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
+ try:
+ if err == _winapi.ERROR_IO_PENDING:
+ waitres = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nwritten, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ if self._got_empty_message:
+ self._got_empty_message = False
+ return io.BytesIO()
+ else:
+ bsize = 128 if maxsize is None else min(maxsize, 128)
+ try:
+ ov, err = _winapi.ReadFile(self._handle, bsize,
+ overlapped=True)
+ try:
+ if err == _winapi.ERROR_IO_PENDING:
+ waitres = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nread, err = ov.GetOverlappedResult(True)
+ if err == 0:
+ f = io.BytesIO()
+ f.write(ov.getbuffer())
+ return f
+ elif err == _winapi.ERROR_MORE_DATA:
+ return self._get_more_data(ov, maxsize)
+ except IOError as e:
+ if e.winerror == _winapi.ERROR_BROKEN_PIPE:
+ raise EOFError
+ else:
+ raise
+ raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
+
+ def _poll(self, timeout):
+ if (self._got_empty_message or
+ _winapi.PeekNamedPipe(self._handle)[0] != 0):
+ return True
+ return bool(wait([self], timeout))
+
+ def _get_more_data(self, ov, maxsize):
+ buf = ov.getbuffer()
+ f = io.BytesIO()
+ f.write(buf)
+ left = _winapi.PeekNamedPipe(self._handle)[1]
+ assert left > 0
+ if maxsize is not None and len(buf) + left > maxsize:
+ self._bad_message_length()
+ ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
+ rbytes, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert rbytes == left
+ f.write(ov.getbuffer())
+ return f
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if _winapi:
+ def _close(self, _close=_multiprocessing.closesocket):
+ _close(self._handle)
+ _write = _multiprocessing.send
+ _read = _multiprocessing.recv
+ else:
+ def _close(self, _close=os.close):
+ _close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ try:
+ n = write(self._handle, buf)
+ except InterruptedError:
+ continue
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, read=_read):
+ buf = io.BytesIO()
+ handle = self._handle
+ remaining = size
+ while remaining > 0:
+ try:
+ chunk = read(handle, remaining)
+ except InterruptedError:
+ continue
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise IOError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ n = len(buf)
+ # For wire compatibility with 3.2 and lower
+ header = struct.pack("!i", n)
+ if n > 16384:
+ # The payload is large so Nagle's algorithm won't be triggered
+ # and we'd better avoid the cost of concatenation.
+ chunks = [header, buf]
+ elif n > 0:
+ # Issue # 20540: concatenate before sending, to avoid delays due
+ # to Nagle's algorithm on a TCP socket.
+ chunks = [header + buf]
+ else:
+ # This code path is necessary to avoid "broken pipe" errors
+ # when sending a 0-length buffer if the other end closed the pipe.
+ chunks = [header]
+ for chunk in chunks:
+ self._send(chunk)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = self._recv(4)
+ size, = struct.unpack("!i", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size)
+
+ def _poll(self, timeout):
+ r = wait([self], timeout)
+ return bool(r)
+
+
+#
# Public functions
#
@@ -154,6 +458,8 @@ class Listener(object):
Returns a `Connection` object.
'''
+ if self._listener is None:
+ raise IOError('listener is closed')
c = self._listener.accept()
if self._authkey:
deliver_challenge(c, self._authkey)
@@ -164,11 +470,19 @@ class Listener(object):
'''
Close the bound socket or named pipe of `self`.
'''
- return self._listener.close()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
address = property(lambda self: self._listener._address)
last_accepted = property(lambda self: self._listener._last_accepted)
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.close()
+
def Client(address, family=None, authkey=None):
'''
@@ -201,19 +515,16 @@ if sys.platform != 'win32':
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
- s1.close()
- s2.close()
+ c1 = Connection(s1.detach())
+ c2 = Connection(s2.detach())
else:
fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
return c1, c2
else:
- from _multiprocessing import win32
def Pipe(duplex=True):
'''
@@ -221,35 +532,35 @@ else:
'''
address = arbitrary_address('AF_PIPE')
if duplex:
- openmode = win32.PIPE_ACCESS_DUPLEX
- access = win32.GENERIC_READ | win32.GENERIC_WRITE
+ openmode = _winapi.PIPE_ACCESS_DUPLEX
+ access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
- openmode = win32.PIPE_ACCESS_INBOUND
- access = win32.GENERIC_WRITE
+ openmode = _winapi.PIPE_ACCESS_INBOUND
+ access = _winapi.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
- h1 = win32.CreateNamedPipe(
- address, openmode,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ h1 = _winapi.CreateNamedPipe(
+ address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
+ _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
- h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ h2 = _winapi.CreateFile(
+ address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
)
- win32.SetNamedPipeHandleState(
- h2, win32.PIPE_READMODE_MESSAGE, None, None
+ _winapi.SetNamedPipeHandleState(
+ h2, _winapi.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
+ _, err = overlapped.GetOverlappedResult(True)
+ assert err == 0
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
return c1, c2
@@ -264,12 +575,15 @@ class SocketListener(object):
def __init__(self, address, family, backlog=1):
self._socket = socket.socket(getattr(socket, family))
try:
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # SO_REUSEADDR has different semantics on Windows (issue #2550).
+ if os.name == 'posix':
+ self._socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
self._socket.setblocking(True)
self._socket.bind(address)
self._socket.listen(backlog)
self._address = self._socket.getsockname()
- except socket.error:
+ except OSError:
self._socket.close()
raise
self._family = family
@@ -283,12 +597,15 @@ class SocketListener(object):
self._unlink = None
def accept(self):
- s, self._last_accepted = self._socket.accept()
+ while True:
+ try:
+ s, self._last_accepted = self._socket.accept()
+ except InterruptedError:
+ pass
+ else:
+ break
s.setblocking(True)
- fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
- s.close()
- return conn
+ return Connection(s.detach())
def close(self):
self._socket.close()
@@ -303,24 +620,8 @@ def SocketClient(address):
family = address_type(address)
with socket.socket( getattr(socket, family) ) as s:
s.setblocking(True)
- t = _init_timeout()
-
- while 1:
- try:
- s.connect(address)
- except socket.error as e:
- if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
- else:
- break
- else:
- raise
-
- fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
- return conn
+ s.connect(address)
+ return Connection(s.detach())
#
# Definitions for connections based on named pipes
@@ -334,48 +635,55 @@ if sys.platform == 'win32':
'''
def __init__(self, address, backlog=None):
self._address = address
- handle = win32.CreateNamedPipe(
- address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- self._handle_queue = [handle]
- self._last_accepted = None
+ self._handle_queue = [self._new_handle(first=True)]
+ self._last_accepted = None
sub_debug('listener created with address=%r', self._address)
-
self.close = Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
- def accept(self):
- newhandle = win32.CreateNamedPipe(
- self._address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
+ def _new_handle(self, first=False):
+ flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
+ if first:
+ flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+ return _winapi.CreateNamedPipe(
+ self._address, flags,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
+ _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
- self._handle_queue.append(newhandle)
+
+ def accept(self):
+ self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
try:
- win32.ConnectNamedPipe(handle, win32.NULL)
- except WindowsError as e:
+ ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
+ except OSError as e:
+ if e.winerror != _winapi.ERROR_NO_DATA:
+ raise
# ERROR_NO_DATA can occur if a client has already connected,
# written data and then disconnected -- see Issue 14725.
- if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
- win32.ERROR_NO_DATA):
+ else:
+ try:
+ res = _winapi.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ except:
+ ov.cancel()
+ _winapi.CloseHandle(handle)
raise
- return _multiprocessing.PipeConnection(handle)
+ finally:
+ _, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ return PipeConnection(handle)
@staticmethod
def _finalize_pipe_listener(queue, address):
sub_debug('closing listener with address=%r', address)
for handle in queue:
- close(handle)
+ _winapi.CloseHandle(handle)
def PipeClient(address):
'''
@@ -384,24 +692,25 @@ if sys.platform == 'win32':
t = _init_timeout()
while 1:
try:
- win32.WaitNamedPipe(address, 1000)
- h = win32.CreateFile(
- address, win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ _winapi.WaitNamedPipe(address, 1000)
+ h = _winapi.CreateFile(
+ address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
+ 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
)
except WindowsError as e:
- if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
- win32.ERROR_PIPE_BUSY) or _check_timeout(t):
+ if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
+ _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
else:
break
else:
raise
- win32.SetNamedPipeHandleState(
- h, win32.PIPE_READMODE_MESSAGE, None, None
+ _winapi.SetNamedPipeHandleState(
+ h, _winapi.PIPE_READMODE_MESSAGE, None, None
)
- return _multiprocessing.PipeConnection(h)
+ return PipeConnection(h)
#
# Authentication stuff
@@ -458,10 +767,10 @@ class ConnectionWrapper(object):
return self._loads(s)
def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
return obj
class XmlListener(Listener):
@@ -475,3 +784,163 @@ def XmlClient(*args, **kwds):
global xmlrpclib
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+#
+# Wait
+#
+
+if sys.platform == 'win32':
+
+ def _exhaustive_wait(handles, timeout):
+ # Return ALL handles which are currently signalled. (Only
+ # returning the first signalled might create starvation issues.)
+ L = list(handles)
+ ready = []
+ while L:
+ res = _winapi.WaitForMultipleObjects(L, False, timeout)
+ if res == WAIT_TIMEOUT:
+ break
+ elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
+ res -= WAIT_OBJECT_0
+ elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
+ res -= WAIT_ABANDONED_0
+ else:
+ raise RuntimeError('Should not get here')
+ ready.append(L[res])
+ L = L[res+1:]
+ timeout = 0
+ return ready
+
+ _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is None:
+ timeout = INFINITE
+ elif timeout < 0:
+ timeout = 0
+ else:
+ timeout = int(timeout * 1000 + 0.5)
+
+ object_list = list(object_list)
+ waithandle_to_obj = {}
+ ov_list = []
+ ready_objects = set()
+ ready_handles = set()
+
+ try:
+ for o in object_list:
+ try:
+ fileno = getattr(o, 'fileno')
+ except AttributeError:
+ waithandle_to_obj[o.__index__()] = o
+ else:
+ # start an overlapped read of length zero
+ try:
+ ov, err = _winapi.ReadFile(fileno(), 0, True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err == _winapi.ERROR_IO_PENDING:
+ ov_list.append(ov)
+ waithandle_to_obj[ov.event] = o
+ else:
+ # If o.fileno() is an overlapped pipe handle and
+ # err == 0 then there is a zero length message
+ # in the pipe, but it HAS NOT been consumed.
+ ready_objects.add(o)
+ timeout = 0
+
+ ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
+ finally:
+ # request that overlapped reads stop
+ for ov in ov_list:
+ ov.cancel()
+
+ # wait for all overlapped reads to stop
+ for ov in ov_list:
+ try:
+ _, err = ov.GetOverlappedResult(True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err != _winapi.ERROR_OPERATION_ABORTED:
+ o = waithandle_to_obj[ov.event]
+ ready_objects.add(o)
+ if err == 0:
+ # If o.fileno() is an overlapped pipe handle then
+ # a zero length message HAS been consumed.
+ if hasattr(o, '_got_empty_message'):
+ o._got_empty_message = True
+
+ ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
+ return [o for o in object_list if o in ready_objects]
+
+else:
+
+ if hasattr(select, 'poll'):
+ def _poll(fds, timeout):
+ if timeout is not None:
+ timeout = int(timeout * 1000) # timeout is in milliseconds
+ fd_map = {}
+ pollster = select.poll()
+ for fd in fds:
+ pollster.register(fd, select.POLLIN)
+ if hasattr(fd, 'fileno'):
+ fd_map[fd.fileno()] = fd
+ else:
+ fd_map[fd] = fd
+ ls = []
+ for fd, event in pollster.poll(timeout):
+ if event & select.POLLNVAL:
+ raise ValueError('invalid file descriptor %i' % fd)
+ ls.append(fd_map[fd])
+ return ls
+ else:
+ def _poll(fds, timeout):
+ return select.select(fds, [], [], timeout)[0]
+
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is not None:
+ if timeout <= 0:
+ return _poll(object_list, 0)
+ else:
+ deadline = time.time() + timeout
+ while True:
+ try:
+ return _poll(object_list, timeout)
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
+ if timeout is not None:
+ timeout = deadline - time.time()
+
+#
+# Make connection and socket objects sharable if possible
+#
+
+if sys.platform == 'win32':
+ from . import reduction
+ ForkingPickler.register(socket.socket, reduction.reduce_socket)
+ ForkingPickler.register(Connection, reduction.reduce_connection)
+ ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+else:
+ try:
+ from . import reduction
+ except ImportError:
+ pass
+ else:
+ ForkingPickler.register(socket.socket, reduction.reduce_socket)
+ ForkingPickler.register(Connection, reduction.reduce_connection)