summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/fixtureapps/filewrapper.py2
-rw-r--r--tests/fixtureapps/getline.py4
-rw-r--r--tests/fixtureapps/nocl.py3
-rw-r--r--tests/test_adjustments.py25
-rw-r--r--tests/test_buffers.py10
-rw-r--r--tests/test_channel.py32
-rw-r--r--tests/test_compat.py22
-rw-r--r--tests/test_functional.py482
-rw-r--r--tests/test_init.py4
-rw-r--r--tests/test_parser.py106
-rw-r--r--tests/test_proxy_headers.py10
-rw-r--r--tests/test_receiver.py2
-rw-r--r--tests/test_runner.py24
-rw-r--r--tests/test_server.py18
-rw-r--r--tests/test_task.py18
-rw-r--r--tests/test_trigger.py2
-rw-r--r--tests/test_utilities.py7
-rw-r--r--tests/test_wasyncore.py95
18 files changed, 411 insertions, 455 deletions
diff --git a/tests/fixtureapps/filewrapper.py b/tests/fixtureapps/filewrapper.py
index 63df5a6..40b7685 100644
--- a/tests/fixtureapps/filewrapper.py
+++ b/tests/fixtureapps/filewrapper.py
@@ -5,7 +5,7 @@ here = os.path.dirname(os.path.abspath(__file__))
fn = os.path.join(here, "groundhog1.jpg")
-class KindaFilelike(object): # pragma: no cover
+class KindaFilelike: # pragma: no cover
def __init__(self, bytes):
self.bytes = bytes
diff --git a/tests/fixtureapps/getline.py b/tests/fixtureapps/getline.py
index 5e0ad3a..bb5b39c 100644
--- a/tests/fixtureapps/getline.py
+++ b/tests/fixtureapps/getline.py
@@ -2,9 +2,9 @@ import sys
if __name__ == "__main__":
try:
- from urllib.request import urlopen, URLError
+ from urllib.request import URLError, urlopen
except ImportError:
- from urllib2 import urlopen, URLError
+ from urllib2 import URLError, urlopen
url = sys.argv[1]
headers = {"Content-Type": "text/plain; charset=utf-8"}
diff --git a/tests/fixtureapps/nocl.py b/tests/fixtureapps/nocl.py
index f82bba0..c95a4f5 100644
--- a/tests/fixtureapps/nocl.py
+++ b/tests/fixtureapps/nocl.py
@@ -6,8 +6,7 @@ def chunks(l, n): # pragma: no cover
def gen(body): # pragma: no cover
- for chunk in chunks(body, 10):
- yield chunk
+ yield from chunks(body, 10)
def app(environ, start_response): # pragma: no cover
diff --git a/tests/test_adjustments.py b/tests/test_adjustments.py
index 303c1aa..420ee4c 100644
--- a/tests/test_adjustments.py
+++ b/tests/test_adjustments.py
@@ -1,16 +1,9 @@
-import sys
import socket
+import sys
+import unittest
import warnings
-from waitress.compat import (
- PY2,
- WIN,
-)
-
-if sys.version_info[:2] == (2, 6): # pragma: no cover
- import unittest2 as unittest
-else: # pragma: no cover
- import unittest
+from waitress.compat import WIN
class Test_asbool(unittest.TestCase):
@@ -60,10 +53,12 @@ class Test_as_socket_list(unittest.TestCase):
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET6, socket.SOCK_STREAM),
]
+
if hasattr(socket, "AF_UNIX"):
sockets.append(socket.socket(socket.AF_UNIX, socket.SOCK_STREAM))
new_sockets = as_socket_list(sockets)
self.assertEqual(sockets, new_sockets)
+
for sock in sockets:
sock.close()
@@ -77,6 +72,7 @@ class Test_as_socket_list(unittest.TestCase):
]
new_sockets = as_socket_list(sockets)
self.assertEqual(new_sockets, [sockets[0], sockets[1]])
+
for sock in [sock for sock in sockets if isinstance(sock, socket.socket)]:
sock.close()
@@ -99,6 +95,7 @@ class TestAdjustments(unittest.TestCase):
return True
except socket.gaierror as e:
# Check to see what the error is
+
if e.errno == socket.EAI_ADDRFAMILY:
return False
else:
@@ -220,11 +217,12 @@ class TestAdjustments(unittest.TestCase):
self.assertRaises(ValueError, self._makeOne, listen="127.0.0.1:test")
def test_service_port(self):
- if WIN and PY2: # pragma: no cover
- # On Windows and Python 2 this is broken, so we raise a ValueError
+ if WIN: # pragma: no cover
+ # On Windows this is broken, so we raise a ValueError
self.assertRaises(
ValueError, self._makeOne, listen="127.0.0.1:http",
)
+
return
inst = self._makeOne(listen="127.0.0.1:http 0.0.0.0:https")
@@ -406,6 +404,9 @@ class TestCLI(unittest.TestCase):
return Adjustments.parse_args(argv)
+ def assertDictContainsSubset(self, subset, dictionary):
+ self.assertTrue(set(subset.items()) <= set(dictionary.items()))
+
def test_noargs(self):
opts, args = self.parse([])
self.assertDictEqual(opts, {"call": False, "help": False})
diff --git a/tests/test_buffers.py b/tests/test_buffers.py
index a1330ac..01cdc2d 100644
--- a/tests/test_buffers.py
+++ b/tests/test_buffers.py
@@ -1,5 +1,5 @@
-import unittest
import io
+import unittest
class TestFileBasedBuffer(unittest.TestCase):
@@ -413,7 +413,7 @@ class TestOverflowableBuffer(unittest.TestCase):
def test_prune_with_buf(self):
inst = self._makeOne()
- class Buf(object):
+ class Buf:
def prune(self):
self.pruned = True
@@ -477,7 +477,7 @@ class TestOverflowableBuffer(unittest.TestCase):
self.buffers_to_close.remove(inst)
def test_close_withbuf(self):
- class Buffer(object):
+ class Buffer:
def close(self):
self.closed = True
@@ -489,7 +489,7 @@ class TestOverflowableBuffer(unittest.TestCase):
self.buffers_to_close.remove(inst)
-class KindaFilelike(object):
+class KindaFilelike:
def __init__(self, bytes, close=None, tellresults=None):
self.bytes = bytes
self.tellresults = tellresults
@@ -506,7 +506,7 @@ class Filelike(KindaFilelike):
return v
-class DummyBuffer(object):
+class DummyBuffer:
def __init__(self, length=0):
self.length = length
diff --git a/tests/test_channel.py b/tests/test_channel.py
index 14ef5a0..df3d450 100644
--- a/tests/test_channel.py
+++ b/tests/test_channel.py
@@ -1,5 +1,5 @@
-import unittest
import io
+import unittest
class TestHTTPChannel(unittest.TestCase):
@@ -29,13 +29,13 @@ class TestHTTPChannel(unittest.TestCase):
inst, _, map = self._makeOneWithMap()
- class DummyBuffer(object):
+ class DummyBuffer:
chunks = []
def append(self, data):
self.chunks.append(data)
- class DummyData(object):
+ class DummyData:
def __len__(self):
return MAXINT
@@ -195,7 +195,7 @@ class TestHTTPChannel(unittest.TestCase):
inst.will_close = False
def recv(b):
- raise socket.error
+ raise OSError
inst.recv = recv
inst.last_activity = 0
@@ -270,7 +270,7 @@ class TestHTTPChannel(unittest.TestCase):
class Lock(DummyLock):
def wait(self):
inst.total_outbufs_len = 0
- super(Lock, self).wait()
+ super().wait()
inst.outbuf_lock = Lock()
wrote = inst.write_soon(b"xyz")
@@ -367,7 +367,7 @@ class TestHTTPChannel(unittest.TestCase):
inst, sock, map = self._makeOneWithMap()
- class DummyHugeOutbuffer(object):
+ class DummyHugeOutbuffer:
def __init__(self):
self.length = MAXINT + 1
@@ -705,7 +705,7 @@ class TestHTTPChannel(unittest.TestCase):
self.assertEqual(inst.requests, [])
-class DummySock(object):
+class DummySock:
blocking = False
closed = False
@@ -732,7 +732,7 @@ class DummySock(object):
return len(data)
-class DummyLock(object):
+class DummyLock:
notified = False
def __init__(self, acquirable=True):
@@ -759,7 +759,7 @@ class DummyLock(object):
pass
-class DummyBuffer(object):
+class DummyBuffer:
closed = False
def __init__(self, data, toraise=None):
@@ -783,7 +783,7 @@ class DummyBuffer(object):
self.closed = True
-class DummyAdjustments(object):
+class DummyAdjustments:
outbuf_overflow = 1048576
outbuf_high_watermark = 1048576
inbuf_overflow = 512000
@@ -798,7 +798,7 @@ class DummyAdjustments(object):
max_request_header_size = 10000
-class DummyServer(object):
+class DummyServer:
trigger_pulled = False
adj = DummyAdjustments()
@@ -813,7 +813,7 @@ class DummyServer(object):
self.trigger_pulled = True
-class DummyParser(object):
+class DummyParser:
version = 1
data = None
completed = True
@@ -831,7 +831,7 @@ class DummyParser(object):
return len(data)
-class DummyRequest(object):
+class DummyRequest:
error = None
path = "/"
version = "1.0"
@@ -844,7 +844,7 @@ class DummyRequest(object):
self.closed = True
-class DummyLogger(object):
+class DummyLogger:
def __init__(self):
self.exceptions = []
self.infos = []
@@ -857,13 +857,13 @@ class DummyLogger(object):
self.exceptions.append(msg)
-class DummyError(object):
+class DummyError:
code = "431"
reason = "Bleh"
body = "My body"
-class DummyTaskClass(object):
+class DummyTaskClass:
wrote_header = True
close_on_finish = False
serviced = False
diff --git a/tests/test_compat.py b/tests/test_compat.py
deleted file mode 100644
index 37c2193..0000000
--- a/tests/test_compat.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-
-class Test_unquote_bytes_to_wsgi(unittest.TestCase):
- def _callFUT(self, v):
- from waitress.compat import unquote_bytes_to_wsgi
-
- return unquote_bytes_to_wsgi(v)
-
- def test_highorder(self):
- from waitress.compat import PY3
-
- val = b"/a%C5%9B"
- result = self._callFUT(val)
- if PY3: # pragma: no cover
- # PEP 3333 urlunquoted-latin1-decoded-bytes
- self.assertEqual(result, "/aÅ\x9b")
- else: # pragma: no cover
- # sanity
- self.assertEqual(result, b"/a\xc5\x9b")
diff --git a/tests/test_functional.py b/tests/test_functional.py
index e894497..c99876d 100644
--- a/tests/test_functional.py
+++ b/tests/test_functional.py
@@ -1,4 +1,5 @@
import errno
+from http import client as httplib
import logging
import multiprocessing
import os
@@ -9,8 +10,8 @@ import subprocess
import sys
import time
import unittest
+
from waitress import server
-from waitress.compat import httplib, tobytes
from waitress.utilities import cleanup_unix_socket
dn = os.path.dirname
@@ -54,14 +55,15 @@ class FixtureTcpWSGIServer(server.TcpWSGIServer):
def __init__(self, application, queue, **kw): # pragma: no cover
# Coverage doesn't see this as it's ran in a separate process.
kw["port"] = 0 # Bind to any available port.
- super(FixtureTcpWSGIServer, self).__init__(application, **kw)
+ super().__init__(application, **kw)
host, port = self.socket.getsockname()
+
if os.name == "nt":
host = "127.0.0.1"
queue.put((host, port))
-class SubprocessTests(object):
+class SubprocessTests:
# For nose: all tests may be ran in separate processes.
_multiprocess_can_split_ = True
@@ -98,9 +100,9 @@ class SubprocessTests(object):
def assertline(self, line, status, reason, version):
v, s, r = (x.strip() for x in line.split(None, 2))
- self.assertEqual(s, tobytes(status))
- self.assertEqual(r, tobytes(reason))
- self.assertEqual(v, tobytes(version))
+ self.assertEqual(s, status.encode("latin-1"))
+ self.assertEqual(r, reason.encode("latin-1"))
+ self.assertEqual(v, version.encode("latin-1"))
def create_socket(self):
return socket.socket(self.server.family, socket.SOCK_STREAM)
@@ -142,9 +144,11 @@ class SleepyThreadTests(TcpTests, unittest.TestCase):
)
r, w = os.pipe()
procs = []
+
for cmd in cmds:
procs.append(subprocess.Popen(cmd, stdout=w))
time.sleep(3)
+
for proc in procs:
if proc.returncode is not None: # pragma: no cover
proc.terminate()
@@ -158,7 +162,7 @@ class SleepyThreadTests(TcpTests, unittest.TestCase):
self.assertEqual(result, b"notsleepy returnedsleepy returned")
-class EchoTests(object):
+class EchoTests:
def setUp(self):
from tests.fixtureapps import echo
@@ -177,11 +181,11 @@ class EchoTests(object):
from tests.fixtureapps import echo
line, headers, body = read_http(fp)
+
return line, headers, echo.parse_response(body)
def test_date_and_server(self):
- to_send = "GET / HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET / HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -192,8 +196,7 @@ class EchoTests(object):
def test_bad_host_header(self):
# https://corte.si/posts/code/pathod/pythonservers/index.html
- to_send = "GET / HTTP/1.0\r\n Host: 0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET / HTTP/1.0\r\n Host: 0\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -203,9 +206,8 @@ class EchoTests(object):
self.assertTrue(headers.get("date"))
def test_send_with_body(self):
- to_send = "GET / HTTP/1.0\r\nContent-Length: 5\r\n\r\n"
- to_send += "hello"
- to_send = tobytes(to_send)
+ to_send = b"GET / HTTP/1.0\r\nContent-Length: 5\r\n\r\n"
+ to_send += b"hello"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -215,8 +217,7 @@ class EchoTests(object):
self.assertEqual(echo.body, b"hello")
def test_send_empty_body(self):
- to_send = "GET / HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET / HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -227,6 +228,7 @@ class EchoTests(object):
def test_multiple_requests_with_body(self):
orig_sock = self.sock
+
for x in range(3):
self.sock = self.create_socket()
self.test_send_with_body()
@@ -235,6 +237,7 @@ class EchoTests(object):
def test_multiple_requests_without_body(self):
orig_sock = self.sock
+
for x in range(3):
self.sock = self.create_socket()
self.test_send_empty_body()
@@ -242,13 +245,13 @@ class EchoTests(object):
self.sock = orig_sock
def test_without_crlf(self):
- data = "Echo\r\nthis\r\nplease"
- s = tobytes(
- "GET / HTTP/1.0\r\n"
- "Connection: close\r\n"
- "Content-Length: %d\r\n"
- "\r\n"
- "%s" % (len(data), data)
+ data = b"Echo\r\nthis\r\nplease"
+ s = (
+ b"GET / HTTP/1.0\r\n"
+ b"Connection: close\r\n"
+ b"Content-Length: %d\r\n"
+ b"\r\n"
+ b"%s" % (len(data), data)
)
self.connect()
self.sock.send(s)
@@ -257,40 +260,42 @@ class EchoTests(object):
self.assertline(line, "200", "OK", "HTTP/1.0")
self.assertEqual(int(echo.content_length), len(data))
self.assertEqual(len(echo.body), len(data))
- self.assertEqual(echo.body, tobytes(data))
+ self.assertEqual(echo.body, (data))
def test_large_body(self):
# 1024 characters.
- body = "This string has 32 characters.\r\n" * 32
- s = tobytes(
- "GET / HTTP/1.0\r\nContent-Length: %d\r\n\r\n%s" % (len(body), body)
- )
+ body = b"This string has 32 characters.\r\n" * 32
+ s = b"GET / HTTP/1.0\r\nContent-Length: %d\r\n\r\n%s" % (len(body), body)
self.connect()
self.sock.send(s)
fp = self.sock.makefile("rb", 0)
line, headers, echo = self._read_echo(fp)
self.assertline(line, "200", "OK", "HTTP/1.0")
self.assertEqual(echo.content_length, "1024")
- self.assertEqual(echo.body, tobytes(body))
+ self.assertEqual(echo.body, body)
def test_many_clients(self):
conns = []
+
for n in range(50):
h = self.make_http_connection()
h.request("GET", "/", headers={"Accept": "text/plain"})
conns.append(h)
responses = []
+
for h in conns:
response = h.getresponse()
self.assertEqual(response.status, 200)
responses.append(response)
+
for response in responses:
response.read()
+
for h in conns:
h.close()
def test_chunking_request_without_content(self):
- header = tobytes("GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n")
+ header = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
self.connect()
self.sock.send(header)
self.sock.send(b"0\r\n\r\n")
@@ -305,10 +310,11 @@ class EchoTests(object):
control_line = b"20;\r\n" # 20 hex = 32 dec
s = b"This string has 32 characters.\r\n"
expected = s * 12
- header = tobytes("GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n")
+ header = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
self.connect()
self.sock.send(header)
fp = self.sock.makefile("rb", 0)
+
for n in range(12):
self.sock.send(control_line)
self.sock.send(s)
@@ -321,13 +327,12 @@ class EchoTests(object):
self.assertFalse("transfer-encoding" in headers)
def test_broken_chunked_encoding(self):
- control_line = "20;\r\n" # 20 hex = 32 dec
- s = "This string has 32 characters.\r\n"
- to_send = "GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
- to_send += control_line + s + "\r\n"
+ control_line = b"20;\r\n" # 20 hex = 32 dec
+ s = b"This string has 32 characters.\r\n"
+ to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
+ to_send += control_line + s + b"\r\n"
# garbage in input
- to_send += "garbage\r\n"
- to_send = tobytes(to_send)
+ to_send += b"garbage\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -346,13 +351,12 @@ class EchoTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_broken_chunked_encoding_missing_chunk_end(self):
- control_line = "20;\r\n" # 20 hex = 32 dec
- s = "This string has 32 characters.\r\n"
- to_send = "GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
+ control_line = b"20;\r\n" # 20 hex = 32 dec
+ s = b"This string has 32 characters.\r\n"
+ to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
to_send += control_line + s
# garbage in input
- to_send += "garbage"
- to_send = tobytes(to_send)
+ to_send += b"garbage"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -373,10 +377,8 @@ class EchoTests(object):
def test_keepalive_http_10(self):
# Handling of Keep-Alive within HTTP 1.0
- data = "Default: Don't keep me alive"
- s = tobytes(
- "GET / HTTP/1.0\r\nContent-Length: %d\r\n\r\n%s" % (len(data), data)
- )
+ data = b"Default: Don't keep me alive"
+ s = b"GET / HTTP/1.0\r\nContent-Length: %d\r\n\r\n%s" % (len(data), data)
self.connect()
self.sock.send(s)
response = httplib.HTTPResponse(self.sock)
@@ -391,13 +393,13 @@ class EchoTests(object):
# If header Connection: Keep-Alive is explicitly sent,
# we want to keept the connection open, we also need to return
# the corresponding header
- data = "Keep me alive"
- s = tobytes(
- "GET / HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: %d\r\n"
- "\r\n"
- "%s" % (len(data), data)
+ data = b"Keep me alive"
+ s = (
+ b"GET / HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: %d\r\n"
+ b"\r\n"
+ b"%s" % (len(data), data)
)
self.connect()
self.sock.send(s)
@@ -411,10 +413,8 @@ class EchoTests(object):
# Handling of Keep-Alive within HTTP 1.1
# All connections are kept alive, unless stated otherwise
- data = "Default: Keep me alive"
- s = tobytes(
- "GET / HTTP/1.1\r\nContent-Length: %d\r\n\r\n%s" % (len(data), data)
- )
+ data = b"Default: Keep me alive"
+ s = b"GET / HTTP/1.1\r\nContent-Length: %d\r\n\r\n%s" % (len(data), data)
self.connect()
self.sock.send(s)
response = httplib.HTTPResponse(self.sock)
@@ -424,13 +424,13 @@ class EchoTests(object):
def test_keepalive_http11_explicit(self):
# Explicitly set keep-alive
- data = "Default: Keep me alive"
- s = tobytes(
- "GET / HTTP/1.1\r\n"
- "Connection: keep-alive\r\n"
- "Content-Length: %d\r\n"
- "\r\n"
- "%s" % (len(data), data)
+ data = b"Default: Keep me alive"
+ s = (
+ b"GET / HTTP/1.1\r\n"
+ b"Connection: keep-alive\r\n"
+ b"Content-Length: %d\r\n"
+ b"\r\n"
+ b"%s" % (len(data), data)
)
self.connect()
self.sock.send(s)
@@ -441,13 +441,13 @@ class EchoTests(object):
def test_keepalive_http11_connclose(self):
# specifying Connection: close explicitly
- data = "Don't keep me alive"
- s = tobytes(
- "GET / HTTP/1.1\r\n"
- "Connection: close\r\n"
- "Content-Length: %d\r\n"
- "\r\n"
- "%s" % (len(data), data)
+ data = b"Don't keep me alive"
+ s = (
+ b"GET / HTTP/1.1\r\n"
+ b"Connection: close\r\n"
+ b"Content-Length: %d\r\n"
+ b"\r\n"
+ b"%s" % (len(data), data)
)
self.connect()
self.sock.send(s)
@@ -458,14 +458,13 @@ class EchoTests(object):
def test_proxy_headers(self):
to_send = (
- "GET / HTTP/1.0\r\n"
- "Content-Length: 0\r\n"
- "Host: www.google.com:8080\r\n"
- "X-Forwarded-For: 192.168.1.1\r\n"
- "X-Forwarded-Proto: https\r\n"
- "X-Forwarded-Port: 5000\r\n\r\n"
+ b"GET / HTTP/1.0\r\n"
+ b"Content-Length: 0\r\n"
+ b"Host: www.google.com:8080\r\n"
+ b"X-Forwarded-For: 192.168.1.1\r\n"
+ b"X-Forwarded-Proto: https\r\n"
+ b"X-Forwarded-Port: 5000\r\n\r\n"
)
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -480,7 +479,7 @@ class EchoTests(object):
self.assertEqual(echo.remote_host, "192.168.1.1")
-class PipeliningTests(object):
+class PipeliningTests:
def setUp(self):
from tests.fixtureapps import echo
@@ -491,27 +490,30 @@ class PipeliningTests(object):
def test_pipelining(self):
s = (
- "GET / HTTP/1.0\r\n"
- "Connection: %s\r\n"
- "Content-Length: %d\r\n"
- "\r\n"
- "%s"
+ b"GET / HTTP/1.0\r\n"
+ b"Connection: %s\r\n"
+ b"Content-Length: %d\r\n"
+ b"\r\n"
+ b"%s"
)
to_send = b""
count = 25
+
for n in range(count):
- body = "Response #%d\r\n" % (n + 1)
+ body = b"Response #%d\r\n" % (n + 1)
+
if n + 1 < count:
- conn = "keep-alive"
+ conn = b"keep-alive"
else:
- conn = "close"
- to_send += tobytes(s % (conn, len(body), body))
+ conn = b"close"
+ to_send += s % (conn, len(body), body)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
+
for n in range(count):
- expect_body = tobytes("Response #%d\r\n" % (n + 1))
+ expect_body = b"Response #%d\r\n" % (n + 1)
line = fp.readline() # status line
version, status, reason = (x.strip() for x in line.split(None, 2))
headers = parse_headers(fp)
@@ -522,7 +524,7 @@ class PipeliningTests(object):
self.assertEqual(response_body, expect_body)
-class ExpectContinueTests(object):
+class ExpectContinueTests:
def setUp(self):
from tests.fixtureapps import echo
@@ -533,14 +535,14 @@ class ExpectContinueTests(object):
def test_expect_continue(self):
# specifying Connection: close explicitly
- data = "I have expectations"
- to_send = tobytes(
- "GET / HTTP/1.1\r\n"
- "Connection: close\r\n"
- "Content-Length: %d\r\n"
- "Expect: 100-continue\r\n"
- "\r\n"
- "%s" % (len(data), data)
+ data = b"I have expectations"
+ to_send = (
+ b"GET / HTTP/1.1\r\n"
+ b"Connection: close\r\n"
+ b"Content-Length: %d\r\n"
+ b"Expect: 100-continue\r\n"
+ b"\r\n"
+ b"%s" % (len(data), data)
)
self.connect()
self.sock.send(to_send)
@@ -558,10 +560,10 @@ class ExpectContinueTests(object):
response_body = fp.read(length)
self.assertEqual(int(status), 200)
self.assertEqual(length, len(response_body))
- self.assertEqual(response_body, tobytes(data))
+ self.assertEqual(response_body, data)
-class BadContentLengthTests(object):
+class BadContentLengthTests:
def setUp(self):
from tests.fixtureapps import badcl
@@ -573,11 +575,11 @@ class BadContentLengthTests(object):
def test_short_body(self):
# check to see if server closes connection when body is too short
# for cl header
- to_send = tobytes(
- "GET /short_body HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
+ to_send = (
+ b"GET /short_body HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: 0\r\n"
+ b"\r\n"
)
self.connect()
self.sock.send(to_send)
@@ -590,7 +592,7 @@ class BadContentLengthTests(object):
self.assertEqual(int(status), 200)
self.assertNotEqual(content_length, len(response_body))
self.assertEqual(len(response_body), content_length - 1)
- self.assertEqual(response_body, tobytes("abcdefghi"))
+ self.assertEqual(response_body, b"abcdefghi")
# remote closed connection (despite keepalive header); not sure why
# first send succeeds
self.send_check_error(to_send)
@@ -599,11 +601,11 @@ class BadContentLengthTests(object):
def test_long_body(self):
# check server doesnt close connection when body is too short
# for cl header
- to_send = tobytes(
- "GET /long_body HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
+ to_send = (
+ b"GET /long_body HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: 0\r\n"
+ b"\r\n"
)
self.connect()
self.sock.send(to_send)
@@ -615,7 +617,7 @@ class BadContentLengthTests(object):
response_body = fp.read(content_length)
self.assertEqual(int(status), 200)
self.assertEqual(content_length, len(response_body))
- self.assertEqual(response_body, tobytes("abcdefgh"))
+ self.assertEqual(response_body, b"abcdefgh")
# remote does not close connection (keepalive header)
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -627,7 +629,7 @@ class BadContentLengthTests(object):
self.assertEqual(int(status), 200)
-class NoContentLengthTests(object):
+class NoContentLengthTests:
def setUp(self):
from tests.fixtureapps import nocl
@@ -637,14 +639,13 @@ class NoContentLengthTests(object):
self.stop_subprocess()
def test_http10_generator(self):
- body = string.ascii_letters
+ body = string.ascii_letters.encode("latin-1")
to_send = (
- "GET / HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: %d\r\n\r\n" % len(body)
+ b"GET / HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: %d\r\n\r\n" % len(body)
)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -652,21 +653,20 @@ class NoContentLengthTests(object):
self.assertline(line, "200", "OK", "HTTP/1.0")
self.assertEqual(headers.get("content-length"), None)
self.assertEqual(headers.get("connection"), "close")
- self.assertEqual(response_body, tobytes(body))
+ self.assertEqual(response_body, body)
# remote closed connection (despite keepalive header), because
# generators cannot have a content-length divined
self.send_check_error(to_send)
self.assertRaises(ConnectionClosed, read_http, fp)
def test_http10_list(self):
- body = string.ascii_letters
+ body = string.ascii_letters.encode("latin-1")
to_send = (
- "GET /list HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: %d\r\n\r\n" % len(body)
+ b"GET /list HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: %d\r\n\r\n" % len(body)
)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -674,7 +674,7 @@ class NoContentLengthTests(object):
self.assertline(line, "200", "OK", "HTTP/1.0")
self.assertEqual(headers["content-length"], str(len(body)))
self.assertEqual(headers.get("connection"), "Keep-Alive")
- self.assertEqual(response_body, tobytes(body))
+ self.assertEqual(response_body, body)
# remote keeps connection open because it divined the content length
# from a length-1 list
self.sock.send(to_send)
@@ -682,14 +682,13 @@ class NoContentLengthTests(object):
self.assertline(line, "200", "OK", "HTTP/1.0")
def test_http10_listlentwo(self):
- body = string.ascii_letters
+ body = string.ascii_letters.encode("latin-1")
to_send = (
- "GET /list_lentwo HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: %d\r\n\r\n" % len(body)
+ b"GET /list_lentwo HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: %d\r\n\r\n" % len(body)
)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -697,7 +696,7 @@ class NoContentLengthTests(object):
self.assertline(line, "200", "OK", "HTTP/1.0")
self.assertEqual(headers.get("content-length"), None)
self.assertEqual(headers.get("connection"), "close")
- self.assertEqual(response_body, tobytes(body))
+ self.assertEqual(response_body, body)
# remote closed connection (despite keepalive header), because
# lists of length > 1 cannot have their content length divined
self.send_check_error(to_send)
@@ -705,18 +704,20 @@ class NoContentLengthTests(object):
def test_http11_generator(self):
body = string.ascii_letters
- to_send = "GET / HTTP/1.1\r\nContent-Length: %s\r\n\r\n" % len(body)
+ body = body.encode("latin-1")
+ to_send = b"GET / HTTP/1.1\r\nContent-Length: %d\r\n\r\n" % len(body)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb")
line, headers, response_body = read_http(fp)
self.assertline(line, "200", "OK", "HTTP/1.1")
expected = b""
+
for chunk in chunks(body, 10):
- expected += tobytes(
- "%s\r\n%s\r\n" % (str(hex(len(chunk))[2:].upper()), chunk)
+ expected += b"%s\r\n%s\r\n" % (
+ hex(len(chunk))[2:].upper().encode("latin-1"),
+ chunk,
)
expected += b"0\r\n\r\n"
self.assertEqual(response_body, expected)
@@ -725,17 +726,16 @@ class NoContentLengthTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_http11_list(self):
- body = string.ascii_letters
- to_send = "GET /list HTTP/1.1\r\nContent-Length: %d\r\n\r\n" % len(body)
+ body = string.ascii_letters.encode("latin-1")
+ to_send = b"GET /list HTTP/1.1\r\nContent-Length: %d\r\n\r\n" % len(body)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
line, headers, response_body = read_http(fp)
self.assertline(line, "200", "OK", "HTTP/1.1")
self.assertEqual(headers["content-length"], str(len(body)))
- self.assertEqual(response_body, tobytes(body))
+ self.assertEqual(response_body, body)
# remote keeps connection open because it divined the content length
# from a length-1 list
self.sock.send(to_send)
@@ -743,19 +743,20 @@ class NoContentLengthTests(object):
self.assertline(line, "200", "OK", "HTTP/1.1")
def test_http11_listlentwo(self):
- body = string.ascii_letters
- to_send = "GET /list_lentwo HTTP/1.1\r\nContent-Length: %s\r\n\r\n" % len(body)
+ body = string.ascii_letters.encode("latin-1")
+ to_send = b"GET /list_lentwo HTTP/1.1\r\nContent-Length: %d\r\n\r\n" % len(body)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb")
line, headers, response_body = read_http(fp)
self.assertline(line, "200", "OK", "HTTP/1.1")
expected = b""
- for chunk in (body[0], body[1:]):
- expected += tobytes(
- "%s\r\n%s\r\n" % (str(hex(len(chunk))[2:].upper()), chunk)
+
+ for chunk in (body[:1], body[1:]):
+ expected += b"%s\r\n%s\r\n" % (
+ (hex(len(chunk))[2:].upper().encode("latin-1")),
+ chunk,
)
expected += b"0\r\n\r\n"
self.assertEqual(response_body, expected)
@@ -764,7 +765,7 @@ class NoContentLengthTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
-class WriteCallbackTests(object):
+class WriteCallbackTests:
def setUp(self):
from tests.fixtureapps import writecb
@@ -776,11 +777,11 @@ class WriteCallbackTests(object):
def test_short_body(self):
# check to see if server closes connection when body is too short
# for cl header
- to_send = tobytes(
- "GET /short_body HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
+ to_send = (
+ b"GET /short_body HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: 0\r\n"
+ b"\r\n"
)
self.connect()
self.sock.send(to_send)
@@ -792,7 +793,7 @@ class WriteCallbackTests(object):
self.assertEqual(cl, 9)
self.assertNotEqual(cl, len(response_body))
self.assertEqual(len(response_body), cl - 1)
- self.assertEqual(response_body, tobytes("abcdefgh"))
+ self.assertEqual(response_body, b"abcdefgh")
# remote closed connection (despite keepalive header)
self.send_check_error(to_send)
self.assertRaises(ConnectionClosed, read_http, fp)
@@ -800,11 +801,11 @@ class WriteCallbackTests(object):
def test_long_body(self):
# check server doesnt close connection when body is too long
# for cl header
- to_send = tobytes(
- "GET /long_body HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
+ to_send = (
+ b"GET /long_body HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: 0\r\n"
+ b"\r\n"
)
self.connect()
self.sock.send(to_send)
@@ -813,7 +814,7 @@ class WriteCallbackTests(object):
content_length = int(headers.get("content-length")) or None
self.assertEqual(content_length, 9)
self.assertEqual(content_length, len(response_body))
- self.assertEqual(response_body, tobytes("abcdefghi"))
+ self.assertEqual(response_body, b"abcdefghi")
# remote does not close connection (keepalive header)
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -823,11 +824,11 @@ class WriteCallbackTests(object):
def test_equal_body(self):
# check server doesnt close connection when body is equal to
# cl header
- to_send = tobytes(
- "GET /equal_body HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
+ to_send = (
+ b"GET /equal_body HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: 0\r\n"
+ b"\r\n"
)
self.connect()
self.sock.send(to_send)
@@ -837,7 +838,7 @@ class WriteCallbackTests(object):
self.assertEqual(content_length, 9)
self.assertline(line, "200", "OK", "HTTP/1.0")
self.assertEqual(content_length, len(response_body))
- self.assertEqual(response_body, tobytes("abcdefghi"))
+ self.assertEqual(response_body, b"abcdefghi")
# remote does not close connection (keepalive header)
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -846,11 +847,11 @@ class WriteCallbackTests(object):
def test_no_content_length(self):
# wtf happens when there's no content-length
- to_send = tobytes(
- "GET /no_content_length HTTP/1.0\r\n"
- "Connection: Keep-Alive\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
+ to_send = (
+ b"GET /no_content_length HTTP/1.0\r\n"
+ b"Connection: Keep-Alive\r\n"
+ b"Content-Length: 0\r\n"
+ b"\r\n"
)
self.connect()
self.sock.send(to_send)
@@ -859,13 +860,13 @@ class WriteCallbackTests(object):
line, headers, response_body = read_http(fp)
content_length = headers.get("content-length")
self.assertEqual(content_length, None)
- self.assertEqual(response_body, tobytes("abcdefghi"))
+ self.assertEqual(response_body, b"abcdefghi")
# remote closed connection (despite keepalive header)
self.send_check_error(to_send)
self.assertRaises(ConnectionClosed, read_http, fp)
-class TooLargeTests(object):
+class TooLargeTests:
toobig = 1050
@@ -880,10 +881,9 @@ class TooLargeTests(object):
self.stop_subprocess()
def test_request_body_too_large_with_wrong_cl_http10(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.0\r\nContent-Length: 5\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.0\r\nContent-Length: 5\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb")
@@ -899,12 +899,11 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_wrong_cl_http10_keepalive(self):
- body = "a" * self.toobig
+ body = b"a" * self.toobig
to_send = (
- "GET / HTTP/1.0\r\nContent-Length: 5\r\nConnection: Keep-Alive\r\n\r\n"
+ b"GET / HTTP/1.0\r\nContent-Length: 5\r\nConnection: Keep-Alive\r\n\r\n"
)
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb")
@@ -922,10 +921,9 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_no_cl_http10(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.0\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.0\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -938,10 +936,9 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_no_cl_http10_keepalive(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.0\r\nConnection: Keep-Alive\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.0\r\nConnection: Keep-Alive\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -961,10 +958,9 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_wrong_cl_http11(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.1\r\nContent-Length: 5\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.1\r\nContent-Length: 5\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb")
@@ -983,10 +979,9 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_wrong_cl_http11_connclose(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.1\r\nContent-Length: 5\r\nConnection: close\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.1\r\nContent-Length: 5\r\nConnection: close\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1000,10 +995,9 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_no_cl_http11(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.1\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.1\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb")
@@ -1025,10 +1019,9 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_with_no_cl_http11_connclose(self):
- body = "a" * self.toobig
- to_send = "GET / HTTP/1.1\r\nConnection: close\r\n\r\n"
+ body = b"a" * self.toobig
+ to_send = b"GET / HTTP/1.1\r\nConnection: close\r\n\r\n"
to_send += body
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1042,12 +1035,11 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_request_body_too_large_chunked_encoding(self):
- control_line = "20;\r\n" # 20 hex = 32 dec
- s = "This string has 32 characters.\r\n"
- to_send = "GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
+ control_line = b"20;\r\n" # 20 hex = 32 dec
+ s = b"This string has 32 characters.\r\n"
+ to_send = b"GET / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
repeat = control_line + s
to_send += repeat * ((self.toobig // len(repeat)) + 1)
- to_send = tobytes(to_send)
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1062,7 +1054,7 @@ class TooLargeTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
-class InternalServerErrorTests(object):
+class InternalServerErrorTests:
def setUp(self):
from tests.fixtureapps import error
@@ -1072,8 +1064,7 @@ class InternalServerErrorTests(object):
self.stop_subprocess()
def test_before_start_response_http_10(self):
- to_send = "GET /before_start_response HTTP/1.0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /before_start_response HTTP/1.0\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1088,8 +1079,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_before_start_response_http_11(self):
- to_send = "GET /before_start_response HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /before_start_response HTTP/1.1\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1107,9 +1097,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_before_start_response_http_11_close(self):
- to_send = tobytes(
- "GET /before_start_response HTTP/1.1\r\nConnection: close\r\n\r\n"
- )
+ to_send = b"GET /before_start_response HTTP/1.1\r\nConnection: close\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1128,8 +1116,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_after_start_response_http10(self):
- to_send = "GET /after_start_response HTTP/1.0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /after_start_response HTTP/1.0\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1148,8 +1135,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_after_start_response_http11(self):
- to_send = "GET /after_start_response HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /after_start_response HTTP/1.1\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1167,9 +1153,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_after_start_response_http11_close(self):
- to_send = tobytes(
- "GET /after_start_response HTTP/1.1\r\nConnection: close\r\n\r\n"
- )
+ to_send = b"GET /after_start_response HTTP/1.1\r\nConnection: close\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1188,8 +1172,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_after_write_cb(self):
- to_send = "GET /after_write_cb HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /after_write_cb HTTP/1.1\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1201,8 +1184,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_in_generator(self):
- to_send = "GET /in_generator HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /in_generator HTTP/1.1\r\n\r\n"
self.connect()
self.sock.send(to_send)
fp = self.sock.makefile("rb", 0)
@@ -1214,7 +1196,7 @@ class InternalServerErrorTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
-class FileWrapperTests(object):
+class FileWrapperTests:
def setUp(self):
from tests.fixtureapps import filewrapper
@@ -1224,8 +1206,7 @@ class FileWrapperTests(object):
self.stop_subprocess()
def test_filelike_http11(self):
- to_send = "GET /filelike HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /filelike HTTP/1.1\r\n\r\n"
self.connect()
@@ -1242,8 +1223,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_filelike_nocl_http11(self):
- to_send = "GET /filelike_nocl HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /filelike_nocl HTTP/1.1\r\n\r\n"
self.connect()
@@ -1260,8 +1240,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_filelike_shortcl_http11(self):
- to_send = "GET /filelike_shortcl HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /filelike_shortcl HTTP/1.1\r\n\r\n"
self.connect()
@@ -1279,8 +1258,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_filelike_longcl_http11(self):
- to_send = "GET /filelike_longcl HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /filelike_longcl HTTP/1.1\r\n\r\n"
self.connect()
@@ -1297,8 +1275,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_notfilelike_http11(self):
- to_send = "GET /notfilelike HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike HTTP/1.1\r\n\r\n"
self.connect()
@@ -1315,8 +1292,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_notfilelike_iobase_http11(self):
- to_send = "GET /notfilelike_iobase HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike_iobase HTTP/1.1\r\n\r\n"
self.connect()
@@ -1333,8 +1309,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_notfilelike_nocl_http11(self):
- to_send = "GET /notfilelike_nocl HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike_nocl HTTP/1.1\r\n\r\n"
self.connect()
@@ -1350,8 +1325,7 @@ class FileWrapperTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_notfilelike_shortcl_http11(self):
- to_send = "GET /notfilelike_shortcl HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike_shortcl HTTP/1.1\r\n\r\n"
self.connect()
@@ -1369,8 +1343,7 @@ class FileWrapperTests(object):
# connection has not been closed
def test_notfilelike_longcl_http11(self):
- to_send = "GET /notfilelike_longcl HTTP/1.1\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike_longcl HTTP/1.1\r\n\r\n"
self.connect()
@@ -1388,8 +1361,7 @@ class FileWrapperTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_filelike_http10(self):
- to_send = "GET /filelike HTTP/1.0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /filelike HTTP/1.0\r\n\r\n"
self.connect()
@@ -1407,8 +1379,7 @@ class FileWrapperTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_filelike_nocl_http10(self):
- to_send = "GET /filelike_nocl HTTP/1.0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /filelike_nocl HTTP/1.0\r\n\r\n"
self.connect()
@@ -1426,8 +1397,7 @@ class FileWrapperTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_notfilelike_http10(self):
- to_send = "GET /notfilelike HTTP/1.0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike HTTP/1.0\r\n\r\n"
self.connect()
@@ -1445,8 +1415,7 @@ class FileWrapperTests(object):
self.assertRaises(ConnectionClosed, read_http, fp)
def test_notfilelike_nocl_http10(self):
- to_send = "GET /notfilelike_nocl HTTP/1.0\r\n\r\n"
- to_send = tobytes(to_send)
+ to_send = b"GET /notfilelike_nocl HTTP/1.0\r\n\r\n"
self.connect()
@@ -1512,7 +1481,7 @@ if hasattr(socket, "AF_UNIX"):
# Coverage doesn't see this as it's ran in a separate process.
# To permit parallel testing, use a PID-dependent socket.
kw["unix_socket"] = "/tmp/waitress.test-%d.sock" % os.getpid()
- super(FixtureUnixWSGIServer, self).__init__(application, **kw)
+ super().__init__(application, **kw)
queue.put(self.socket.getsockname())
class UnixTests(SubprocessTests):
@@ -1523,7 +1492,7 @@ if hasattr(socket, "AF_UNIX"):
return UnixHTTPConnection(self.bound_to)
def stop_subprocess(self):
- super(UnixTests, self).stop_subprocess()
+ super().stop_subprocess()
cleanup_unix_socket(self.bound_to)
def send_check_error(self, to_send):
@@ -1531,8 +1500,9 @@ if hasattr(socket, "AF_UNIX"):
# 'Broken pipe' error when the socket it closed.
try:
self.sock.send(to_send)
- except socket.error as exc:
- self.assertEqual(get_errno(exc), errno.EPIPE)
+ except OSError as exc:
+ valid_errors = {errno.EPIPE, errno.ENOTCONN}
+ self.assertIn(get_errno(exc), valid_errors)
class UnixEchoTests(EchoTests, UnixTests, unittest.TestCase):
pass
@@ -1570,13 +1540,16 @@ def parse_headers(fp):
"""Parses only RFC2822 headers from a file pointer.
"""
headers = {}
+
while True:
line = fp.readline()
+
if line in (b"\r\n", b"\n", b""):
break
line = line.decode("iso-8859-1")
name, value = line.strip().split(":", 1)
headers[name.lower().strip()] = value.lower().strip()
+
return headers
@@ -1602,25 +1575,31 @@ class ConnectionClosed(Exception):
def read_http(fp): # pragma: no cover
try:
response_line = fp.readline()
- except socket.error as exc:
+ except OSError as exc:
fp.close()
# errno 104 is ENOTRECOVERABLE, In WinSock 10054 is ECONNRESET
+
if get_errno(exc) in (errno.ECONNABORTED, errno.ECONNRESET, 104, 10054):
raise ConnectionClosed
raise
+
if not response_line:
raise ConnectionClosed
header_lines = []
+
while True:
line = fp.readline()
+
if line in (b"\r\n", b"\r\n", b""):
break
else:
header_lines.append(line)
headers = dict()
+
for x in header_lines:
x = x.strip()
+
if not x:
continue
key, value = x.split(b": ", 1)
@@ -1633,8 +1612,10 @@ def read_http(fp): # pragma: no cover
num = int(headers["content-length"])
body = b""
left = num
+
while left > 0:
data = fp.read(left)
+
if not data:
break
body += data
@@ -1670,5 +1651,6 @@ def get_errno(exc): # pragma: no cover
def chunks(l, n):
""" Yield successive n-sized chunks from l.
"""
+
for i in range(0, len(l), n):
yield l[i : i + n]
diff --git a/tests/test_init.py b/tests/test_init.py
index f9b91d7..c824c21 100644
--- a/tests/test_init.py
+++ b/tests/test_init.py
@@ -31,7 +31,7 @@ class Test_serve_paste(unittest.TestCase):
self.assertEqual(server.ran, True)
-class DummyServerFactory(object):
+class DummyServerFactory:
ran = False
def __call__(self, app, **kw):
@@ -44,7 +44,7 @@ class DummyServerFactory(object):
self.ran = True
-class DummyAdj(object):
+class DummyAdj:
verbose = False
def __init__(self, kw):
diff --git a/tests/test_parser.py b/tests/test_parser.py
index 91837c7..eace4af 100644
--- a/tests/test_parser.py
+++ b/tests/test_parser.py
@@ -15,13 +15,26 @@
"""
import unittest
-from waitress.compat import text_, tobytes
+from waitress.adjustments import Adjustments
+from waitress.parser import (
+ HTTPRequestParser,
+ ParsingError,
+ TransferEncodingNotImplemented,
+ crack_first_line,
+ get_header_lines,
+ split_uri,
+ unquote_bytes_to_wsgi,
+)
+from waitress.utilities import (
+ BadRequest,
+ RequestEntityTooLarge,
+ RequestHeaderFieldsTooLarge,
+ ServerNotImplemented,
+)
class TestHTTPRequestParser(unittest.TestCase):
def setUp(self):
- from waitress.parser import HTTPRequestParser
- from waitress.adjustments import Adjustments
my_adj = Adjustments()
self.parser = HTTPRequestParser(my_adj)
@@ -45,8 +58,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.headers, {})
def test_received_bad_host_header(self):
- from waitress.utilities import BadRequest
-
data = b"HTTP/1.0 GET /foobar\r\n Host: foo\r\n\r\n"
result = self.parser.received(data)
self.assertEqual(result, 36)
@@ -54,8 +65,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.error.__class__, BadRequest)
def test_received_bad_transfer_encoding(self):
- from waitress.utilities import ServerNotImplemented
-
data = (
b"GET /foobar HTTP/1.1\r\n"
b"Transfer-Encoding: foo\r\n"
@@ -89,7 +98,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(result, 0)
def test_received_cl_too_large(self):
- from waitress.utilities import RequestEntityTooLarge
self.parser.adj.max_request_body_size = 2
data = b"GET /foobar HTTP/8.4\r\nContent-Length: 10\r\n\r\n"
@@ -99,7 +107,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(isinstance(self.parser.error, RequestEntityTooLarge))
def test_received_headers_too_large(self):
- from waitress.utilities import RequestHeaderFieldsTooLarge
self.parser.adj.max_request_header_size = 2
data = b"GET /foobar HTTP/8.4\r\nX-Foo: 1\r\n\r\n"
@@ -109,8 +116,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(isinstance(self.parser.error, RequestHeaderFieldsTooLarge))
def test_received_body_too_large(self):
- from waitress.utilities import RequestEntityTooLarge
-
self.parser.adj.max_request_body_size = 2
data = (
b"GET /foobar HTTP/1.1\r\n"
@@ -129,8 +134,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(isinstance(self.parser.error, RequestEntityTooLarge))
def test_received_error_from_parser(self):
- from waitress.utilities import BadRequest
-
data = (
b"GET /foobar HTTP/1.1\r\n"
b"Transfer-Encoding: chunked\r\n"
@@ -171,8 +174,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.headers["FOO"], "bar")
def test_parse_header_no_cr_in_headerplus(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4"
try:
@@ -183,8 +184,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_bad_content_length(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4\r\ncontent-length: abc\r\n"
try:
@@ -195,8 +194,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_multiple_content_length(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4\r\ncontent-length: 10\r\ncontent-length: 20\r\n"
try:
@@ -213,8 +210,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.body_rcv.__class__.__name__, "ChunkedReceiver")
def test_parse_header_transfer_encoding_invalid(self):
- from waitress.parser import TransferEncodingNotImplemented
-
data = b"GET /foobar HTTP/1.1\r\ntransfer-encoding: gzip\r\n"
try:
@@ -225,7 +220,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_transfer_encoding_invalid_multiple(self):
- from waitress.parser import TransferEncodingNotImplemented
data = b"GET /foobar HTTP/1.1\r\ntransfer-encoding: gzip\r\ntransfer-encoding: chunked\r\n"
@@ -237,8 +231,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_transfer_encoding_invalid_whitespace(self):
- from waitress.parser import TransferEncodingNotImplemented
-
data = b"GET /foobar HTTP/1.1\r\nTransfer-Encoding:\x85chunked\r\n"
try:
@@ -249,8 +241,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_transfer_encoding_invalid_unicode(self):
- from waitress.parser import TransferEncodingNotImplemented
-
# This is the binary encoding for the UTF-8 character
# https://www.compart.com/en/unicode/U+212A "unicode character "K""
# which if waitress were to accidentally do the wrong thing get
@@ -286,8 +276,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.parser.close() # doesn't raise
def test_parse_header_lf_only(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4\nfoo: bar"
try:
@@ -298,8 +286,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_cr_only(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4\rfoo: bar"
try:
self.parser.parse_header(data)
@@ -309,8 +295,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_extra_lf_in_header(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4\r\nfoo: \nbar\r\n"
try:
self.parser.parse_header(data)
@@ -320,8 +304,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_extra_lf_in_first_line(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar\n HTTP/8.4\r\n"
try:
self.parser.parse_header(data)
@@ -331,8 +313,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_invalid_whitespace(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/8.4\r\nfoo : bar\r\n"
try:
self.parser.parse_header(data)
@@ -342,8 +322,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_invalid_whitespace_vtab(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo:\x0bbar\r\n"
try:
self.parser.parse_header(data)
@@ -353,8 +331,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_invalid_no_colon(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nnotvalid\r\n"
try:
self.parser.parse_header(data)
@@ -364,8 +340,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_invalid_folding_spacing(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\n\t\x0bbaz\r\n"
try:
self.parser.parse_header(data)
@@ -375,8 +349,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_invalid_chars(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nfoo: \x0bbaz\r\n"
try:
self.parser.parse_header(data)
@@ -386,8 +358,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_empty(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nempty:\r\n"
self.parser.parse_header(data)
@@ -397,8 +367,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.headers["FOO"], "bar")
def test_parse_header_multiple_values(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar, whatever, more, please, yes\r\n"
self.parser.parse_header(data)
@@ -406,8 +374,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.headers["FOO"], "bar, whatever, more, please, yes")
def test_parse_header_multiple_values_header_folded(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar, whatever,\r\n more, please, yes\r\n"
self.parser.parse_header(data)
@@ -415,8 +381,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.headers["FOO"], "bar, whatever, more, please, yes")
def test_parse_header_multiple_values_header_folded_multiple(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar, whatever,\r\n more\r\nfoo: please, yes\r\n"
self.parser.parse_header(data)
@@ -425,8 +389,6 @@ class TestHTTPRequestParser(unittest.TestCase):
def test_parse_header_multiple_values_extra_space(self):
# Tests errata from: https://www.rfc-editor.org/errata_search.php?rfc=7230&eid=4189
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: abrowser/0.001 (C O M M E N T)\r\n"
self.parser.parse_header(data)
@@ -434,8 +396,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertEqual(self.parser.headers["FOO"], "abrowser/0.001 (C O M M E N T)")
def test_parse_header_invalid_backtrack_bad(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nfoo: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\x10\r\n"
try:
self.parser.parse_header(data)
@@ -445,8 +405,6 @@ class TestHTTPRequestParser(unittest.TestCase):
self.assertTrue(False)
def test_parse_header_short_values(self):
- from waitress.parser import ParsingError
-
data = b"GET /foobar HTTP/1.1\r\none: 1\r\ntwo: 22\r\n"
self.parser.parse_header(data)
@@ -458,8 +416,6 @@ class TestHTTPRequestParser(unittest.TestCase):
class Test_split_uri(unittest.TestCase):
def _callFUT(self, uri):
- from waitress.parser import split_uri
-
(
self.proxy_scheme,
self.proxy_netloc,
@@ -499,7 +455,6 @@ class Test_split_uri(unittest.TestCase):
def test_split_uri_unicode_error_raises_parsing_error(self):
# See https://github.com/Pylons/waitress/issues/64
- from waitress.parser import ParsingError
# Either pass or throw a ParsingError, just don't throw another type of
# exception as that will cause the connection to close badly:
@@ -535,8 +490,6 @@ class Test_split_uri(unittest.TestCase):
class Test_get_header_lines(unittest.TestCase):
def _callFUT(self, data):
- from waitress.parser import get_header_lines
-
return get_header_lines(data)
def test_get_header_lines(self):
@@ -561,15 +514,11 @@ class Test_get_header_lines(unittest.TestCase):
def test_get_header_lines_malformed(self):
# https://corte.si/posts/code/pathod/pythonservers/index.html
- from waitress.parser import ParsingError
-
self.assertRaises(ParsingError, self._callFUT, b" Host: localhost\r\n\r\n")
class Test_crack_first_line(unittest.TestCase):
def _callFUT(self, line):
- from waitress.parser import crack_first_line
-
return crack_first_line(line)
def test_crack_first_line_matchok(self):
@@ -577,8 +526,6 @@ class Test_crack_first_line(unittest.TestCase):
self.assertEqual(result, (b"GET", b"/", b"1.0"))
def test_crack_first_line_lowercase_method(self):
- from waitress.parser import ParsingError
-
self.assertRaises(ParsingError, self._callFUT, b"get / HTTP/1.0")
def test_crack_first_line_nomatch(self):
@@ -595,9 +542,6 @@ class Test_crack_first_line(unittest.TestCase):
class TestHTTPRequestParserIntegration(unittest.TestCase):
def setUp(self):
- from waitress.parser import HTTPRequestParser
- from waitress.adjustments import Adjustments
-
my_adj = Adjustments()
self.parser = HTTPRequestParser(my_adj)
@@ -657,8 +601,8 @@ class TestHTTPRequestParserIntegration(unittest.TestCase):
)
# path should be utf-8 encoded
self.assertEqual(
- tobytes(parser.path).decode("utf-8"),
- text_(b"/foo/a++/\xc3\xa4=&a:int", "utf-8"),
+ parser.path.encode("latin-1").decode("utf-8"),
+ b"/foo/a++/\xc3\xa4=&a:int".decode("utf-8"),
)
self.assertEqual(
parser.query, "d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6"
@@ -721,7 +665,19 @@ class TestHTTPRequestParserIntegration(unittest.TestCase):
self.assertEqual(self.parser.headers, {"CONTENT_LENGTH": "6",})
-class DummyBodyStream(object):
+class Test_unquote_bytes_to_wsgi(unittest.TestCase):
+ def _callFUT(self, v):
+
+ return unquote_bytes_to_wsgi(v)
+
+ def test_highorder(self):
+ val = b"/a%C5%9B"
+ result = self._callFUT(val)
+ # PEP 3333 urlunquoted-latin1-decoded-bytes
+ self.assertEqual(result, "/aÅ\x9b")
+
+
+class DummyBodyStream:
def getfile(self):
return self
diff --git a/tests/test_proxy_headers.py b/tests/test_proxy_headers.py
index 15b4a08..e6f0ed6 100644
--- a/tests/test_proxy_headers.py
+++ b/tests/test_proxy_headers.py
@@ -1,7 +1,5 @@
import unittest
-from waitress.compat import tobytes
-
class TestProxyHeadersMiddleware(unittest.TestCase):
def _makeOne(self, app, **kw):
@@ -18,7 +16,7 @@ class TestProxyHeadersMiddleware(unittest.TestCase):
response.headers = response_headers
response.steps = list(app(environ, start_response))
- response.body = b"".join(tobytes(s) for s in response.steps)
+ response.body = b"".join(s.encode("latin-1") for s in response.steps)
return response
def test_get_environment_values_w_scheme_override_untrusted(self):
@@ -681,7 +679,7 @@ class TestProxyHeadersMiddleware(unittest.TestCase):
self.assertIn(b'Header "X-Forwarded-Host" malformed', response.body)
-class DummyLogger(object):
+class DummyLogger:
def __init__(self):
self.logged = []
@@ -689,14 +687,14 @@ class DummyLogger(object):
self.logged.append(msg % args)
-class DummyApp(object):
+class DummyApp:
def __call__(self, environ, start_response):
self.environ = environ
start_response("200 OK", [("Content-Type", "text/plain")])
yield "hello"
-class DummyResponse(object):
+class DummyResponse:
status = None
headers = None
body = None
diff --git a/tests/test_receiver.py b/tests/test_receiver.py
index b4910bb..f55aa68 100644
--- a/tests/test_receiver.py
+++ b/tests/test_receiver.py
@@ -226,7 +226,7 @@ class TestChunkedReceiver(unittest.TestCase):
self.assertEqual(inst.error, None)
-class DummyBuffer(object):
+class DummyBuffer:
def __init__(self, data=None):
if data is None:
data = []
diff --git a/tests/test_runner.py b/tests/test_runner.py
index e53018b..4cf6f6f 100644
--- a/tests/test_runner.py
+++ b/tests/test_runner.py
@@ -12,17 +12,17 @@ from waitress import runner
class Test_match(unittest.TestCase):
def test_empty(self):
- self.assertRaisesRegexp(
+ self.assertRaisesRegex(
ValueError, "^Malformed application ''$", runner.match, ""
)
def test_module_only(self):
- self.assertRaisesRegexp(
+ self.assertRaisesRegex(
ValueError, r"^Malformed application 'foo\.bar'$", runner.match, "foo.bar"
)
def test_bad_module(self):
- self.assertRaisesRegexp(
+ self.assertRaisesRegex(
ValueError,
r"^Malformed application 'foo#bar:barney'$",
runner.match,
@@ -42,7 +42,7 @@ class Test_resolve(unittest.TestCase):
)
def test_nonexistent_function(self):
- self.assertRaisesRegexp(
+ self.assertRaisesRegex(
AttributeError,
r"has no attribute 'nonexistent_function'",
runner.resolve,
@@ -57,7 +57,7 @@ class Test_resolve(unittest.TestCase):
def test_complex_happy_path(self):
# Ensure we can recursively resolve object attributes if necessary.
- self.assertEquals(runner.resolve("os.path", "exists.__name__"), "exists")
+ self.assertEqual(runner.resolve("os.path", "exists.__name__"), "exists")
class Test_run(unittest.TestCase):
@@ -65,7 +65,7 @@ class Test_run(unittest.TestCase):
argv = ["waitress-serve"] + argv
with capture() as captured:
self.assertEqual(runner.run(argv=argv), code)
- self.assertRegexpMatches(captured.getvalue(), regex)
+ self.assertRegex(captured.getvalue(), regex)
captured.close()
def test_bad(self):
@@ -119,7 +119,7 @@ class Test_run(unittest.TestCase):
)
def test_simple_call(self):
- import tests.fixtureapps.runner as _apps
+ from tests.fixtureapps import runner as _apps
def check_server(app, **kw):
self.assertIs(app, _apps.app)
@@ -133,7 +133,7 @@ class Test_run(unittest.TestCase):
self.assertEqual(runner.run(argv=argv, _serve=check_server), 0)
def test_returned_app(self):
- import tests.fixtureapps.runner as _apps
+ from tests.fixtureapps import runner as _apps
def check_server(app, **kw):
self.assertIs(app, _apps.app)
@@ -162,7 +162,7 @@ class Test_helper(unittest.TestCase):
raise ImportError("My reason")
except ImportError:
self.assertEqual(show_exception(sys.stderr), None)
- self.assertRegexpMatches(captured.getvalue(), regex)
+ self.assertRegex(captured.getvalue(), regex)
captured.close()
regex = (
@@ -175,15 +175,15 @@ class Test_helper(unittest.TestCase):
raise ImportError
except ImportError:
self.assertEqual(show_exception(sys.stderr), None)
- self.assertRegexpMatches(captured.getvalue(), regex)
+ self.assertRegex(captured.getvalue(), regex)
captured.close()
@contextlib.contextmanager
def capture():
- from waitress.compat import NativeIO
+ from io import StringIO
- fd = NativeIO()
+ fd = StringIO()
sys.stdout = fd
sys.stderr = fd
yield fd
diff --git a/tests/test_server.py b/tests/test_server.py
index 9134fb8..05f6b4e 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -240,7 +240,7 @@ class TestWSGIServer(unittest.TestCase):
inst.adj = DummyAdj
def foo():
- raise socket.error
+ raise OSError
inst.accept = foo
inst.logger = DummyLogger()
@@ -263,7 +263,7 @@ class TestWSGIServer(unittest.TestCase):
def test_maintenance(self):
inst = self._makeOneWithMap()
- class DummyChannel(object):
+ class DummyChannel:
requests = []
zombie = DummyChannel()
@@ -274,8 +274,8 @@ class TestWSGIServer(unittest.TestCase):
self.assertEqual(zombie.will_close, True)
def test_backward_compatibility(self):
- from waitress.server import WSGIServer, TcpWSGIServer
from waitress.adjustments import Adjustments
+ from waitress.server import TcpWSGIServer, WSGIServer
self.assertTrue(WSGIServer is TcpWSGIServer)
self.inst = WSGIServer(None, _start=False, port=1234)
@@ -411,8 +411,8 @@ if hasattr(socket, "AF_UNIX"):
def test_create_with_unix_socket(self):
from waitress.server import (
- MultiSocketServer,
BaseWSGIServer,
+ MultiSocketServer,
TcpWSGIServer,
UnixWSGIServer,
)
@@ -479,7 +479,7 @@ class DummySock(socket.socket):
pass
-class DummyTaskDispatcher(object):
+class DummyTaskDispatcher:
def __init__(self):
self.tasks = []
@@ -490,7 +490,7 @@ class DummyTaskDispatcher(object):
self.was_shutdown = True
-class DummyTask(object):
+class DummyTask:
serviced = False
start_response_called = False
wrote_header = False
@@ -512,12 +512,12 @@ class DummyAdj:
channel_timeout = 300
-class DummyAsyncore(object):
+class DummyAsyncore:
def loop(self, timeout=30.0, use_poll=False, map=None, count=None):
raise SystemExit
-class DummyTrigger(object):
+class DummyTrigger:
def pull_trigger(self):
self.pulled = True
@@ -525,7 +525,7 @@ class DummyTrigger(object):
pass
-class DummyLogger(object):
+class DummyLogger:
def __init__(self):
self.logged = []
diff --git a/tests/test_task.py b/tests/test_task.py
index 6466823..0965bf5 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -1,5 +1,5 @@
-import unittest
import io
+import unittest
class TestThreadedTaskDispatcher(unittest.TestCase):
@@ -15,7 +15,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
class BadDummyTask(DummyTask):
def service(self):
- super(BadDummyTask, self).service()
+ super().service()
inst.stop_count += 1
raise Exception
@@ -400,7 +400,7 @@ class TestWSGITask(unittest.TestCase):
inst = self._makeOne()
def execute():
- raise socket.error
+ raise OSError
inst.execute = execute
self.assertRaises(socket.error, inst.service)
@@ -922,7 +922,7 @@ class TestErrorTask(unittest.TestCase):
self.assertEqual(lines[8], b"(generated by waitress)")
-class DummyTask(object):
+class DummyTask:
serviced = False
cancelled = False
@@ -933,7 +933,7 @@ class DummyTask(object):
self.cancelled = True
-class DummyAdj(object):
+class DummyAdj:
log_socket_errors = True
ident = "waitress"
host = "127.0.0.1"
@@ -941,7 +941,7 @@ class DummyAdj(object):
url_prefix = ""
-class DummyServer(object):
+class DummyServer:
server_name = "localhost"
effective_port = 80
@@ -949,7 +949,7 @@ class DummyServer(object):
self.adj = DummyAdj()
-class DummyChannel(object):
+class DummyChannel:
closed_when_done = False
adj = DummyAdj()
creation_time = 0
@@ -970,7 +970,7 @@ class DummyChannel(object):
return len(data)
-class DummyParser(object):
+class DummyParser:
version = "1.0"
command = "GET"
path = "/"
@@ -990,7 +990,7 @@ def filter_lines(s):
return list(filter(None, s.split(b"\r\n")))
-class DummyLogger(object):
+class DummyLogger:
def __init__(self):
self.logged = []
diff --git a/tests/test_trigger.py b/tests/test_trigger.py
index af740f6..265679a 100644
--- a/tests/test_trigger.py
+++ b/tests/test_trigger.py
@@ -1,6 +1,6 @@
-import unittest
import os
import sys
+import unittest
if not sys.platform.startswith("win"):
diff --git a/tests/test_utilities.py b/tests/test_utilities.py
index 15cd24f..ea08477 100644
--- a/tests/test_utilities.py
+++ b/tests/test_utilities.py
@@ -39,16 +39,17 @@ class Test_parse_http_date(unittest.TestCase):
class Test_build_http_date(unittest.TestCase):
def test_rountdrip(self):
- from waitress.utilities import build_http_date, parse_http_date
from time import time
+ from waitress.utilities import build_http_date, parse_http_date
+
t = int(time())
self.assertEqual(t, parse_http_date(build_http_date(t)))
class Test_unpack_rfc850(unittest.TestCase):
def _callFUT(self, val):
- from waitress.utilities import unpack_rfc850, rfc850_reg
+ from waitress.utilities import rfc850_reg, unpack_rfc850
return unpack_rfc850(rfc850_reg.match(val.lower()))
@@ -60,7 +61,7 @@ class Test_unpack_rfc850(unittest.TestCase):
class Test_unpack_rfc_822(unittest.TestCase):
def _callFUT(self, val):
- from waitress.utilities import unpack_rfc822, rfc822_reg
+ from waitress.utilities import rfc822_reg, unpack_rfc822
return unpack_rfc822(rfc822_reg.match(val.lower()))
diff --git a/tests/test_wasyncore.py b/tests/test_wasyncore.py
index 9c23509..970e993 100644
--- a/tests/test_wasyncore.py
+++ b/tests/test_wasyncore.py
@@ -1,21 +1,21 @@
-from waitress import wasyncore as asyncore
-from waitress import compat
+import _thread as thread
import contextlib
+import errno
import functools
import gc
-import unittest
-import select
+from io import BytesIO
import os
-import socket
-import sys
-import time
-import errno
import re
+import select
+import socket
import struct
+import sys
import threading
+import time
+import unittest
import warnings
-from io import BytesIO
+from waitress import compat, wasyncore as asyncore
TIMEOUT = 3
HAS_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
@@ -24,6 +24,7 @@ HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
# Filename used for testing
+
if os.name == "java": # pragma: no cover
# Jython disallows @ in module names
TESTFN = "$test"
@@ -33,7 +34,7 @@ else:
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-class DummyLogger(object): # pragma: no cover
+class DummyLogger: # pragma: no cover
def __init__(self):
self.messages = []
@@ -41,7 +42,7 @@ class DummyLogger(object): # pragma: no cover
self.messages.append((severity, message))
-class WarningsRecorder(object): # pragma: no cover
+class WarningsRecorder: # pragma: no cover
"""Convenience wrapper for the warnings list returned on
entry to the warnings.catch_warnings() context manager.
"""
@@ -67,6 +68,7 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover
# in order to re-raise the warnings.
frame = sys._getframe(2)
registry = frame.f_globals.get("__warningregistry__")
+
if registry:
registry.clear()
with warnings.catch_warnings(record=True) as w:
@@ -78,19 +80,25 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover
# Filter the recorded warnings
reraise = list(w)
missing = []
+
for msg, cat in filters:
seen = False
+
for w in reraise[:]:
warning = w.message
# Filter out the matching messages
+
if re.match(msg, str(warning), re.I) and issubclass(warning.__class__, cat):
seen = True
reraise.remove(w)
+
if not seen and not quiet:
# This filter caught nothing
missing.append((msg, cat.__name__))
+
if reraise:
raise AssertionError("unhandled warning %s" % reraise[0])
+
if missing:
raise AssertionError("filter (%r, %s) did not catch any warning" % missing[0])
@@ -111,11 +119,14 @@ def check_warnings(*filters, **kwargs): # pragma: no cover
check_warnings(("", Warning), quiet=True)
"""
quiet = kwargs.get("quiet")
+
if not filters:
filters = (("", Warning),)
# Preserve backward compatibility
+
if quiet is None:
quiet = True
+
return _filterwarnings(filters, quiet)
@@ -130,6 +141,7 @@ def gc_collect(): # pragma: no cover
objects to disappear.
"""
gc.collect()
+
if sys.platform.startswith("java"):
time.sleep(0.1)
gc.collect()
@@ -137,7 +149,7 @@ def gc_collect(): # pragma: no cover
def threading_setup(): # pragma: no cover
- return (compat.thread._count(), None)
+ return (thread._count(), None)
def threading_cleanup(*original_values): # pragma: no cover
@@ -146,7 +158,8 @@ def threading_cleanup(*original_values): # pragma: no cover
_MAX_COUNT = 100
for count in range(_MAX_COUNT):
- values = (compat.thread._count(), None)
+ values = (thread._count(), None)
+
if values == original_values:
break
@@ -186,6 +199,7 @@ def join_thread(thread, timeout=30.0): # pragma: no cover
after timeout seconds.
"""
thread.join(timeout)
+
if thread.is_alive():
msg = "failed to join the thread in %.1f seconds" % timeout
raise AssertionError(msg)
@@ -213,6 +227,7 @@ def bind_port(sock, host=HOST): # pragma: no cover
"tests should never set the SO_REUSEADDR "
"socket option on TCP/IP sockets!"
)
+
if hasattr(socket, "SO_REUSEPORT"):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
@@ -225,11 +240,13 @@ def bind_port(sock, host=HOST): # pragma: no cover
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
+
if hasattr(socket, "SO_EXCLUSIVEADDRUSE"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
+
return port
@@ -303,13 +320,16 @@ def capture_server(evt, buf, serv): # pragma no cover
else:
n = 200
start = time.time()
+
while n > 0 and time.time() - start < 3.0:
r, w, e = select.select([conn], [], [], 0.1)
+
if r:
n -= 1
data = conn.recv(10)
# keep everything except for the newline terminator
buf.write(data.replace(b"\n", b""))
+
if b"\n" in data:
break
time.sleep(0.01)
@@ -332,6 +352,7 @@ def bind_unix_socket(sock, addr): # pragma: no cover
def bind_af_aware(sock, addr):
"""Helper function to bind a socket according to its family."""
+
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist.
unlink(addr)
@@ -346,6 +367,7 @@ if sys.platform.startswith("win"): # pragma: no cover
# Perform the operation
func(pathname)
# Now setup the wait loop
+
if waitall:
dirname = pathname
else:
@@ -358,6 +380,7 @@ if sys.platform.startswith("win"): # pragma: no cover
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
+
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
@@ -367,6 +390,7 @@ if sys.platform.startswith("win"): # pragma: no cover
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
+
if not (L if waitall else name in L):
return
# Increase the timeout and try again
@@ -395,17 +419,20 @@ def unlink(filename):
def _is_ipv6_enabled(): # pragma: no cover
"""Check whether IPv6 is enabled on this host."""
+
if compat.HAS_IPV6:
sock = None
try:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind(("::1", 0))
+
return True
- except socket.error:
+ except OSError:
pass
finally:
if sock:
sock.close()
+
return False
@@ -487,6 +514,7 @@ class HelperFunctionTests(unittest.TestCase):
# Only the attribute modified by the routine we expect to be
# called should be True.
+
for attr in attributes:
self.assertEqual(getattr(tobj, attr), attr == expectedattr)
@@ -513,6 +541,7 @@ class HelperFunctionTests(unittest.TestCase):
l = []
testmap = {}
+
for i in range(10):
c = dummychannel()
l.append(c)
@@ -606,6 +635,7 @@ class DispatcherTests(unittest.TestCase):
def test_strerror(self):
# refers to bug #8573
err = asyncore._strerror(errno.EPERM)
+
if hasattr(os, "strerror"):
self.assertEqual(err, os.strerror(errno.EPERM))
err = asyncore._strerror(-1)
@@ -656,6 +686,7 @@ class DispatcherWithSendTests(unittest.TestCase):
d.send(b"\n")
n = 1000
+
while d.out_buffer and n > 0: # pragma: no cover
asyncore.poll()
n -= 1
@@ -723,6 +754,7 @@ class FileWrapperTest(unittest.TestCase):
def test_resource_warning(self):
# Issue #11453
got_warning = False
+
while got_warning is False:
# we try until we get the outcome we want because this
# test is not deterministic (gc_collect() may not
@@ -732,7 +764,7 @@ class FileWrapperTest(unittest.TestCase):
os.close(fd)
try:
- with check_warnings(("", compat.ResourceWarning)):
+ with check_warnings(("", ResourceWarning)):
f = None
gc_collect()
except AssertionError: # pragma: no cover
@@ -819,8 +851,10 @@ class BaseTestAPI:
def loop_waiting_for_flag(self, instance, timeout=5): # pragma: no cover
timeout = float(timeout) / 100
count = 100
+
while asyncore.socket_map and count > 0:
asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
+
if instance.flag:
return
count -= 1
@@ -966,6 +1000,7 @@ class BaseTestAPI:
# Make sure handle_expt is called on OOB data received.
# Note: this might fail on some platforms as OOB data is
# tenuously supported and rarely used.
+
if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
self.skipTest("Not applicable to AF_UNIX sockets.")
@@ -980,7 +1015,7 @@ class BaseTestAPI:
class TestHandler(BaseTestHandler):
def __init__(self, conn):
BaseTestHandler.__init__(self, conn)
- self.socket.send(compat.tobytes(chr(244)), socket.MSG_OOB)
+ self.socket.send(chr(244).encode("latin-1"), socket.MSG_OOB)
server = BaseServer(self.family, self.addr, TestHandler)
client = TestClient(self.family, server.address)
@@ -1082,6 +1117,7 @@ class BaseTestAPI:
@reap_threads
def test_quick_connect(self): # pragma: no cover
# see: http://bugs.python.org/issue10340
+
if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
self.skipTest("test specific to AF_INET and AF_INET6")
@@ -1420,7 +1456,7 @@ class Test_dispatcher(unittest.TestCase):
sock = dummysocket()
def getpeername():
- raise socket.error(errno.EBADF)
+ raise OSError(errno.EBADF)
map = {}
sock.getpeername = getpeername
@@ -1454,7 +1490,7 @@ class Test_dispatcher(unittest.TestCase):
def setsockopt(*arg, **kw):
sock.errored = True
- raise socket.error
+ raise OSError
sock.setsockopt = setsockopt
sock.getsockopt = lambda *arg: 0
@@ -1486,7 +1522,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def accept(*arg, **kw):
- raise socket.error(122)
+ raise OSError(122)
sock.accept = accept
inst = self._makeOne(sock=sock, map=map)
@@ -1497,7 +1533,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def send(*arg, **kw):
- raise socket.error(errno.EWOULDBLOCK)
+ raise OSError(errno.EWOULDBLOCK)
sock.send = send
inst = self._makeOne(sock=sock, map=map)
@@ -1509,7 +1545,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def send(*arg, **kw):
- raise socket.error(122)
+ raise OSError(122)
sock.send = send
inst = self._makeOne(sock=sock, map=map)
@@ -1520,7 +1556,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def recv(*arg, **kw):
- raise socket.error(errno.ECONNRESET)
+ raise OSError(errno.ECONNRESET)
def handle_close():
inst.close_handled = True
@@ -1537,7 +1573,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def close():
- raise socket.error(122)
+ raise OSError(122)
sock.close = close
inst = self._makeOne(sock=sock, map=map)
@@ -1680,7 +1716,7 @@ class Test_close_all(unittest.TestCase):
self.assertRaises(RuntimeError, self._callFUT, map)
-class DummyDispatcher(object):
+class DummyDispatcher:
read_event_handled = False
write_event_handled = False
expt_event_handled = False
@@ -1693,16 +1729,19 @@ class DummyDispatcher(object):
def handle_read_event(self):
self.read_event_handled = True
+
if self.exc is not None:
raise self.exc
def handle_write_event(self):
self.write_event_handled = True
+
if self.exc is not None:
raise self.exc
def handle_expt_event(self):
self.expt_event_handled = True
+
if self.exc is not None:
raise self.exc
@@ -1723,7 +1762,7 @@ class DummyDispatcher(object):
raise self.exc
-class DummyTime(object):
+class DummyTime:
def __init__(self):
self.sleepvals = []
@@ -1731,7 +1770,7 @@ class DummyTime(object):
self.sleepvals.append(val)
-class DummySelect(object):
+class DummySelect:
error = select.error
def __init__(self, exc=None, pollster=None):
@@ -1741,6 +1780,7 @@ class DummySelect(object):
def select(self, *arg):
self.selected.append(arg)
+
if self.exc is not None:
raise self.exc
@@ -1748,13 +1788,14 @@ class DummySelect(object):
return self.pollster
-class DummyPollster(object):
+class DummyPollster:
def __init__(self, exc=None):
self.polled = []
self.exc = exc
def poll(self, timeout):
self.polled.append(timeout)
+
if self.exc is not None:
raise self.exc
else: # pragma: no cover