diff options
author | Bert JW Regeer <bertjw@regeer.org> | 2018-09-22 20:38:46 -0600 |
---|---|---|
committer | Bert JW Regeer <bertjw@regeer.org> | 2018-12-02 18:02:21 -0700 |
commit | d6fbdbaf9c4a08572c535f7de8252dd64eddc210 (patch) | |
tree | 6645f50575eb03e54fdaedc1a093e9a6e3160012 | |
parent | 7b05238c39265c8e4446264a3d4380df74f330fd (diff) | |
download | waitress-d6fbdbaf9c4a08572c535f7de8252dd64eddc210.tar.gz |
Add new tests for proxy header parsing
-rw-r--r-- | waitress/tests/test_task.py | 489 |
1 files changed, 475 insertions, 14 deletions
diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py index 8fb2170..ce9783a 100644 --- a/waitress/tests/test_task.py +++ b/waitress/tests/test_task.py @@ -752,11 +752,13 @@ class TestWSGITask(unittest.TestCase): # nail the keys of environ self.assertEqual(sorted(environ.keys()), [ 'CONTENT_LENGTH', 'CONTENT_TYPE', 'HTTP_CONNECTION', 'HTTP_X_FOO', - 'PATH_INFO', 'QUERY_STRING', 'REMOTE_ADDR', 'REQUEST_METHOD', - 'SCRIPT_NAME', 'SERVER_NAME', 'SERVER_PORT', 'SERVER_PROTOCOL', - 'SERVER_SOFTWARE', 'wsgi.errors', 'wsgi.file_wrapper', 'wsgi.input', - 'wsgi.input_terminated', 'wsgi.multiprocess', 'wsgi.multithread', - 'wsgi.run_once', 'wsgi.url_scheme', 'wsgi.version']) + 'PATH_INFO', 'QUERY_STRING', 'REMOTE_ADDR', 'REMOTE_HOST', + 'REMOTE_PORT', 'REQUEST_METHOD', 'SCRIPT_NAME', 'SERVER_NAME', + 'SERVER_PORT', 'SERVER_PROTOCOL', 'SERVER_SOFTWARE', 'wsgi.errors', + 'wsgi.file_wrapper', 'wsgi.input', 'wsgi.input_terminated', + 'wsgi.multiprocess', 'wsgi.multithread', 'wsgi.run_once', + 'wsgi.url_scheme', 'wsgi.version' + ]) self.assertEqual(environ['REQUEST_METHOD'], 'GET') self.assertEqual(environ['SERVER_PORT'], '80') @@ -768,6 +770,8 @@ class TestWSGITask(unittest.TestCase): self.assertEqual(environ['PATH_INFO'], '/') self.assertEqual(environ['QUERY_STRING'], 'abc') self.assertEqual(environ['REMOTE_ADDR'], '127.0.0.1') + self.assertEqual(environ['REMOTE_HOST'], '127.0.0.1') + self.assertEqual(environ['REMOTE_PORT'], '39830') self.assertEqual(environ['CONTENT_TYPE'], 'abc') self.assertEqual(environ['CONTENT_LENGTH'], '10') self.assertEqual(environ['HTTP_X_FOO'], 'BAR') @@ -799,8 +803,9 @@ class TestWSGITask(unittest.TestCase): def test_get_environment_values_w_scheme_override_trusted(self): import sys inst = self._makeOne() - inst.channel.addr = ['192.168.1.1'] + inst.channel.addr = ['192.168.1.1', 8080] inst.channel.server.adj.trusted_proxy = '192.168.1.1' + inst.channel.server.adj.trusted_proxy_headers = {'x-forwarded-proto'} request = DummyParser() request.headers = { 'CONTENT_TYPE': 'abc', @@ -816,14 +821,16 @@ class TestWSGITask(unittest.TestCase): # nail the keys of environ self.assertEqual(sorted(environ.keys()), [ 'CONTENT_LENGTH', 'CONTENT_TYPE', 'HTTP_CONNECTION', 'HTTP_X_FOO', - 'PATH_INFO', 'QUERY_STRING', 'REMOTE_ADDR', 'REQUEST_METHOD', - 'SCRIPT_NAME', 'SERVER_NAME', 'SERVER_PORT', 'SERVER_PROTOCOL', - 'SERVER_SOFTWARE', 'wsgi.errors', 'wsgi.file_wrapper', 'wsgi.input', + 'HTTP_X_FORWARDED_PROTO', 'PATH_INFO', 'QUERY_STRING', + 'REMOTE_ADDR', 'REMOTE_HOST', 'REQUEST_METHOD', 'SCRIPT_NAME', + 'SERVER_NAME', 'SERVER_PORT', 'SERVER_PROTOCOL', 'SERVER_SOFTWARE', + 'wsgi.errors', 'wsgi.file_wrapper', 'wsgi.input', 'wsgi.input_terminated', 'wsgi.multiprocess', 'wsgi.multithread', - 'wsgi.run_once', 'wsgi.url_scheme', 'wsgi.version']) + 'wsgi.run_once', 'wsgi.url_scheme', 'wsgi.version' + ]) self.assertEqual(environ['REQUEST_METHOD'], 'GET') - self.assertEqual(environ['SERVER_PORT'], '80') + self.assertEqual(environ['SERVER_PORT'], '443') self.assertEqual(environ['SERVER_NAME'], 'localhost') self.assertEqual(environ['SERVER_SOFTWARE'], 'waitress') self.assertEqual(environ['SERVER_PROTOCOL'], 'HTTP/1.0') @@ -849,6 +856,7 @@ class TestWSGITask(unittest.TestCase): inst = self._makeOne() inst.channel.addr = ['192.168.1.1'] inst.channel.server.adj.trusted_proxy = '192.168.1.1' + inst.channel.server.adj.trusted_proxy_headers = {'x-forwarded-proto'} request = DummyParser() request.headers = { 'CONTENT_TYPE': 'abc', @@ -861,6 +869,455 @@ class TestWSGITask(unittest.TestCase): inst.request = request self.assertRaises(ValueError, inst.get_environment) + + def test_parse_proxy_headers_forwarded_for(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-for'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '192.0.2.1') + + def test_parse_proxy_headers_forwarded_for_v6_missing_brackets(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '2001:db8::0' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-for'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '[2001:db8::0]') + + def test_parse_proxy_headers_forwared_for_multiple(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'x-forwarded-for'} + ) + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + + def test_parse_forwarded_multiple_proxies_trust_only_two(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'For=192.0.2.1;host=fake.com, For=198.51.100.2;host=example.com:8080, For=203.0.113.1' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'forwarded'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + + def test_parse_forwarded_multiple_proxies(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'for="[2001:db8::1]";host="example.com:8443";proto="https", for=192.0.2.1;host="example.internal:8080"' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'forwarded'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '[2001:db8::1]') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8443') + self.assertEqual(environ['SERVER_PORT'], '8443') + self.assertEqual(environ['wsgi.url_scheme'], 'https') + + def test_parse_forwarded_multiple_proxies_minimal(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'for="[2001:db8::1]";proto="https", for=192.0.2.1;host="example.org"' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'forwarded'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '[2001:db8::1]') + self.assertEqual(environ['SERVER_NAME'], 'example.org') + self.assertEqual(environ['HTTP_HOST'], 'example.org') + self.assertEqual(environ['SERVER_PORT'], '443') + self.assertEqual(environ['wsgi.url_scheme'], 'https') + + def test_parse_proxy_headers_forwarded_host_with_port(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1', + 'X_FORWARDED_PROTO': 'http', + 'X_FORWARDED_HOST': 'example.com:8080', + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + + def test_parse_proxy_headers_forwarded_host_without_port(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1', + 'X_FORWARDED_PROTO': 'http', + 'X_FORWARDED_HOST': 'example.com', + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com') + self.assertEqual(environ['SERVER_PORT'], '80') + + def test_parse_proxy_headers_forwarded_host_with_forwarded_port(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1', + 'X_FORWARDED_PROTO': 'http', + 'X_FORWARDED_HOST': 'example.com', + 'X_FORWARDED_PORT': '8080' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'x-forwarded-port'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + + def test_parse_proxy_headers_forwarded_host_multiple_with_forwarded_port(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1', + 'X_FORWARDED_PROTO': 'http', + 'X_FORWARDED_HOST': 'example.com, example.org', + 'X_FORWARDED_PORT': '8080' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=2, + trusted_proxy_headers={'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'x-forwarded-port'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + + def test_parse_proxy_headers_forwarded_host_multiple_with_forwarded_port_limit_one_trusted(self): + inst = self._makeOne() + + headers = { + 'X_FORWARDED_FOR': '192.0.2.1, 198.51.100.2, 203.0.113.1', + 'X_FORWARDED_PROTO': 'http', + 'X_FORWARDED_HOST': 'example.com, example.org', + 'X_FORWARDED_PORT': '8080' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'x-forwarded-port'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '203.0.113.1') + self.assertEqual(environ['SERVER_NAME'], 'example.org') + self.assertEqual(environ['HTTP_HOST'], 'example.org:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + + def test_parse_forwarded(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'For=198.51.100.2:5858;host=example.com:8080;proto=https' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['REMOTE_PORT'], '5858') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + self.assertEqual(environ['wsgi.url_scheme'], 'https') + + def test_parse_forwarded_warning_other_proxy_headers(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'X_FORWARDED_FOR': '[2001:db8::1]', + 'FORWARDED': 'For=198.51.100.2;host=example.com:8080;proto=https' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'}, + log_untrusted_proxy_headers=True + ) + + self.assertEqual(len(inst.logger.logged), 1) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + self.assertEqual(environ['wsgi.url_scheme'], 'https') + + def test_parse_forwarded_empty_pair(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'For=198.51.100.2;;proto=https;by=_unused' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'} + ) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + + def test_parse_forwarded_pair_token_whitespace(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'For=198.51.100.2; proto =https' + } + environ = {} + + with self.assertRaises(ValueError): + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'} + ) + + def test_parse_forwarded_pair_value_whitespace(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'For= "198.51.100.2"; proto =https' + } + environ = {} + + with self.assertRaises(ValueError): + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'} + ) + + def test_parse_forwarded_pair_no_equals(self): + inst = self._makeOne() + + headers = { + 'FORWARDED': 'For' + } + environ = {} + + with self.assertRaises(ValueError): + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'} + ) + + def test_parse_forwarded_warning_unknown_token(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'FORWARDED': 'For=198.51.100.2;host=example.com:8080;proto=https;unknown="yolo"' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'forwarded'} + ) + + self.assertEqual(len(inst.logger.logged), 1) + self.assertIn('Unknown Forwarded token', inst.logger.logged[0]) + + self.assertEqual(environ['REMOTE_ADDR'], '198.51.100.2') + self.assertEqual(environ['SERVER_NAME'], 'example.com') + self.assertEqual(environ['HTTP_HOST'], 'example.com:8080') + self.assertEqual(environ['SERVER_PORT'], '8080') + self.assertEqual(environ['wsgi.url_scheme'], 'https') + + def test_parse_no_valid_proxy_headers(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'X_FORWARDED_FOR': '198.51.100.2', + 'FORWARDED': 'For=198.51.100.2;host=example.com:8080;proto=https' + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + ) + + self.assertEqual(environ, {}) + + def test_parse_headers_contains_only_trusted(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'X_FORWARDED_FOR': '198.51.100.2', + 'X_FORWARDED_BY': 'Waitress', + 'X_FORWARDED_PROTO': 'https', + 'X_FORWARDED_HOST': 'example.org', + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-by'} + ) + + self.assertEqual(environ, {}) + self.assertEqual(headers, {'X_FORWARDED_BY': 'Waitress'}) + + def test_parse_headers_contains_all_including_untrusted(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'X_FORWARDED_FOR': '198.51.100.2', + 'X_FORWARDED_BY': 'Waitress', + 'X_FORWARDED_PROTO': 'https', + 'X_FORWARDED_HOST': 'example.org', + } + headers_orig = headers.copy() + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-by'}, + clear_untrusted_proxy_headers=False + ) + + self.assertEqual(environ, {}) + self.assertEqual(headers, headers_orig) + + def test_parse_multiple_x_forwarded_proto(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'X_FORWARDED_PROTO': 'http, https', + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-proto'} + ) + + self.assertEqual(environ, {}) + self.assertEqual(len(inst.logger.logged), 1) + self.assertIn("Found multiple values in X-Forwarded-Proto", inst.logger.logged[0]) + + def test_parse_multiple_x_forwarded_port(self): + inst = self._makeOne() + inst.logger = DummyLogger() + + headers = { + 'X_FORWARDED_PORT': '443, 80', + } + environ = {} + inst.parse_proxy_headers( + environ, + headers, + trusted_proxy_count=1, + trusted_proxy_headers={'x-forwarded-port'} + ) + + self.assertEqual(environ, {}) + self.assertEqual(len(inst.logger.logged), 1) + self.assertIn("Found multiple values in X-Forwarded-Port", inst.logger.logged[0]) + + class TestErrorTask(unittest.TestCase): def _makeOne(self, channel=None, request=None): @@ -969,6 +1426,10 @@ class DummyAdj(object): port = 80 url_prefix = '' trusted_proxy = None + trusted_proxy_count = 1 + trusted_proxy_headers = {} + log_untrusted_proxy_headers = True + clear_untrusted_proxy_headers = True class DummyServer(object): server_name = 'localhost' @@ -981,7 +1442,7 @@ class DummyChannel(object): closed_when_done = False adj = DummyAdj() creation_time = 0 - addr = ('127.0.0.1', 80) + addr = ('127.0.0.1', 39830) def __init__(self, server=None): if server is None: @@ -1020,8 +1481,8 @@ class DummyLogger(object): def __init__(self): self.logged = [] - def warning(self, msg): - self.logged.append(msg) + def warning(self, msg, *args): + self.logged.append(msg % args) def exception(self, msg): self.logged.append(msg) |