diff options
Diffstat (limited to 'Lib/test/test_httplib.py')
-rw-r--r-- | Lib/test/test_httplib.py | 198 |
1 files changed, 174 insertions, 24 deletions
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 61ed6bbbd7..68f6946a3a 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -344,6 +344,134 @@ class HeaderTests(TestCase): conn.putheader(name, value) +class TransferEncodingTest(TestCase): + expected_body = b"It's just a flesh wound" + + def test_endheaders_chunked(self): + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.putrequest('POST', '/') + conn.endheaders(self._make_body(), encode_chunked=True) + + _, _, body = self._parse_request(conn.sock.data) + body = self._parse_chunked(body) + self.assertEqual(body, self.expected_body) + + def test_explicit_headers(self): + # explicit chunked + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + # this shouldn't actually be automatically chunk-encoded because the + # calling code has explicitly stated that it's taking care of it + conn.request( + 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) + + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertEqual(body, self.expected_body) + + # explicit chunked, string body + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request( + 'POST', '/', self.expected_body.decode('latin-1'), + {'Transfer-Encoding': 'chunked'}) + + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertEqual(body, self.expected_body) + + # User-specified TE, but request() does the chunk encoding + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request('POST', '/', + headers={'Transfer-Encoding': 'gzip, chunked'}, + encode_chunked=True, + body=self._make_body()) + _, headers, body = self._parse_request(conn.sock.data) + self.assertNotIn('content-length', [k.lower() for k in headers]) + self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') + self.assertEqual(self._parse_chunked(body), self.expected_body) + + def test_request(self): + for empty_lines in (False, True,): + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request( + 'POST', '/', self._make_body(empty_lines=empty_lines)) + + _, headers, body = self._parse_request(conn.sock.data) + body = self._parse_chunked(body) + self.assertEqual(body, self.expected_body) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + + # Content-Length and Transfer-Encoding SHOULD not be sent in the + # same request + self.assertNotIn('content-length', [k.lower() for k in headers]) + + def test_empty_body(self): + # Zero-length iterable should be treated like any other iterable + conn = client.HTTPConnection('example.com') + conn.sock = FakeSocket(b'') + conn.request('POST', '/', ()) + _, headers, body = self._parse_request(conn.sock.data) + self.assertEqual(headers['Transfer-Encoding'], 'chunked') + self.assertNotIn('content-length', [k.lower() for k in headers]) + self.assertEqual(body, b"0\r\n\r\n") + + def _make_body(self, empty_lines=False): + lines = self.expected_body.split(b' ') + for idx, line in enumerate(lines): + # for testing handling empty lines + if empty_lines and idx % 2: + yield b'' + if idx < len(lines) - 1: + yield line + b' ' + else: + yield line + + def _parse_request(self, data): + lines = data.split(b'\r\n') + request = lines[0] + headers = {} + n = 1 + while n < len(lines) and len(lines[n]) > 0: + key, val = lines[n].split(b':') + key = key.decode('latin-1').strip() + headers[key] = val.decode('latin-1').strip() + n += 1 + + return request, headers, b'\r\n'.join(lines[n + 1:]) + + def _parse_chunked(self, data): + body = [] + trailers = {} + n = 0 + lines = data.split(b'\r\n') + # parse body + while True: + size, chunk = lines[n:n+2] + size = int(size, 16) + + if size == 0: + n += 1 + break + + self.assertEqual(size, len(chunk)) + body.append(chunk) + + n += 2 + # we /should/ hit the end chunk, but check against the size of + # lines so we're not stuck in an infinite loop should we get + # malformed data + if n > len(lines): + break + + return b''.join(body) + + class BasicTest(TestCase): def test_status_lines(self): # Test HTTP status lines @@ -564,7 +692,9 @@ class BasicTest(TestCase): def test_send_file(self): expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' - b'Accept-Encoding: identity\r\nContent-Length:') + b'Accept-Encoding: identity\r\n' + b'Transfer-Encoding: chunked\r\n' + b'\r\n') with open(__file__, 'rb') as body: conn = client.HTTPConnection('example.com') @@ -594,11 +724,11 @@ class BasicTest(TestCase): yield None yield 'data_two' - class UpdatingFile(): + class UpdatingFile(io.TextIOBase): mode = 'r' d = data() def read(self, blocksize=-1): - return self.d.__next__() + return next(self.d) expected = b'data' @@ -970,6 +1100,7 @@ class BasicTest(TestCase): thread = threading.Thread(target=run_server) thread.start() + self.addCleanup(thread.join, float(1)) conn = client.HTTPConnection(*serv.getsockname()) conn.request("CONNECT", "dummy:1234") response = conn.getresponse() @@ -983,7 +1114,7 @@ class BasicTest(TestCase): finally: response.close() conn.close() - thread.join() + thread.join() self.assertEqual(result, b"proxied data\n") class ExtendedReadTest(TestCase): @@ -1511,14 +1642,16 @@ class HTTPSTest(TestCase): with self.assertRaises(ssl.CertificateError): h.request('GET', '/') # Same with explicit check_hostname=True - h = client.HTTPSConnection('localhost', server.port, context=context, - check_hostname=True) + with support.check_warnings(('', DeprecationWarning)): + h = client.HTTPSConnection('localhost', server.port, + context=context, check_hostname=True) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') # With check_hostname=False, the mismatching is ignored context.check_hostname = False - h = client.HTTPSConnection('localhost', server.port, context=context, - check_hostname=False) + with support.check_warnings(('', DeprecationWarning)): + h = client.HTTPSConnection('localhost', server.port, + context=context, check_hostname=False) h.request('GET', '/nonexistent') resp = h.getresponse() resp.close() @@ -1535,8 +1668,9 @@ class HTTPSTest(TestCase): h.close() # Passing check_hostname to HTTPSConnection should override the # context's setting. - h = client.HTTPSConnection('localhost', server.port, context=context, - check_hostname=True) + with support.check_warnings(('', DeprecationWarning)): + h = client.HTTPSConnection('localhost', server.port, + context=context, check_hostname=True) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') @@ -1575,6 +1709,26 @@ class RequestBodyTest(TestCase): message = client.parse_headers(f) return message, f + def test_list_body(self): + # Note that no content-length is automatically calculated for + # an iterable. The request will fall back to send chunked + # transfer encoding. + cases = ( + ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), + ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), + ) + for body, expected in cases: + with self.subTest(body): + self.conn = client.HTTPConnection('example.com') + self.conn.sock = self.sock = FakeSocket('') + + self.conn.request('PUT', '/url', body) + msg, f = self.get_headers_and_fp() + self.assertNotIn('Content-Type', msg) + self.assertNotIn('Content-Length', msg) + self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') + self.assertEqual(expected, f.read()) + def test_manual_content_length(self): # Set an incorrect content-length so that we can verify that # it will not be over-ridden by the library. @@ -1608,7 +1762,7 @@ class RequestBodyTest(TestCase): self.assertEqual("5", message.get("content-length")) self.assertEqual(b'body\xc1', f.read()) - def test_file_body(self): + def test_text_file_body(self): self.addCleanup(support.unlink, support.TESTFN) with open(support.TESTFN, "w") as f: f.write("body") @@ -1617,8 +1771,11 @@ class RequestBodyTest(TestCase): message, f = self.get_headers_and_fp() self.assertEqual("text/plain", message.get_content_type()) self.assertIsNone(message.get_charset()) - self.assertEqual("4", message.get("content-length")) - self.assertEqual(b'body', f.read()) + # No content-length will be determined for files; the body + # will be sent using chunked transfer encoding instead. + self.assertIsNone(message.get("content-length")) + self.assertEqual("chunked", message.get("transfer-encoding")) + self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) def test_binary_file_body(self): self.addCleanup(support.unlink, support.TESTFN) @@ -1629,8 +1786,9 @@ class RequestBodyTest(TestCase): message, f = self.get_headers_and_fp() self.assertEqual("text/plain", message.get_content_type()) self.assertIsNone(message.get_charset()) - self.assertEqual("5", message.get("content-length")) - self.assertEqual(b'body\xc1', f.read()) + self.assertEqual("chunked", message.get("Transfer-Encoding")) + self.assertNotIn("Content-Length", message) + self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) class HTTPResponseTest(TestCase): @@ -1741,13 +1899,5 @@ class TunnelTests(TestCase): self.assertIn('header: {}'.format(expected_header), lines) -@support.reap_threads -def test_main(verbose=None): - support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, - PersistenceTest, - HTTPSTest, RequestBodyTest, SourceAddressTest, - HTTPResponseTest, ExtendedReadTest, - ExtendedReadTestChunked, TunnelTests) - if __name__ == '__main__': - test_main() + unittest.main(verbosity=2) |