summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2015-01-26 22:26:54 +0100
committerVictor Stinner <victor.stinner@gmail.com>2015-01-26 22:26:54 +0100
commit43edf148f8eeb12f516289cb36decc0702cbfd81 (patch)
treeab833211defb5dbe2bf105a8df21bee472597ef5
parentc05854eb73b54b352d4e3b2fb86281bc2b60ac2c (diff)
parent31fa887dbd1e6c89728b0ed99f7974266852d3a8 (diff)
downloadtrollius-43edf148f8eeb12f516289cb36decc0702cbfd81.tar.gz
Merge Tulip into Trollius
-rw-r--r--examples/simple_tcp_server.py4
-rw-r--r--overlapped.c123
-rwxr-xr-xrelease.py3
-rw-r--r--tests/test_windows_events.py13
-rw-r--r--trollius/base_events.py49
-rw-r--r--trollius/windows_events.py161
-rw-r--r--trollius/windows_utils.py2
7 files changed, 171 insertions, 184 deletions
diff --git a/examples/simple_tcp_server.py b/examples/simple_tcp_server.py
index 882938e..247f6e6 100644
--- a/examples/simple_tcp_server.py
+++ b/examples/simple_tcp_server.py
@@ -4,7 +4,7 @@ style and uses asyncio.streams.start_server() and
asyncio.streams.open_connection().
Note that running this example starts both the TCP server and client
-in the same process. It listens on port 1234 on 127.0.0.1, so it will
+in the same process. It listens on port 12345 on 127.0.0.1, so it will
fail if this port is currently in use.
"""
@@ -88,7 +88,7 @@ class MyServer:
def start(self, loop):
"""
- Starts the TCP server, so that it listens on port 1234.
+ Starts the TCP server, so that it listens on port 12345.
For each client that connects, the accept_client method gets
called. This method runs the loop until the server sockets
diff --git a/overlapped.c b/overlapped.c
index 9b4cca3..1a1ee5d 100644
--- a/overlapped.c
+++ b/overlapped.c
@@ -64,12 +64,6 @@ typedef struct {
};
} OverlappedObject;
-typedef struct {
- OVERLAPPED *Overlapped;
- HANDLE IocpHandle;
- char Address[1];
-} WaitNamedPipeAndConnectContext;
-
/*
* Map Windows error codes to subclasses of OSError
*/
@@ -756,7 +750,7 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
switch (err) {
case ERROR_BROKEN_PIPE:
mark_as_completed(&self->overlapped);
- Py_RETURN_NONE;
+ return SetFromWindowsErr(err);
case ERROR_SUCCESS:
case ERROR_MORE_DATA:
case ERROR_IO_PENDING:
@@ -815,7 +809,7 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
switch (err) {
case ERROR_BROKEN_PIPE:
mark_as_completed(&self->overlapped);
- Py_RETURN_NONE;
+ return SetFromWindowsErr(err);
case ERROR_SUCCESS:
case ERROR_MORE_DATA:
case ERROR_IO_PENDING:
@@ -1151,109 +1145,43 @@ Overlapped_ConnectNamedPipe(OverlappedObject *self, PyObject *args)
switch (err) {
case ERROR_PIPE_CONNECTED:
mark_as_completed(&self->overlapped);
- Py_RETURN_NONE;
+ Py_RETURN_TRUE;
case ERROR_SUCCESS:
case ERROR_IO_PENDING:
- Py_RETURN_NONE;
+ Py_RETURN_FALSE;
default:
self->type = TYPE_NOT_STARTED;
return SetFromWindowsErr(err);
}
}
-/* Unfortunately there is no way to do an overlapped connect to a
- pipe. We instead use WaitNamedPipe() and CreateFile() in a thread
- pool thread. If a connection succeeds within a time limit (10
- seconds) then PostQueuedCompletionStatus() is used to return the
- pipe handle to the completion port. */
-
-static DWORD WINAPI
-WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx)
-{
- HANDLE PipeHandle = INVALID_HANDLE_VALUE;
- DWORD Start = GetTickCount();
- DWORD Deadline = Start + 10*1000;
- DWORD Error = 0;
- DWORD Timeout;
- BOOL Success;
-
- for ( ; ; ) {
- Timeout = Deadline - GetTickCount();
- if ((int)Timeout < 0)
- break;
- Success = WaitNamedPipe(ctx->Address, Timeout);
- Error = Success ? ERROR_SUCCESS : GetLastError();
- switch (Error) {
- case ERROR_SUCCESS:
- PipeHandle = CreateFile(ctx->Address,
- GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED, NULL);
- if (PipeHandle == INVALID_HANDLE_VALUE)
- continue;
- break;
- case ERROR_SEM_TIMEOUT:
- continue;
- }
- break;
- }
- if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error,
- (ULONG_PTR)PipeHandle, ctx->Overlapped))
- CloseHandle(PipeHandle);
- free(ctx);
- return 0;
-}
-
PyDoc_STRVAR(
- Overlapped_WaitNamedPipeAndConnect_doc,
- "WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n"
- "Start overlapped connection to address, notifying iocp_handle when\n"
- "finished");
+ ConnectPipe_doc,
+ "ConnectPipe(addr) -> pipe_handle\n\n"
+ "Connect to the pipe for asynchronous I/O (overlapped).");
static PyObject *
-Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args)
+ConnectPipe(OverlappedObject *self, PyObject *args)
{
- char *Address;
- Py_ssize_t AddressLength;
- HANDLE IocpHandle;
- OVERLAPPED Overlapped;
- BOOL ret;
- DWORD err;
- WaitNamedPipeAndConnectContext *ctx;
- Py_ssize_t ContextLength;
+ PyObject *AddressObj;
+ wchar_t *Address;
+ HANDLE PipeHandle;
- if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER,
- &Address, &AddressLength, &IocpHandle, &Overlapped))
+ if (!PyArg_ParseTuple(args, "U", &AddressObj))
return NULL;
- if (self->type != TYPE_NONE) {
- PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ Address = PyUnicode_AsWideCharString(AddressObj, NULL);
+ if (Address == NULL)
return NULL;
- }
-
- ContextLength = (AddressLength +
- offsetof(WaitNamedPipeAndConnectContext, Address));
- ctx = calloc(1, ContextLength + 1);
- if (ctx == NULL)
- return PyErr_NoMemory();
- memcpy(ctx->Address, Address, AddressLength + 1);
- ctx->Overlapped = &self->overlapped;
- ctx->IocpHandle = IocpHandle;
-
- self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT;
- self->handle = NULL;
-
- Py_BEGIN_ALLOW_THREADS
- ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx,
- WT_EXECUTELONGFUNCTION);
- Py_END_ALLOW_THREADS
-
- mark_as_completed(&self->overlapped);
- self->error = err = ret ? ERROR_SUCCESS : GetLastError();
- if (!ret)
- return SetFromWindowsErr(err);
- Py_RETURN_NONE;
+ PipeHandle = CreateFileW(Address,
+ GENERIC_READ | GENERIC_WRITE,
+ 0, NULL, OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED, NULL);
+ PyMem_Free(Address);
+ if (PipeHandle == INVALID_HANDLE_VALUE)
+ return SetFromWindowsErr(0);
+ return Py_BuildValue(F_HANDLE, PipeHandle);
}
static PyObject*
@@ -1290,9 +1218,6 @@ static PyMethodDef Overlapped_methods[] = {
METH_VARARGS, Overlapped_DisconnectEx_doc},
{"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe,
METH_VARARGS, Overlapped_ConnectNamedPipe_doc},
- {"WaitNamedPipeAndConnect",
- (PyCFunction) Overlapped_WaitNamedPipeAndConnect,
- METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc},
{NULL}
};
@@ -1378,6 +1303,9 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, SetEvent_doc},
{"ResetEvent", overlapped_ResetEvent,
METH_VARARGS, ResetEvent_doc},
+ {"ConnectPipe",
+ (PyCFunction) ConnectPipe,
+ METH_VARARGS, ConnectPipe_doc},
{NULL}
};
@@ -1428,6 +1356,7 @@ _init_overlapped(void)
WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);
WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED);
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
+ WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WINAPI_CONSTANT(F_DWORD, INFINITE);
WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
WINAPI_CONSTANT(F_HANDLE, NULL);
diff --git a/release.py b/release.py
index 9247942..f4fd541 100755
--- a/release.py
+++ b/release.py
@@ -398,7 +398,8 @@ class Release(object):
if command:
print("Invalid command: %s" % command)
else:
- parser.print_usage()
+ parser.print_help()
+ print("")
print("Available commands:")
print("- build: build asyncio in place, imply --running")
diff --git a/tests/test_windows_events.py b/tests/test_windows_events.py
index 39e5149..9a068a0 100644
--- a/tests/test_windows_events.py
+++ b/tests/test_windows_events.py
@@ -12,6 +12,7 @@ from trollius import _overlapped
from trollius import py33_winapi as _winapi
from trollius import windows_events
from trollius.py33_exceptions import PermissionError, FileNotFoundError
+from trollius.test_utils import mock
class UpperProto(asyncio.Protocol):
@@ -92,6 +93,18 @@ class ProactorTests(test_utils.TestCase):
raise Return('done')
+ def test_connect_pipe_cancel(self):
+ exc = OSError()
+ exc.winerror = _overlapped.ERROR_PIPE_BUSY
+ with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect:
+ coro = self.loop._proactor.connect_pipe('pipe_address')
+ task = self.loop.create_task(coro)
+
+ # check that it's possible to cancel connect_pipe()
+ task.cancel()
+ with self.assertRaises(asyncio.CancelledError):
+ self.loop.run_until_complete(task)
+
def test_wait_for_handle(self):
event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event)
diff --git a/trollius/base_events.py b/trollius/base_events.py
index f0b5430..e5e5b61 100644
--- a/trollius/base_events.py
+++ b/trollius/base_events.py
@@ -188,6 +188,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# In debug mode, if the execution of a callback or a step of a task
# exceed this duration in seconds, the slow callback/task is logged.
self.slow_callback_duration = 0.1
+ self._current_handle = None
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
@@ -732,7 +733,13 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.debug("Datagram endpoint remote_addr=%r created: "
"(%r, %r)",
remote_addr, transport, protocol)
- yield From(waiter)
+
+ try:
+ yield From(waiter)
+ except:
+ transport.close()
+ raise
+
raise Return(transport, protocol)
@coroutine
@@ -824,7 +831,13 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol = protocol_factory()
waiter = futures.Future(loop=self)
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
- yield From(waiter)
+
+ try:
+ yield From(waiter)
+ except:
+ transport.close()
+ raise
+
if self._debug:
logger.debug('Read pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol)
@@ -835,7 +848,13 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol = protocol_factory()
waiter = futures.Future(loop=self)
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
- yield From(waiter)
+
+ try:
+ yield From(waiter)
+ except:
+ transport.close()
+ raise
+
if self._debug:
logger.debug('Write pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol)
@@ -957,6 +976,10 @@ class BaseEventLoop(events.AbstractEventLoop):
else:
exc_info = False
+ if (self._current_handle is not None
+ and self._current_handle._source_traceback):
+ context['handle_traceback'] = self._current_handle._source_traceback
+
log_lines = [message]
for key in sorted(context):
if key in ('message', 'exception'):
@@ -966,6 +989,10 @@ class BaseEventLoop(events.AbstractEventLoop):
tb = ''.join(traceback.format_list(value))
value = 'Object created at (most recent call last):\n'
value += tb.rstrip()
+ elif key == 'handle_traceback':
+ tb = ''.join(traceback.format_list(value))
+ value = 'Handle created at (most recent call last):\n'
+ value += tb.rstrip()
else:
value = repr(value)
log_lines.append('{0}: {1}'.format(key, value))
@@ -1123,12 +1150,16 @@ class BaseEventLoop(events.AbstractEventLoop):
if handle._cancelled:
continue
if self._debug:
- t0 = self.time()
- handle._run()
- dt = self.time() - t0
- if dt >= self.slow_callback_duration:
- logger.warning('Executing %s took %.3f seconds',
- _format_handle(handle), dt)
+ try:
+ self._current_handle = handle
+ t0 = self.time()
+ handle._run()
+ dt = self.time() - t0
+ if dt >= self.slow_callback_duration:
+ logger.warning('Executing %s took %.3f seconds',
+ _format_handle(handle), dt)
+ finally:
+ self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
diff --git a/trollius/windows_events.py b/trollius/windows_events.py
index 1592610..b0c076a 100644
--- a/trollius/windows_events.py
+++ b/trollius/windows_events.py
@@ -17,7 +17,8 @@ from . import windows_utils
from . import _overlapped
from .coroutines import coroutine, From, Return
from .log import logger
-from .py33_exceptions import wrap_error, get_error_class, ConnectionRefusedError
+from .py33_exceptions import (wrap_error, get_error_class,
+ ConnectionRefusedError, BrokenPipeError)
__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
@@ -30,6 +31,13 @@ INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236
+# Initial delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_INIT_DELAY = 0.001
+
+# Maximum delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_MAX_DELAY = 0.100
+
+
class _OverlappedFuture(futures.Future):
"""Subclass of Future which represents an overlapped operation.
@@ -120,14 +128,12 @@ class _BaseWaitHandleFuture(futures.Future):
return
self._registered = False
+ wait_handle = self._wait_handle
+ self._wait_handle = None
try:
- _overlapped.UnregisterWait(self._wait_handle)
+ _overlapped.UnregisterWait(wait_handle)
except WindowsError as exc:
- self._wait_handle = None
- if exc.winerror == _overlapped.ERROR_IO_PENDING:
- # ERROR_IO_PENDING is not an error, the wait was unregistered
- self._unregister_wait_cb(None)
- elif exc.winerror != _overlapped.ERROR_IO_PENDING:
+ if exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
@@ -136,9 +142,10 @@ class _BaseWaitHandleFuture(futures.Future):
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- else:
- self._wait_handle = None
- self._unregister_wait_cb(None)
+ return
+ # ERROR_IO_PENDING means that the unregister is pending
+
+ self._unregister_wait_cb(None)
def cancel(self):
self._unregister_wait()
@@ -205,14 +212,12 @@ class _WaitHandleFuture(_BaseWaitHandleFuture):
return
self._registered = False
+ wait_handle = self._wait_handle
+ self._wait_handle = None
try:
- _overlapped.UnregisterWaitEx(self._wait_handle, self._event)
+ _overlapped.UnregisterWaitEx(wait_handle, self._event)
except OSError as exc:
- self._wait_handle = None
- if exc.winerror == _overlapped.ERROR_IO_PENDING:
- # ERROR_IO_PENDING is not an error, the wait was unregistered
- self._unregister_wait_cb(None)
- elif exc.winerror != _overlapped.ERROR_IO_PENDING:
+ if exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
@@ -221,11 +226,11 @@ class _WaitHandleFuture(_BaseWaitHandleFuture):
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- else:
- self._wait_handle = None
- self._event_fut = self._proactor._wait_cancel(
- self._event,
- self._unregister_wait_cb)
+ return
+ # ERROR_IO_PENDING is not an error, the wait was unregistered
+
+ self._event_fut = self._proactor._wait_cancel(self._event,
+ self._unregister_wait_cb)
class PipeServer(object):
@@ -253,7 +258,7 @@ class PipeServer(object):
def _server_pipe_handle(self, first):
# Return a wrapper for a new pipe handle.
- if self._address is None:
+ if self.closed():
return None
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
if first:
@@ -269,6 +274,9 @@ class PipeServer(object):
self._free_instances.add(pipe)
return pipe
+ def closed(self):
+ return (self._address is None)
+
def close(self):
if self._accept_pipe_future is not None:
self._accept_pipe_future.cancel()
@@ -321,12 +329,21 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
if f:
pipe = f.result()
server._free_instances.discard(pipe)
+
+ if server.closed():
+ # A client connected before the server was closed:
+ # drop the client (close the pipe) and exit
+ pipe.close()
+ return
+
protocol = protocol_factory()
self._make_duplex_pipe_transport(
pipe, protocol, extra={'addr': address})
+
pipe = server._get_unconnected_pipe()
if pipe is None:
return
+
f = self._proactor.accept_pipe(pipe)
except OSError as exc:
if pipe and pipe.fileno() != -1:
@@ -393,13 +410,21 @@ class IocpProactor(object):
self._results = []
return tmp
+ def _result(self, value):
+ fut = futures.Future(loop=self._loop)
+ fut.set_result(value)
+ return fut
+
def recv(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
- if isinstance(conn, socket.socket):
- wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
- else:
- wrap_error(ov.ReadFile, conn.fileno(), nbytes)
+ try:
+ if isinstance(conn, socket.socket):
+ wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
+ else:
+ wrap_error(ov.ReadFile, conn.fileno(), nbytes)
+ except BrokenPipeError:
+ return self._result(b'')
def finish_recv(trans, key, ov):
return wrap_error(ov.getresult)
@@ -474,40 +499,39 @@ class IocpProactor(object):
def accept_pipe(self, pipe):
self._register_with_iocp(pipe)
ov = _overlapped.Overlapped(NULL)
- ov.ConnectNamedPipe(pipe.fileno())
+ connected = ov.ConnectNamedPipe(pipe.fileno())
+
+ if connected:
+ # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
+ # that the pipe is connected. There is no need to wait for the
+ # completion of the connection.
+ return self._result(pipe)
def finish_accept_pipe(trans, key, ov):
wrap_error(ov.getresult)
return pipe
- # FIXME: Tulip issue 196: why do we need register=False?
- # See also the comment in the _register() method
- return self._register(ov, pipe, finish_accept_pipe,
- register=False)
+ return self._register(ov, pipe, finish_accept_pipe)
+ @coroutine
def connect_pipe(self, address):
- ov = _overlapped.Overlapped(NULL)
- ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
-
- def finish_connect_pipe(err, handle, ov):
- # err, handle were arguments passed to PostQueuedCompletionStatus()
- # in a function run in a thread pool.
- if err == _overlapped.ERROR_SEM_TIMEOUT:
- # Connection did not succeed within time limit.
- msg = _overlapped.FormatMessage(err)
- raise ConnectionRefusedError(0, msg, None, err)
- elif err != 0:
- msg = _overlapped.FormatMessage(err)
- err_cls = get_error_class(err, None)
- if err_cls is not None:
- raise err_cls(0, msg, None, err)
- else:
- raise WindowsError(err, msg)
- else:
- return windows_utils.PipeHandle(handle)
+ delay = CONNECT_PIPE_INIT_DELAY
+ while True:
+ # Unfortunately there is no way to do an overlapped connect to a pipe.
+ # Call CreateFile() in a loop until it doesn't fail with
+ # ERROR_PIPE_BUSY
+ try:
+ handle = _overlapped.ConnectPipe(address)
+ break
+ except OSError as exc:
+ if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
+ raise
+
+ # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
+ delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+ yield From(tasks.sleep(delay, loop=self._loop))
- return self._register(ov, None, finish_connect_pipe,
- wait_for_post=True)
+ raise Return(windows_utils.PipeHandle(handle))
def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle.
@@ -566,15 +590,14 @@ class IocpProactor(object):
# to avoid sending notifications to completion port of ops
# that succeed immediately.
- def _register(self, ov, obj, callback,
- wait_for_post=False, register=True):
+ def _register(self, ov, obj, callback):
# Return a future which will be set with the result of the
# operation when it completes. The future's value is actually
# the value returned by callback().
f = _OverlappedFuture(ov, loop=self._loop)
if f._source_traceback:
del f._source_traceback[-1]
- if not ov.pending and not wait_for_post:
+ if not ov.pending:
# The operation has completed, so no need to postpone the
# work. We cannot take this short cut if we need the
# NumberOfBytes, CompletionKey values returned by
@@ -590,18 +613,11 @@ class IocpProactor(object):
# Register the overlapped operation to keep a reference to the
# OVERLAPPED object, otherwise the memory is freed and Windows may
# read uninitialized memory.
- #
- # For an unknown reason, ConnectNamedPipe() behaves differently:
- # the completion is not notified by GetOverlappedResult() if we
- # already called GetOverlappedResult(). For this specific case, we
- # don't expect notification (register is set to False).
- else:
- register = True
- if register:
- # Register the overlapped operation for later. Note that
- # we only store obj to prevent it from being garbage
- # collected too early.
- self._cache[ov.address] = (f, ov, obj, callback)
+
+ # Register the overlapped operation for later. Note that
+ # we only store obj to prevent it from being garbage
+ # collected too early.
+ self._cache[ov.address] = (f, ov, obj, callback)
return f
def _unregister(self, ov):
@@ -682,14 +698,9 @@ class IocpProactor(object):
def close(self):
# Cancel remaining registered operations.
for address, (fut, ov, obj, callback) in list(self._cache.items()):
- if obj is None:
- # The operation was started with connect_pipe() which
- # queues a task to Windows' thread pool. This cannot
- # be cancelled, so just forget it.
- del self._cache[address]
- # FIXME: Tulip issue 196: remove this case, it should not happen
- elif fut.done() and not fut.cancelled():
- del self._cache[address]
+ if fut.cancelled():
+ # Nothing to do with cancelled futures
+ pass
elif isinstance(fut, _WaitCancelFuture):
# _WaitCancelFuture must not be cancelled
pass
diff --git a/trollius/windows_utils.py b/trollius/windows_utils.py
index 8b0e439..7a103c6 100644
--- a/trollius/windows_utils.py
+++ b/trollius/windows_utils.py
@@ -155,6 +155,8 @@ class PipeHandle(object):
return self._handle
def fileno(self):
+ if self._handle is None:
+ raise ValueError("I/O operatioon on closed pipe")
return self._handle
def close(self, CloseHandle=_winapi.CloseHandle):