diff options
Diffstat (limited to 'tests/test_wasyncore.py')
-rw-r--r-- | tests/test_wasyncore.py | 48 |
1 files changed, 45 insertions, 3 deletions
diff --git a/tests/test_wasyncore.py b/tests/test_wasyncore.py index a7e2878..970e993 100644 --- a/tests/test_wasyncore.py +++ b/tests/test_wasyncore.py @@ -1,3 +1,4 @@ +import _thread as thread import contextlib import errno import functools @@ -23,6 +24,7 @@ HOSTv4 = "127.0.0.1" HOSTv6 = "::1" # Filename used for testing + if os.name == "java": # pragma: no cover # Jython disallows @ in module names TESTFN = "$test" @@ -66,6 +68,7 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover # in order to re-raise the warnings. frame = sys._getframe(2) registry = frame.f_globals.get("__warningregistry__") + if registry: registry.clear() with warnings.catch_warnings(record=True) as w: @@ -77,19 +80,25 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover # Filter the recorded warnings reraise = list(w) missing = [] + for msg, cat in filters: seen = False + for w in reraise[:]: warning = w.message # Filter out the matching messages + if re.match(msg, str(warning), re.I) and issubclass(warning.__class__, cat): seen = True reraise.remove(w) + if not seen and not quiet: # This filter caught nothing missing.append((msg, cat.__name__)) + if reraise: raise AssertionError("unhandled warning %s" % reraise[0]) + if missing: raise AssertionError("filter (%r, %s) did not catch any warning" % missing[0]) @@ -110,11 +119,14 @@ def check_warnings(*filters, **kwargs): # pragma: no cover check_warnings(("", Warning), quiet=True) """ quiet = kwargs.get("quiet") + if not filters: filters = (("", Warning),) # Preserve backward compatibility + if quiet is None: quiet = True + return _filterwarnings(filters, quiet) @@ -129,6 +141,7 @@ def gc_collect(): # pragma: no cover objects to disappear. """ gc.collect() + if sys.platform.startswith("java"): time.sleep(0.1) gc.collect() @@ -136,7 +149,7 @@ def gc_collect(): # pragma: no cover def threading_setup(): # pragma: no cover - return (compat.thread._count(), None) + return (thread._count(), None) def threading_cleanup(*original_values): # pragma: no cover @@ -145,7 +158,8 @@ def threading_cleanup(*original_values): # pragma: no cover _MAX_COUNT = 100 for count in range(_MAX_COUNT): - values = (compat.thread._count(), None) + values = (thread._count(), None) + if values == original_values: break @@ -185,6 +199,7 @@ def join_thread(thread, timeout=30.0): # pragma: no cover after timeout seconds. """ thread.join(timeout) + if thread.is_alive(): msg = "failed to join the thread in %.1f seconds" % timeout raise AssertionError(msg) @@ -212,6 +227,7 @@ def bind_port(sock, host=HOST): # pragma: no cover "tests should never set the SO_REUSEADDR " "socket option on TCP/IP sockets!" ) + if hasattr(socket, "SO_REUSEPORT"): try: if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: @@ -224,11 +240,13 @@ def bind_port(sock, host=HOST): # pragma: no cover # thus defining SO_REUSEPORT but this process is running # under an older kernel that does not support SO_REUSEPORT. pass + if hasattr(socket, "SO_EXCLUSIVEADDRUSE"): sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) sock.bind((host, 0)) port = sock.getsockname()[1] + return port @@ -302,13 +320,16 @@ def capture_server(evt, buf, serv): # pragma no cover else: n = 200 start = time.time() + while n > 0 and time.time() - start < 3.0: r, w, e = select.select([conn], [], [], 0.1) + if r: n -= 1 data = conn.recv(10) # keep everything except for the newline terminator buf.write(data.replace(b"\n", b"")) + if b"\n" in data: break time.sleep(0.01) @@ -331,6 +352,7 @@ def bind_unix_socket(sock, addr): # pragma: no cover def bind_af_aware(sock, addr): """Helper function to bind a socket according to its family.""" + if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: # Make sure the path doesn't exist. unlink(addr) @@ -345,6 +367,7 @@ if sys.platform.startswith("win"): # pragma: no cover # Perform the operation func(pathname) # Now setup the wait loop + if waitall: dirname = pathname else: @@ -357,6 +380,7 @@ if sys.platform.startswith("win"): # pragma: no cover # Testing on an i7@4.3GHz shows that usually only 1 iteration is # required when contention occurs. timeout = 0.001 + while timeout < 1.0: # Note we are only testing for the existence of the file(s) in # the contents of the directory regardless of any security or @@ -366,6 +390,7 @@ if sys.platform.startswith("win"): # pragma: no cover # Other Windows APIs can fail or give incorrect results when # dealing with files that are pending deletion. L = os.listdir(dirname) + if not (L if waitall else name in L): return # Increase the timeout and try again @@ -394,17 +419,20 @@ def unlink(filename): def _is_ipv6_enabled(): # pragma: no cover """Check whether IPv6 is enabled on this host.""" + if compat.HAS_IPV6: sock = None try: sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.bind(("::1", 0)) + return True except OSError: pass finally: if sock: sock.close() + return False @@ -486,6 +514,7 @@ class HelperFunctionTests(unittest.TestCase): # Only the attribute modified by the routine we expect to be # called should be True. + for attr in attributes: self.assertEqual(getattr(tobj, attr), attr == expectedattr) @@ -512,6 +541,7 @@ class HelperFunctionTests(unittest.TestCase): l = [] testmap = {} + for i in range(10): c = dummychannel() l.append(c) @@ -605,6 +635,7 @@ class DispatcherTests(unittest.TestCase): def test_strerror(self): # refers to bug #8573 err = asyncore._strerror(errno.EPERM) + if hasattr(os, "strerror"): self.assertEqual(err, os.strerror(errno.EPERM)) err = asyncore._strerror(-1) @@ -655,6 +686,7 @@ class DispatcherWithSendTests(unittest.TestCase): d.send(b"\n") n = 1000 + while d.out_buffer and n > 0: # pragma: no cover asyncore.poll() n -= 1 @@ -722,6 +754,7 @@ class FileWrapperTest(unittest.TestCase): def test_resource_warning(self): # Issue #11453 got_warning = False + while got_warning is False: # we try until we get the outcome we want because this # test is not deterministic (gc_collect() may not @@ -818,8 +851,10 @@ class BaseTestAPI: def loop_waiting_for_flag(self, instance, timeout=5): # pragma: no cover timeout = float(timeout) / 100 count = 100 + while asyncore.socket_map and count > 0: asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll) + if instance.flag: return count -= 1 @@ -965,6 +1000,7 @@ class BaseTestAPI: # Make sure handle_expt is called on OOB data received. # Note: this might fail on some platforms as OOB data is # tenuously supported and rarely used. + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: self.skipTest("Not applicable to AF_UNIX sockets.") @@ -979,7 +1015,7 @@ class BaseTestAPI: class TestHandler(BaseTestHandler): def __init__(self, conn): BaseTestHandler.__init__(self, conn) - self.socket.send(compat.tobytes(chr(244)), socket.MSG_OOB) + self.socket.send(chr(244).encode("latin-1"), socket.MSG_OOB) server = BaseServer(self.family, self.addr, TestHandler) client = TestClient(self.family, server.address) @@ -1081,6 +1117,7 @@ class BaseTestAPI: @reap_threads def test_quick_connect(self): # pragma: no cover # see: http://bugs.python.org/issue10340 + if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())): self.skipTest("test specific to AF_INET and AF_INET6") @@ -1692,16 +1729,19 @@ class DummyDispatcher: def handle_read_event(self): self.read_event_handled = True + if self.exc is not None: raise self.exc def handle_write_event(self): self.write_event_handled = True + if self.exc is not None: raise self.exc def handle_expt_event(self): self.expt_event_handled = True + if self.exc is not None: raise self.exc @@ -1740,6 +1780,7 @@ class DummySelect: def select(self, *arg): self.selected.append(arg) + if self.exc is not None: raise self.exc @@ -1754,6 +1795,7 @@ class DummyPollster: def poll(self, timeout): self.polled.append(timeout) + if self.exc is not None: raise self.exc else: # pragma: no cover |