diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2015-01-26 22:26:54 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2015-01-26 22:26:54 +0100 |
commit | 43edf148f8eeb12f516289cb36decc0702cbfd81 (patch) | |
tree | ab833211defb5dbe2bf105a8df21bee472597ef5 | |
parent | c05854eb73b54b352d4e3b2fb86281bc2b60ac2c (diff) | |
parent | 31fa887dbd1e6c89728b0ed99f7974266852d3a8 (diff) | |
download | trollius-43edf148f8eeb12f516289cb36decc0702cbfd81.tar.gz |
Merge Tulip into Trollius
-rw-r--r-- | examples/simple_tcp_server.py | 4 | ||||
-rw-r--r-- | overlapped.c | 123 | ||||
-rwxr-xr-x | release.py | 3 | ||||
-rw-r--r-- | tests/test_windows_events.py | 13 | ||||
-rw-r--r-- | trollius/base_events.py | 49 | ||||
-rw-r--r-- | trollius/windows_events.py | 161 | ||||
-rw-r--r-- | trollius/windows_utils.py | 2 |
7 files changed, 171 insertions, 184 deletions
diff --git a/examples/simple_tcp_server.py b/examples/simple_tcp_server.py index 882938e..247f6e6 100644 --- a/examples/simple_tcp_server.py +++ b/examples/simple_tcp_server.py @@ -4,7 +4,7 @@ style and uses asyncio.streams.start_server() and asyncio.streams.open_connection(). Note that running this example starts both the TCP server and client -in the same process. It listens on port 1234 on 127.0.0.1, so it will +in the same process. It listens on port 12345 on 127.0.0.1, so it will fail if this port is currently in use. """ @@ -88,7 +88,7 @@ class MyServer: def start(self, loop): """ - Starts the TCP server, so that it listens on port 1234. + Starts the TCP server, so that it listens on port 12345. For each client that connects, the accept_client method gets called. This method runs the loop until the server sockets diff --git a/overlapped.c b/overlapped.c index 9b4cca3..1a1ee5d 100644 --- a/overlapped.c +++ b/overlapped.c @@ -64,12 +64,6 @@ typedef struct { }; } OverlappedObject; -typedef struct { - OVERLAPPED *Overlapped; - HANDLE IocpHandle; - char Address[1]; -} WaitNamedPipeAndConnectContext; - /* * Map Windows error codes to subclasses of OSError */ @@ -756,7 +750,7 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args) switch (err) { case ERROR_BROKEN_PIPE: mark_as_completed(&self->overlapped); - Py_RETURN_NONE; + return SetFromWindowsErr(err); case ERROR_SUCCESS: case ERROR_MORE_DATA: case ERROR_IO_PENDING: @@ -815,7 +809,7 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args) switch (err) { case ERROR_BROKEN_PIPE: mark_as_completed(&self->overlapped); - Py_RETURN_NONE; + return SetFromWindowsErr(err); case ERROR_SUCCESS: case ERROR_MORE_DATA: case ERROR_IO_PENDING: @@ -1151,109 +1145,43 @@ Overlapped_ConnectNamedPipe(OverlappedObject *self, PyObject *args) switch (err) { case ERROR_PIPE_CONNECTED: mark_as_completed(&self->overlapped); - Py_RETURN_NONE; + Py_RETURN_TRUE; case ERROR_SUCCESS: case ERROR_IO_PENDING: - Py_RETURN_NONE; + Py_RETURN_FALSE; default: self->type = TYPE_NOT_STARTED; return SetFromWindowsErr(err); } } -/* Unfortunately there is no way to do an overlapped connect to a - pipe. We instead use WaitNamedPipe() and CreateFile() in a thread - pool thread. If a connection succeeds within a time limit (10 - seconds) then PostQueuedCompletionStatus() is used to return the - pipe handle to the completion port. */ - -static DWORD WINAPI -WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx) -{ - HANDLE PipeHandle = INVALID_HANDLE_VALUE; - DWORD Start = GetTickCount(); - DWORD Deadline = Start + 10*1000; - DWORD Error = 0; - DWORD Timeout; - BOOL Success; - - for ( ; ; ) { - Timeout = Deadline - GetTickCount(); - if ((int)Timeout < 0) - break; - Success = WaitNamedPipe(ctx->Address, Timeout); - Error = Success ? ERROR_SUCCESS : GetLastError(); - switch (Error) { - case ERROR_SUCCESS: - PipeHandle = CreateFile(ctx->Address, - GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, NULL); - if (PipeHandle == INVALID_HANDLE_VALUE) - continue; - break; - case ERROR_SEM_TIMEOUT: - continue; - } - break; - } - if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error, - (ULONG_PTR)PipeHandle, ctx->Overlapped)) - CloseHandle(PipeHandle); - free(ctx); - return 0; -} - PyDoc_STRVAR( - Overlapped_WaitNamedPipeAndConnect_doc, - "WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n" - "Start overlapped connection to address, notifying iocp_handle when\n" - "finished"); + ConnectPipe_doc, + "ConnectPipe(addr) -> pipe_handle\n\n" + "Connect to the pipe for asynchronous I/O (overlapped)."); static PyObject * -Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args) +ConnectPipe(OverlappedObject *self, PyObject *args) { - char *Address; - Py_ssize_t AddressLength; - HANDLE IocpHandle; - OVERLAPPED Overlapped; - BOOL ret; - DWORD err; - WaitNamedPipeAndConnectContext *ctx; - Py_ssize_t ContextLength; + PyObject *AddressObj; + wchar_t *Address; + HANDLE PipeHandle; - if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER, - &Address, &AddressLength, &IocpHandle, &Overlapped)) + if (!PyArg_ParseTuple(args, "U", &AddressObj)) return NULL; - if (self->type != TYPE_NONE) { - PyErr_SetString(PyExc_ValueError, "operation already attempted"); + Address = PyUnicode_AsWideCharString(AddressObj, NULL); + if (Address == NULL) return NULL; - } - - ContextLength = (AddressLength + - offsetof(WaitNamedPipeAndConnectContext, Address)); - ctx = calloc(1, ContextLength + 1); - if (ctx == NULL) - return PyErr_NoMemory(); - memcpy(ctx->Address, Address, AddressLength + 1); - ctx->Overlapped = &self->overlapped; - ctx->IocpHandle = IocpHandle; - - self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT; - self->handle = NULL; - - Py_BEGIN_ALLOW_THREADS - ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx, - WT_EXECUTELONGFUNCTION); - Py_END_ALLOW_THREADS - - mark_as_completed(&self->overlapped); - self->error = err = ret ? ERROR_SUCCESS : GetLastError(); - if (!ret) - return SetFromWindowsErr(err); - Py_RETURN_NONE; + PipeHandle = CreateFileW(Address, + GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, NULL); + PyMem_Free(Address); + if (PipeHandle == INVALID_HANDLE_VALUE) + return SetFromWindowsErr(0); + return Py_BuildValue(F_HANDLE, PipeHandle); } static PyObject* @@ -1290,9 +1218,6 @@ static PyMethodDef Overlapped_methods[] = { METH_VARARGS, Overlapped_DisconnectEx_doc}, {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe, METH_VARARGS, Overlapped_ConnectNamedPipe_doc}, - {"WaitNamedPipeAndConnect", - (PyCFunction) Overlapped_WaitNamedPipeAndConnect, - METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc}, {NULL} }; @@ -1378,6 +1303,9 @@ static PyMethodDef overlapped_functions[] = { METH_VARARGS, SetEvent_doc}, {"ResetEvent", overlapped_ResetEvent, METH_VARARGS, ResetEvent_doc}, + {"ConnectPipe", + (PyCFunction) ConnectPipe, + METH_VARARGS, ConnectPipe_doc}, {NULL} }; @@ -1428,6 +1356,7 @@ _init_overlapped(void) WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING); WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED); WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); + WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WINAPI_CONSTANT(F_DWORD, INFINITE); WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE); WINAPI_CONSTANT(F_HANDLE, NULL); @@ -398,7 +398,8 @@ class Release(object): if command: print("Invalid command: %s" % command) else: - parser.print_usage() + parser.print_help() + print("") print("Available commands:") print("- build: build asyncio in place, imply --running") diff --git a/tests/test_windows_events.py b/tests/test_windows_events.py index 39e5149..9a068a0 100644 --- a/tests/test_windows_events.py +++ b/tests/test_windows_events.py @@ -12,6 +12,7 @@ from trollius import _overlapped from trollius import py33_winapi as _winapi from trollius import windows_events from trollius.py33_exceptions import PermissionError, FileNotFoundError +from trollius.test_utils import mock class UpperProto(asyncio.Protocol): @@ -92,6 +93,18 @@ class ProactorTests(test_utils.TestCase): raise Return('done') + def test_connect_pipe_cancel(self): + exc = OSError() + exc.winerror = _overlapped.ERROR_PIPE_BUSY + with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect: + coro = self.loop._proactor.connect_pipe('pipe_address') + task = self.loop.create_task(coro) + + # check that it's possible to cancel connect_pipe() + task.cancel() + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(task) + def test_wait_for_handle(self): event = _overlapped.CreateEvent(None, True, False, None) self.addCleanup(_winapi.CloseHandle, event) diff --git a/trollius/base_events.py b/trollius/base_events.py index f0b5430..e5e5b61 100644 --- a/trollius/base_events.py +++ b/trollius/base_events.py @@ -188,6 +188,7 @@ class BaseEventLoop(events.AbstractEventLoop): # In debug mode, if the execution of a callback or a step of a task # exceed this duration in seconds, the slow callback/task is logged. self.slow_callback_duration = 0.1 + self._current_handle = None def __repr__(self): return ('<%s running=%s closed=%s debug=%s>' @@ -732,7 +733,13 @@ class BaseEventLoop(events.AbstractEventLoop): logger.debug("Datagram endpoint remote_addr=%r created: " "(%r, %r)", remote_addr, transport, protocol) - yield From(waiter) + + try: + yield From(waiter) + except: + transport.close() + raise + raise Return(transport, protocol) @coroutine @@ -824,7 +831,13 @@ class BaseEventLoop(events.AbstractEventLoop): protocol = protocol_factory() waiter = futures.Future(loop=self) transport = self._make_read_pipe_transport(pipe, protocol, waiter) - yield From(waiter) + + try: + yield From(waiter) + except: + transport.close() + raise + if self._debug: logger.debug('Read pipe %r connected: (%r, %r)', pipe.fileno(), transport, protocol) @@ -835,7 +848,13 @@ class BaseEventLoop(events.AbstractEventLoop): protocol = protocol_factory() waiter = futures.Future(loop=self) transport = self._make_write_pipe_transport(pipe, protocol, waiter) - yield From(waiter) + + try: + yield From(waiter) + except: + transport.close() + raise + if self._debug: logger.debug('Write pipe %r connected: (%r, %r)', pipe.fileno(), transport, protocol) @@ -957,6 +976,10 @@ class BaseEventLoop(events.AbstractEventLoop): else: exc_info = False + if (self._current_handle is not None + and self._current_handle._source_traceback): + context['handle_traceback'] = self._current_handle._source_traceback + log_lines = [message] for key in sorted(context): if key in ('message', 'exception'): @@ -966,6 +989,10 @@ class BaseEventLoop(events.AbstractEventLoop): tb = ''.join(traceback.format_list(value)) value = 'Object created at (most recent call last):\n' value += tb.rstrip() + elif key == 'handle_traceback': + tb = ''.join(traceback.format_list(value)) + value = 'Handle created at (most recent call last):\n' + value += tb.rstrip() else: value = repr(value) log_lines.append('{0}: {1}'.format(key, value)) @@ -1123,12 +1150,16 @@ class BaseEventLoop(events.AbstractEventLoop): if handle._cancelled: continue if self._debug: - t0 = self.time() - handle._run() - dt = self.time() - t0 - if dt >= self.slow_callback_duration: - logger.warning('Executing %s took %.3f seconds', - _format_handle(handle), dt) + try: + self._current_handle = handle + t0 = self.time() + handle._run() + dt = self.time() - t0 + if dt >= self.slow_callback_duration: + logger.warning('Executing %s took %.3f seconds', + _format_handle(handle), dt) + finally: + self._current_handle = None else: handle._run() handle = None # Needed to break cycles when an exception occurs. diff --git a/trollius/windows_events.py b/trollius/windows_events.py index 1592610..b0c076a 100644 --- a/trollius/windows_events.py +++ b/trollius/windows_events.py @@ -17,7 +17,8 @@ from . import windows_utils from . import _overlapped from .coroutines import coroutine, From, Return from .log import logger -from .py33_exceptions import wrap_error, get_error_class, ConnectionRefusedError +from .py33_exceptions import (wrap_error, get_error_class, + ConnectionRefusedError, BrokenPipeError) __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', @@ -30,6 +31,13 @@ INFINITE = 0xffffffff ERROR_CONNECTION_REFUSED = 1225 ERROR_CONNECTION_ABORTED = 1236 +# Initial delay in seconds for connect_pipe() before retrying to connect +CONNECT_PIPE_INIT_DELAY = 0.001 + +# Maximum delay in seconds for connect_pipe() before retrying to connect +CONNECT_PIPE_MAX_DELAY = 0.100 + + class _OverlappedFuture(futures.Future): """Subclass of Future which represents an overlapped operation. @@ -120,14 +128,12 @@ class _BaseWaitHandleFuture(futures.Future): return self._registered = False + wait_handle = self._wait_handle + self._wait_handle = None try: - _overlapped.UnregisterWait(self._wait_handle) + _overlapped.UnregisterWait(wait_handle) except WindowsError as exc: - self._wait_handle = None - if exc.winerror == _overlapped.ERROR_IO_PENDING: - # ERROR_IO_PENDING is not an error, the wait was unregistered - self._unregister_wait_cb(None) - elif exc.winerror != _overlapped.ERROR_IO_PENDING: + if exc.winerror != _overlapped.ERROR_IO_PENDING: context = { 'message': 'Failed to unregister the wait handle', 'exception': exc, @@ -136,9 +142,10 @@ class _BaseWaitHandleFuture(futures.Future): if self._source_traceback: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) - else: - self._wait_handle = None - self._unregister_wait_cb(None) + return + # ERROR_IO_PENDING means that the unregister is pending + + self._unregister_wait_cb(None) def cancel(self): self._unregister_wait() @@ -205,14 +212,12 @@ class _WaitHandleFuture(_BaseWaitHandleFuture): return self._registered = False + wait_handle = self._wait_handle + self._wait_handle = None try: - _overlapped.UnregisterWaitEx(self._wait_handle, self._event) + _overlapped.UnregisterWaitEx(wait_handle, self._event) except OSError as exc: - self._wait_handle = None - if exc.winerror == _overlapped.ERROR_IO_PENDING: - # ERROR_IO_PENDING is not an error, the wait was unregistered - self._unregister_wait_cb(None) - elif exc.winerror != _overlapped.ERROR_IO_PENDING: + if exc.winerror != _overlapped.ERROR_IO_PENDING: context = { 'message': 'Failed to unregister the wait handle', 'exception': exc, @@ -221,11 +226,11 @@ class _WaitHandleFuture(_BaseWaitHandleFuture): if self._source_traceback: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) - else: - self._wait_handle = None - self._event_fut = self._proactor._wait_cancel( - self._event, - self._unregister_wait_cb) + return + # ERROR_IO_PENDING is not an error, the wait was unregistered + + self._event_fut = self._proactor._wait_cancel(self._event, + self._unregister_wait_cb) class PipeServer(object): @@ -253,7 +258,7 @@ class PipeServer(object): def _server_pipe_handle(self, first): # Return a wrapper for a new pipe handle. - if self._address is None: + if self.closed(): return None flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED if first: @@ -269,6 +274,9 @@ class PipeServer(object): self._free_instances.add(pipe) return pipe + def closed(self): + return (self._address is None) + def close(self): if self._accept_pipe_future is not None: self._accept_pipe_future.cancel() @@ -321,12 +329,21 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): if f: pipe = f.result() server._free_instances.discard(pipe) + + if server.closed(): + # A client connected before the server was closed: + # drop the client (close the pipe) and exit + pipe.close() + return + protocol = protocol_factory() self._make_duplex_pipe_transport( pipe, protocol, extra={'addr': address}) + pipe = server._get_unconnected_pipe() if pipe is None: return + f = self._proactor.accept_pipe(pipe) except OSError as exc: if pipe and pipe.fileno() != -1: @@ -393,13 +410,21 @@ class IocpProactor(object): self._results = [] return tmp + def _result(self, value): + fut = futures.Future(loop=self._loop) + fut.set_result(value) + return fut + def recv(self, conn, nbytes, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) - if isinstance(conn, socket.socket): - wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags) - else: - wrap_error(ov.ReadFile, conn.fileno(), nbytes) + try: + if isinstance(conn, socket.socket): + wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags) + else: + wrap_error(ov.ReadFile, conn.fileno(), nbytes) + except BrokenPipeError: + return self._result(b'') def finish_recv(trans, key, ov): return wrap_error(ov.getresult) @@ -474,40 +499,39 @@ class IocpProactor(object): def accept_pipe(self, pipe): self._register_with_iocp(pipe) ov = _overlapped.Overlapped(NULL) - ov.ConnectNamedPipe(pipe.fileno()) + connected = ov.ConnectNamedPipe(pipe.fileno()) + + if connected: + # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means + # that the pipe is connected. There is no need to wait for the + # completion of the connection. + return self._result(pipe) def finish_accept_pipe(trans, key, ov): wrap_error(ov.getresult) return pipe - # FIXME: Tulip issue 196: why do we need register=False? - # See also the comment in the _register() method - return self._register(ov, pipe, finish_accept_pipe, - register=False) + return self._register(ov, pipe, finish_accept_pipe) + @coroutine def connect_pipe(self, address): - ov = _overlapped.Overlapped(NULL) - ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address) - - def finish_connect_pipe(err, handle, ov): - # err, handle were arguments passed to PostQueuedCompletionStatus() - # in a function run in a thread pool. - if err == _overlapped.ERROR_SEM_TIMEOUT: - # Connection did not succeed within time limit. - msg = _overlapped.FormatMessage(err) - raise ConnectionRefusedError(0, msg, None, err) - elif err != 0: - msg = _overlapped.FormatMessage(err) - err_cls = get_error_class(err, None) - if err_cls is not None: - raise err_cls(0, msg, None, err) - else: - raise WindowsError(err, msg) - else: - return windows_utils.PipeHandle(handle) + delay = CONNECT_PIPE_INIT_DELAY + while True: + # Unfortunately there is no way to do an overlapped connect to a pipe. + # Call CreateFile() in a loop until it doesn't fail with + # ERROR_PIPE_BUSY + try: + handle = _overlapped.ConnectPipe(address) + break + except OSError as exc: + if exc.winerror != _overlapped.ERROR_PIPE_BUSY: + raise + + # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later + delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) + yield From(tasks.sleep(delay, loop=self._loop)) - return self._register(ov, None, finish_connect_pipe, - wait_for_post=True) + raise Return(windows_utils.PipeHandle(handle)) def wait_for_handle(self, handle, timeout=None): """Wait for a handle. @@ -566,15 +590,14 @@ class IocpProactor(object): # to avoid sending notifications to completion port of ops # that succeed immediately. - def _register(self, ov, obj, callback, - wait_for_post=False, register=True): + def _register(self, ov, obj, callback): # Return a future which will be set with the result of the # operation when it completes. The future's value is actually # the value returned by callback(). f = _OverlappedFuture(ov, loop=self._loop) if f._source_traceback: del f._source_traceback[-1] - if not ov.pending and not wait_for_post: + if not ov.pending: # The operation has completed, so no need to postpone the # work. We cannot take this short cut if we need the # NumberOfBytes, CompletionKey values returned by @@ -590,18 +613,11 @@ class IocpProactor(object): # Register the overlapped operation to keep a reference to the # OVERLAPPED object, otherwise the memory is freed and Windows may # read uninitialized memory. - # - # For an unknown reason, ConnectNamedPipe() behaves differently: - # the completion is not notified by GetOverlappedResult() if we - # already called GetOverlappedResult(). For this specific case, we - # don't expect notification (register is set to False). - else: - register = True - if register: - # Register the overlapped operation for later. Note that - # we only store obj to prevent it from being garbage - # collected too early. - self._cache[ov.address] = (f, ov, obj, callback) + + # Register the overlapped operation for later. Note that + # we only store obj to prevent it from being garbage + # collected too early. + self._cache[ov.address] = (f, ov, obj, callback) return f def _unregister(self, ov): @@ -682,14 +698,9 @@ class IocpProactor(object): def close(self): # Cancel remaining registered operations. for address, (fut, ov, obj, callback) in list(self._cache.items()): - if obj is None: - # The operation was started with connect_pipe() which - # queues a task to Windows' thread pool. This cannot - # be cancelled, so just forget it. - del self._cache[address] - # FIXME: Tulip issue 196: remove this case, it should not happen - elif fut.done() and not fut.cancelled(): - del self._cache[address] + if fut.cancelled(): + # Nothing to do with cancelled futures + pass elif isinstance(fut, _WaitCancelFuture): # _WaitCancelFuture must not be cancelled pass diff --git a/trollius/windows_utils.py b/trollius/windows_utils.py index 8b0e439..7a103c6 100644 --- a/trollius/windows_utils.py +++ b/trollius/windows_utils.py @@ -155,6 +155,8 @@ class PipeHandle(object): return self._handle def fileno(self): + if self._handle is None: + raise ValueError("I/O operatioon on closed pipe") return self._handle def close(self, CloseHandle=_winapi.CloseHandle): |