diff options
Diffstat (limited to 'tests/test_wasyncore.py')
-rw-r--r-- | tests/test_wasyncore.py | 95 |
1 files changed, 68 insertions, 27 deletions
diff --git a/tests/test_wasyncore.py b/tests/test_wasyncore.py index 9c23509..970e993 100644 --- a/tests/test_wasyncore.py +++ b/tests/test_wasyncore.py @@ -1,21 +1,21 @@ -from waitress import wasyncore as asyncore -from waitress import compat +import _thread as thread import contextlib +import errno import functools import gc -import unittest -import select +from io import BytesIO import os -import socket -import sys -import time -import errno import re +import select +import socket import struct +import sys import threading +import time +import unittest import warnings -from io import BytesIO +from waitress import compat, wasyncore as asyncore TIMEOUT = 3 HAS_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") @@ -24,6 +24,7 @@ HOSTv4 = "127.0.0.1" HOSTv6 = "::1" # Filename used for testing + if os.name == "java": # pragma: no cover # Jython disallows @ in module names TESTFN = "$test" @@ -33,7 +34,7 @@ else: TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) -class DummyLogger(object): # pragma: no cover +class DummyLogger: # pragma: no cover def __init__(self): self.messages = [] @@ -41,7 +42,7 @@ class DummyLogger(object): # pragma: no cover self.messages.append((severity, message)) -class WarningsRecorder(object): # pragma: no cover +class WarningsRecorder: # pragma: no cover """Convenience wrapper for the warnings list returned on entry to the warnings.catch_warnings() context manager. """ @@ -67,6 +68,7 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover # in order to re-raise the warnings. frame = sys._getframe(2) registry = frame.f_globals.get("__warningregistry__") + if registry: registry.clear() with warnings.catch_warnings(record=True) as w: @@ -78,19 +80,25 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover # Filter the recorded warnings reraise = list(w) missing = [] + for msg, cat in filters: seen = False + for w in reraise[:]: warning = w.message # Filter out the matching messages + if re.match(msg, str(warning), re.I) and issubclass(warning.__class__, cat): seen = True reraise.remove(w) + if not seen and not quiet: # This filter caught nothing missing.append((msg, cat.__name__)) + if reraise: raise AssertionError("unhandled warning %s" % reraise[0]) + if missing: raise AssertionError("filter (%r, %s) did not catch any warning" % missing[0]) @@ -111,11 +119,14 @@ def check_warnings(*filters, **kwargs): # pragma: no cover check_warnings(("", Warning), quiet=True) """ quiet = kwargs.get("quiet") + if not filters: filters = (("", Warning),) # Preserve backward compatibility + if quiet is None: quiet = True + return _filterwarnings(filters, quiet) @@ -130,6 +141,7 @@ def gc_collect(): # pragma: no cover objects to disappear. """ gc.collect() + if sys.platform.startswith("java"): time.sleep(0.1) gc.collect() @@ -137,7 +149,7 @@ def gc_collect(): # pragma: no cover def threading_setup(): # pragma: no cover - return (compat.thread._count(), None) + return (thread._count(), None) def threading_cleanup(*original_values): # pragma: no cover @@ -146,7 +158,8 @@ def threading_cleanup(*original_values): # pragma: no cover _MAX_COUNT = 100 for count in range(_MAX_COUNT): - values = (compat.thread._count(), None) + values = (thread._count(), None) + if values == original_values: break @@ -186,6 +199,7 @@ def join_thread(thread, timeout=30.0): # pragma: no cover after timeout seconds. """ thread.join(timeout) + if thread.is_alive(): msg = "failed to join the thread in %.1f seconds" % timeout raise AssertionError(msg) @@ -213,6 +227,7 @@ def bind_port(sock, host=HOST): # pragma: no cover "tests should never set the SO_REUSEADDR " "socket option on TCP/IP sockets!" ) + if hasattr(socket, "SO_REUSEPORT"): try: if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: @@ -225,11 +240,13 @@ def bind_port(sock, host=HOST): # pragma: no cover # thus defining SO_REUSEPORT but this process is running # under an older kernel that does not support SO_REUSEPORT. pass + if hasattr(socket, "SO_EXCLUSIVEADDRUSE"): sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) sock.bind((host, 0)) port = sock.getsockname()[1] + return port @@ -303,13 +320,16 @@ def capture_server(evt, buf, serv): # pragma no cover else: n = 200 start = time.time() + while n > 0 and time.time() - start < 3.0: r, w, e = select.select([conn], [], [], 0.1) + if r: n -= 1 data = conn.recv(10) # keep everything except for the newline terminator buf.write(data.replace(b"\n", b"")) + if b"\n" in data: break time.sleep(0.01) @@ -332,6 +352,7 @@ def bind_unix_socket(sock, addr): # pragma: no cover def bind_af_aware(sock, addr): """Helper function to bind a socket according to its family.""" + if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: # Make sure the path doesn't exist. unlink(addr) @@ -346,6 +367,7 @@ if sys.platform.startswith("win"): # pragma: no cover # Perform the operation func(pathname) # Now setup the wait loop + if waitall: dirname = pathname else: @@ -358,6 +380,7 @@ if sys.platform.startswith("win"): # pragma: no cover # Testing on an i7@4.3GHz shows that usually only 1 iteration is # required when contention occurs. timeout = 0.001 + while timeout < 1.0: # Note we are only testing for the existence of the file(s) in # the contents of the directory regardless of any security or @@ -367,6 +390,7 @@ if sys.platform.startswith("win"): # pragma: no cover # Other Windows APIs can fail or give incorrect results when # dealing with files that are pending deletion. L = os.listdir(dirname) + if not (L if waitall else name in L): return # Increase the timeout and try again @@ -395,17 +419,20 @@ def unlink(filename): def _is_ipv6_enabled(): # pragma: no cover """Check whether IPv6 is enabled on this host.""" + if compat.HAS_IPV6: sock = None try: sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.bind(("::1", 0)) + return True - except socket.error: + except OSError: pass finally: if sock: sock.close() + return False @@ -487,6 +514,7 @@ class HelperFunctionTests(unittest.TestCase): # Only the attribute modified by the routine we expect to be # called should be True. + for attr in attributes: self.assertEqual(getattr(tobj, attr), attr == expectedattr) @@ -513,6 +541,7 @@ class HelperFunctionTests(unittest.TestCase): l = [] testmap = {} + for i in range(10): c = dummychannel() l.append(c) @@ -606,6 +635,7 @@ class DispatcherTests(unittest.TestCase): def test_strerror(self): # refers to bug #8573 err = asyncore._strerror(errno.EPERM) + if hasattr(os, "strerror"): self.assertEqual(err, os.strerror(errno.EPERM)) err = asyncore._strerror(-1) @@ -656,6 +686,7 @@ class DispatcherWithSendTests(unittest.TestCase): d.send(b"\n") n = 1000 + while d.out_buffer and n > 0: # pragma: no cover asyncore.poll() n -= 1 @@ -723,6 +754,7 @@ class FileWrapperTest(unittest.TestCase): def test_resource_warning(self): # Issue #11453 got_warning = False + while got_warning is False: # we try until we get the outcome we want because this # test is not deterministic (gc_collect() may not @@ -732,7 +764,7 @@ class FileWrapperTest(unittest.TestCase): os.close(fd) try: - with check_warnings(("", compat.ResourceWarning)): + with check_warnings(("", ResourceWarning)): f = None gc_collect() except AssertionError: # pragma: no cover @@ -819,8 +851,10 @@ class BaseTestAPI: def loop_waiting_for_flag(self, instance, timeout=5): # pragma: no cover timeout = float(timeout) / 100 count = 100 + while asyncore.socket_map and count > 0: asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll) + if instance.flag: return count -= 1 @@ -966,6 +1000,7 @@ class BaseTestAPI: # Make sure handle_expt is called on OOB data received. # Note: this might fail on some platforms as OOB data is # tenuously supported and rarely used. + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: self.skipTest("Not applicable to AF_UNIX sockets.") @@ -980,7 +1015,7 @@ class BaseTestAPI: class TestHandler(BaseTestHandler): def __init__(self, conn): BaseTestHandler.__init__(self, conn) - self.socket.send(compat.tobytes(chr(244)), socket.MSG_OOB) + self.socket.send(chr(244).encode("latin-1"), socket.MSG_OOB) server = BaseServer(self.family, self.addr, TestHandler) client = TestClient(self.family, server.address) @@ -1082,6 +1117,7 @@ class BaseTestAPI: @reap_threads def test_quick_connect(self): # pragma: no cover # see: http://bugs.python.org/issue10340 + if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())): self.skipTest("test specific to AF_INET and AF_INET6") @@ -1420,7 +1456,7 @@ class Test_dispatcher(unittest.TestCase): sock = dummysocket() def getpeername(): - raise socket.error(errno.EBADF) + raise OSError(errno.EBADF) map = {} sock.getpeername = getpeername @@ -1454,7 +1490,7 @@ class Test_dispatcher(unittest.TestCase): def setsockopt(*arg, **kw): sock.errored = True - raise socket.error + raise OSError sock.setsockopt = setsockopt sock.getsockopt = lambda *arg: 0 @@ -1486,7 +1522,7 @@ class Test_dispatcher(unittest.TestCase): map = {} def accept(*arg, **kw): - raise socket.error(122) + raise OSError(122) sock.accept = accept inst = self._makeOne(sock=sock, map=map) @@ -1497,7 +1533,7 @@ class Test_dispatcher(unittest.TestCase): map = {} def send(*arg, **kw): - raise socket.error(errno.EWOULDBLOCK) + raise OSError(errno.EWOULDBLOCK) sock.send = send inst = self._makeOne(sock=sock, map=map) @@ -1509,7 +1545,7 @@ class Test_dispatcher(unittest.TestCase): map = {} def send(*arg, **kw): - raise socket.error(122) + raise OSError(122) sock.send = send inst = self._makeOne(sock=sock, map=map) @@ -1520,7 +1556,7 @@ class Test_dispatcher(unittest.TestCase): map = {} def recv(*arg, **kw): - raise socket.error(errno.ECONNRESET) + raise OSError(errno.ECONNRESET) def handle_close(): inst.close_handled = True @@ -1537,7 +1573,7 @@ class Test_dispatcher(unittest.TestCase): map = {} def close(): - raise socket.error(122) + raise OSError(122) sock.close = close inst = self._makeOne(sock=sock, map=map) @@ -1680,7 +1716,7 @@ class Test_close_all(unittest.TestCase): self.assertRaises(RuntimeError, self._callFUT, map) -class DummyDispatcher(object): +class DummyDispatcher: read_event_handled = False write_event_handled = False expt_event_handled = False @@ -1693,16 +1729,19 @@ class DummyDispatcher(object): def handle_read_event(self): self.read_event_handled = True + if self.exc is not None: raise self.exc def handle_write_event(self): self.write_event_handled = True + if self.exc is not None: raise self.exc def handle_expt_event(self): self.expt_event_handled = True + if self.exc is not None: raise self.exc @@ -1723,7 +1762,7 @@ class DummyDispatcher(object): raise self.exc -class DummyTime(object): +class DummyTime: def __init__(self): self.sleepvals = [] @@ -1731,7 +1770,7 @@ class DummyTime(object): self.sleepvals.append(val) -class DummySelect(object): +class DummySelect: error = select.error def __init__(self, exc=None, pollster=None): @@ -1741,6 +1780,7 @@ class DummySelect(object): def select(self, *arg): self.selected.append(arg) + if self.exc is not None: raise self.exc @@ -1748,13 +1788,14 @@ class DummySelect(object): return self.pollster -class DummyPollster(object): +class DummyPollster: def __init__(self, exc=None): self.polled = [] self.exc = exc def poll(self, timeout): self.polled.append(timeout) + if self.exc is not None: raise self.exc else: # pragma: no cover |