diff options
Diffstat (limited to 'waitress/proxy_headers.py')
-rw-r--r-- | waitress/proxy_headers.py | 88 |
1 files changed, 35 insertions, 53 deletions
diff --git a/waitress/proxy_headers.py b/waitress/proxy_headers.py index 132fea8..1df8b8e 100644 --- a/waitress/proxy_headers.py +++ b/waitress/proxy_headers.py @@ -3,16 +3,18 @@ from collections import namedtuple from .utilities import logger, undquote, BadRequest -PROXY_HEADERS = frozenset({ - 'X_FORWARDED_FOR', - 'X_FORWARDED_HOST', - 'X_FORWARDED_PROTO', - 'X_FORWARDED_PORT', - 'X_FORWARDED_BY', - 'FORWARDED', -}) +PROXY_HEADERS = frozenset( + { + "X_FORWARDED_FOR", + "X_FORWARDED_HOST", + "X_FORWARDED_PROTO", + "X_FORWARDED_PORT", + "X_FORWARDED_BY", + "FORWARDED", + } +) -Forwarded = namedtuple('Forwarded', ['by', 'for_', 'host', 'proto']) +Forwarded = namedtuple("Forwarded", ["by", "for_", "host", "proto"]) class MalformedProxyHeader(Exception): @@ -34,8 +36,8 @@ def proxy_headers_middleware( ): def translate_proxy_headers(environ, start_response): untrusted_headers = PROXY_HEADERS - remote_peer = environ['REMOTE_ADDR'] - if trusted_proxy == '*' or remote_peer == trusted_proxy: + remote_peer = environ["REMOTE_ADDR"] + if trusted_proxy == "*" or remote_peer == trusted_proxy: try: untrusted_headers = parse_proxy_headers( environ, @@ -46,28 +48,27 @@ def proxy_headers_middleware( except MalformedProxyHeader as ex: logger.warning( 'Malformed proxy header "%s" from "%s": %s value: %s', - ex.header, remote_peer, ex.reason, ex.value) + ex.header, + remote_peer, + ex.reason, + ex.value, + ) error = BadRequest('Header "{0}" malformed.'.format(ex.header)) return error.wsgi_response(environ, start_response) # Clear out the untrusted proxy headers if clear_untrusted: clear_untrusted_headers( - environ, - untrusted_headers, - log_warning=log_untrusted, - logger=logger, + environ, untrusted_headers, log_warning=log_untrusted, logger=logger, ) return app(environ, start_response) + return translate_proxy_headers def parse_proxy_headers( - environ, - trusted_proxy_count, - trusted_proxy_headers, - logger=logger, + environ, trusted_proxy_count, trusted_proxy_headers, logger=logger, ): if trusted_proxy_headers is None: trusted_proxy_headers = set() @@ -78,14 +79,9 @@ def parse_proxy_headers( untrusted_headers = set(PROXY_HEADERS) def raise_for_multiple_values(): - raise ValueError( - 'Unspecified behavior for multiple values found in header', - ) + raise ValueError("Unspecified behavior for multiple values found in header",) - if ( - "x-forwarded-for" in trusted_proxy_headers - and "HTTP_X_FORWARDED_FOR" in environ - ): + if "x-forwarded-for" in trusted_proxy_headers and "HTTP_X_FORWARDED_FOR" in environ: try: forwarded_for = [] @@ -110,9 +106,7 @@ def parse_proxy_headers( untrusted_headers.remove("X_FORWARDED_FOR") except Exception as ex: raise MalformedProxyHeader( - "X-Forwarded-For", - str(ex), - environ['HTTP_X_FORWARDED_FOR'], + "X-Forwarded-For", str(ex), environ["HTTP_X_FORWARDED_FOR"], ) if ( @@ -133,35 +127,29 @@ def parse_proxy_headers( untrusted_headers.remove("X_FORWARDED_HOST") except Exception as ex: raise MalformedProxyHeader( - "X-Forwarded-Host", - str(ex), - environ['HTTP_X_FORWARDED_HOST'], + "X-Forwarded-Host", str(ex), environ["HTTP_X_FORWARDED_HOST"], ) if "x-forwarded-proto" in trusted_proxy_headers: try: forwarded_proto = undquote(environ.get("HTTP_X_FORWARDED_PROTO", "")) - if ',' in forwarded_proto: + if "," in forwarded_proto: raise_for_multiple_values() untrusted_headers.remove("X_FORWARDED_PROTO") except Exception as ex: raise MalformedProxyHeader( - "X-Forwarded-Proto", - str(ex), - environ['HTTP_X_FORWARDED_PROTO'], + "X-Forwarded-Proto", str(ex), environ["HTTP_X_FORWARDED_PROTO"], ) if "x-forwarded-port" in trusted_proxy_headers: try: forwarded_port = undquote(environ.get("HTTP_X_FORWARDED_PORT", "")) - if ',' in forwarded_port: + if "," in forwarded_port: raise_for_multiple_values() untrusted_headers.remove("X_FORWARDED_PORT") except Exception as ex: raise MalformedProxyHeader( - "X-Forwarded-Port", - str(ex), - environ['HTTP_X_FORWARDED_PORT'], + "X-Forwarded-Port", str(ex), environ["HTTP_X_FORWARDED_PORT"], ) if "x-forwarded-by" in trusted_proxy_headers: @@ -197,10 +185,10 @@ def parse_proxy_headers( raise ValueError('Invalid forwarded-pair missing "="') if token.strip() != token: - raise ValueError('Token may not be surrounded by whitespace') + raise ValueError("Token may not be surrounded by whitespace") if value.strip() != value: - raise ValueError('Value may not be surrounded by whitespace') + raise ValueError("Value may not be surrounded by whitespace") if token == "by": forwarded_by = undquote(value) @@ -224,7 +212,7 @@ def parse_proxy_headers( ) except Exception as ex: raise MalformedProxyHeader( - "Forwarded", str(ex), environ['HTTP_FORWARDED'], + "Forwarded", str(ex), environ["HTTP_FORWARDED"], ) proxies = proxies[-trusted_proxy_count:] @@ -294,17 +282,11 @@ def parse_proxy_headers( environ["HTTP_HOST"] = "{}:{}".format( forwarded_host, forwarded_port ) - elif ( - forwarded_port == "80" - and environ["wsgi.url_scheme"] != "http" - ): + elif forwarded_port == "80" and environ["wsgi.url_scheme"] != "http": environ["HTTP_HOST"] = "{}:{}".format( forwarded_host, forwarded_port ) - elif ( - forwarded_port == "443" - and environ["wsgi.url_scheme"] != "https" - ): + elif forwarded_port == "443" and environ["wsgi.url_scheme"] != "https": environ["HTTP_HOST"] = "{}:{}".format( forwarded_host, forwarded_port ) @@ -336,7 +318,7 @@ def clear_untrusted_headers( untrusted_headers_removed = [ header for header in untrusted_headers - if environ.pop('HTTP_' + header, False) is not False + if environ.pop("HTTP_" + header, False) is not False ] if log_warning and untrusted_headers_removed: |