summaryrefslogtreecommitdiff
path: root/test/unit/common/test_http_protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/common/test_http_protocol.py')
-rw-r--r--test/unit/common/test_http_protocol.py148
1 files changed, 104 insertions, 44 deletions
diff --git a/test/unit/common/test_http_protocol.py b/test/unit/common/test_http_protocol.py
index 24e5225b2..b9962c7ff 100644
--- a/test/unit/common/test_http_protocol.py
+++ b/test/unit/common/test_http_protocol.py
@@ -15,6 +15,7 @@
from argparse import Namespace
from io import BytesIO
+import json
import mock
import types
import unittest
@@ -81,36 +82,14 @@ class TestSwiftHttpProtocol(unittest.TestCase):
], proto_obj.send_error.mock_calls)
self.assertEqual(('a', '123'), proto_obj.client_address)
- def test_request_line_cleanup(self):
- def do_test(line_from_socket, expected_line=None):
- if expected_line is None:
- expected_line = line_from_socket
-
- proto_obj = self._proto_obj()
- proto_obj.raw_requestline = line_from_socket
- with mock.patch('swift.common.http_protocol.wsgi.HttpProtocol') \
- as mock_super:
- proto_obj.parse_request()
-
- self.assertEqual([mock.call.parse_request(proto_obj)],
- mock_super.mock_calls)
- self.assertEqual(proto_obj.raw_requestline, expected_line)
-
- do_test(b'GET / HTTP/1.1')
- do_test(b'GET /%FF HTTP/1.1')
-
- if not six.PY2:
- do_test(b'GET /\xff HTTP/1.1', b'GET /%FF HTTP/1.1')
- do_test(b'PUT /Here%20Is%20A%20SnowMan:\xe2\x98\x83 HTTP/1.0',
- b'PUT /Here%20Is%20A%20SnowMan%3A%E2%98%83 HTTP/1.0')
- do_test(
- b'POST /?and%20it=fixes+params&'
- b'PALMTREE=\xf0%9f\x8c%b4 HTTP/1.1',
- b'POST /?and+it=fixes+params&PALMTREE=%F0%9F%8C%B4 HTTP/1.1')
+ def test_bad_request_line(self):
+ proto_obj = self._proto_obj()
+ proto_obj.raw_requestline = b'None //'
+ self.assertEqual(False, proto_obj.parse_request())
class ProtocolTest(unittest.TestCase):
- def _run_bytes_through_protocol(self, bytes_from_client):
+ def _run_bytes_through_protocol(self, bytes_from_client, app=None):
rfile = BytesIO(bytes_from_client)
wfile = BytesIO()
@@ -153,7 +132,7 @@ class ProtocolTest(unittest.TestCase):
with mock.patch.object(wfile, 'close', lambda: None), \
mock.patch.object(rfile, 'close', lambda: None):
eventlet.wsgi.server(
- fake_listen_socket, self.app,
+ fake_listen_socket, app or self.app,
protocol=self.protocol_class,
custom_pool=FakePool(),
log_output=False, # quiet the test run
@@ -170,37 +149,118 @@ class TestSwiftHttpProtocolSomeMore(ProtocolTest):
return [swob.wsgi_to_bytes(env['RAW_PATH_INFO'])]
def test_simple(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
b"GET /someurl HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n"
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertEqual(lines[0], b"HTTP/1.1 200 OK") # sanity check
self.assertEqual(lines[-1], b'/someurl')
def test_quoted(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
b"GET /some%fFpath%D8%AA HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n"
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertEqual(lines[0], b"HTTP/1.1 200 OK") # sanity check
self.assertEqual(lines[-1], b'/some%fFpath%D8%AA')
def test_messy(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
b"GET /oh\xffboy%what$now%E2%80%bd HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n"
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertEqual(lines[-1], b'/oh\xffboy%what$now%E2%80%bd')
+ def test_bad_request(self):
+ bytes_out = self._run_bytes_through_protocol((
+ b"ONLY-METHOD\r\n"
+ b"Server: example.com\r\n"
+ b"\r\n"
+ ))
+ lines = [l for l in bytes_out.split(b"\r\n") if l]
+ self.assertEqual(
+ lines[0], b"HTTP/1.1 400 Bad request syntax ('ONLY-METHOD')")
+ self.assertIn(b"Bad request syntax or unsupported method.", lines[-1])
+
+ def test_leading_slashes(self):
+ bytes_out = self._run_bytes_through_protocol((
+ b"GET ///some-leading-slashes HTTP/1.0\r\n"
+ b"User-Agent: blah blah blah\r\n"
+ b"\r\n"
+ ))
+ lines = [l for l in bytes_out.split(b"\r\n") if l]
+ self.assertEqual(lines[-1], b'///some-leading-slashes')
+
+ def test_request_lines(self):
+ def app(env, start_response):
+ start_response("200 OK", [])
+ if six.PY2:
+ return [json.dumps({
+ 'RAW_PATH_INFO': env['RAW_PATH_INFO'].decode('latin1'),
+ 'QUERY_STRING': (None if 'QUERY_STRING' not in env else
+ env['QUERY_STRING'].decode('latin1')),
+ }).encode('ascii')]
+ return [json.dumps({
+ 'RAW_PATH_INFO': env['RAW_PATH_INFO'],
+ 'QUERY_STRING': env.get('QUERY_STRING'),
+ }).encode('ascii')]
+
+ def do_test(request_line, expected):
+ bytes_out = self._run_bytes_through_protocol(
+ request_line + b'\r\n\r\n',
+ app,
+ )
+ print(bytes_out)
+ resp_body = bytes_out.partition(b'\r\n\r\n')[2]
+ self.assertEqual(json.loads(resp_body), expected)
+
+ do_test(b'GET / HTTP/1.1', {
+ 'RAW_PATH_INFO': u'/',
+ 'QUERY_STRING': None,
+ })
+ do_test(b'GET /%FF HTTP/1.1', {
+ 'RAW_PATH_INFO': u'/%FF',
+ 'QUERY_STRING': None,
+ })
+
+ do_test(b'GET /\xff HTTP/1.1', {
+ 'RAW_PATH_INFO': u'/\xff',
+ 'QUERY_STRING': None,
+ })
+ do_test(b'PUT /Here%20Is%20A%20SnowMan:\xe2\x98\x83 HTTP/1.0', {
+ 'RAW_PATH_INFO': u'/Here%20Is%20A%20SnowMan:\xe2\x98\x83',
+ 'QUERY_STRING': None,
+ })
+ do_test(
+ b'POST /?and%20it=does+nothing+to+params&'
+ b'PALMTREE=\xf0%9f\x8c%b4 HTTP/1.1', {
+ 'RAW_PATH_INFO': u'/',
+ 'QUERY_STRING': (u'and%20it=does+nothing+to+params'
+ u'&PALMTREE=\xf0%9f\x8c%b4'),
+ }
+ )
+ do_test(b'GET // HTTP/1.1', {
+ 'RAW_PATH_INFO': u'//',
+ 'QUERY_STRING': None,
+ })
+ do_test(b'GET //bar HTTP/1.1', {
+ 'RAW_PATH_INFO': u'//bar',
+ 'QUERY_STRING': None,
+ })
+ do_test(b'GET //////baz HTTP/1.1', {
+ 'RAW_PATH_INFO': u'//////baz',
+ 'QUERY_STRING': None,
+ })
+
class TestProxyProtocol(ProtocolTest):
protocol_class = http_protocol.SwiftHttpProxiedProtocol
@@ -222,12 +282,12 @@ class TestProxyProtocol(ProtocolTest):
return [body.encode("utf-8")]
def test_request_with_proxy(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
b"PROXY TCP4 192.168.0.1 192.168.0.11 56423 4433\r\n"
b"GET /someurl HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n"
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertEqual(lines[0], b"HTTP/1.1 200 OK") # sanity check
@@ -238,12 +298,12 @@ class TestProxyProtocol(ProtocolTest):
])
def test_request_with_proxy_https(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
b"PROXY TCP4 192.168.0.1 192.168.0.11 56423 443\r\n"
b"GET /someurl HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n"
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertEqual(lines[0], b"HTTP/1.1 200 OK") # sanity check
@@ -254,7 +314,7 @@ class TestProxyProtocol(ProtocolTest):
])
def test_multiple_requests_with_proxy(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
b"PROXY TCP4 192.168.0.1 192.168.0.11 56423 443\r\n"
b"GET /someurl HTTP/1.1\r\n"
b"User-Agent: something or other\r\n"
@@ -263,7 +323,7 @@ class TestProxyProtocol(ProtocolTest):
b"User-Agent: something or other\r\n"
b"Connection: close\r\n"
b"\r\n"
- ))
+ )
lines = bytes_out.split(b"\r\n")
self.assertEqual(lines[0], b"HTTP/1.1 200 OK") # sanity check
@@ -277,12 +337,12 @@ class TestProxyProtocol(ProtocolTest):
self.assertEqual(addr_lines, [b"https is on (scheme https)"] * 2)
def test_missing_proxy_line(self):
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
# whoops, no PROXY line here
b"GET /someurl HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n"
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertIn(b"400 Invalid PROXY line", lines[0])
@@ -303,12 +363,12 @@ class TestProxyProtocol(ProtocolTest):
for unknown_line in [b'PROXY UNKNOWN', # mimimal valid unknown
b'PROXY UNKNOWNblahblah', # also valid
b'PROXY UNKNOWN a b c d']:
- bytes_out = self._run_bytes_through_protocol((
+ bytes_out = self._run_bytes_through_protocol(
unknown_line + (b"\r\n"
b"GET /someurl HTTP/1.0\r\n"
b"User-Agent: something or other\r\n"
b"\r\n")
- ))
+ )
lines = [l for l in bytes_out.split(b"\r\n") if l]
self.assertIn(b"200 OK", lines[0])