diff options
-rw-r--r-- | oauthlib/oauth1/rfc5849/signature.py | 66 | ||||
-rw-r--r-- | oauthlib/uri_validate.py | 2 | ||||
-rw-r--r-- | tests/oauth1/rfc5849/test_signatures.py | 20 | ||||
-rw-r--r-- | tests/test_uri_validate.py | 57 |
4 files changed, 113 insertions, 32 deletions
diff --git a/oauthlib/oauth1/rfc5849/signature.py b/oauthlib/oauth1/rfc5849/signature.py index 5ec123a..9cb1a51 100644 --- a/oauthlib/oauth1/rfc5849/signature.py +++ b/oauthlib/oauth1/rfc5849/signature.py @@ -37,6 +37,7 @@ should have no impact on properly behaving programs. import binascii import hashlib import hmac +import ipaddress import logging import urllib.parse as urlparse import warnings @@ -130,7 +131,12 @@ def base_string_uri(uri: str, host: str = None) -> str: raise ValueError('uri must be a string.') # FIXME: urlparse does not support unicode - scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri) + output = urlparse.urlparse(uri) + scheme = output.scheme + hostname = output.hostname + port = output.port + path = output.path + params = output.params # The scheme, authority, and path of the request resource URI `RFC3986` # are included by constructing an "http" or "https" URI representing @@ -152,13 +158,22 @@ def base_string_uri(uri: str, host: str = None) -> str: # 1. The scheme and host MUST be in lowercase. scheme = scheme.lower() - netloc = netloc.lower() # Note: if ``host`` is used, it will be converted to lowercase below + if hostname is not None: + hostname = hostname.lower() # 2. The host and port values MUST match the content of the HTTP # request "Host" header field. if host is not None: - netloc = host.lower() # override value in uri with provided host + # NOTE: override value in uri with provided host + # Host argument is equal to netloc. It means it's missing scheme. + # Add it back, before parsing. + + host = host.lower() + host = f"{scheme}://{host}" + output = urlparse.urlparse(host) + hostname = output.hostname + port = output.port # 3. The port MUST be included if it is not the default port for the # scheme, and MUST be excluded if it is the default. Specifically, @@ -169,33 +184,28 @@ def base_string_uri(uri: str, host: str = None) -> str: # .. _`RFC2616`: https://tools.ietf.org/html/rfc2616 # .. _`RFC2818`: https://tools.ietf.org/html/rfc2818 - if ':' in netloc: - # Contains a colon ":", so try to parse as "host:port" - - hostname, port_str = netloc.split(':', 1) - - if len(hostname) == 0: - raise ValueError('missing host') # error: netloc was ":port" or ":" + if hostname is None: + raise ValueError('missing host') - if len(port_str) == 0: - netloc = hostname # was "host:", so just use the host part - else: - try: - port_num = int(port_str) # try to parse into an integer number - except ValueError: - raise ValueError('port is not an integer') - - if port_num <= 0 or 65535 < port_num: - raise ValueError('port out of range') # 16-bit unsigned ints - if (scheme, port_num) in (('http', 80), ('https', 443)): - netloc = hostname # default port for scheme: exclude port num - else: - netloc = hostname + ':' + str(port_num) # use hostname:port + # NOTE: Try guessing if we're dealing with IP or hostname + try: + hostname = ipaddress.ip_address(hostname) + except ValueError: + pass + + if isinstance(hostname, ipaddress.IPv6Address): + hostname = f"[{hostname}]" + elif isinstance(hostname, ipaddress.IPv4Address): + hostname = f"{hostname}" + + if port is not None and not (0 < port <= 65535): + raise ValueError('port out of range') # 16-bit unsigned ints + if (scheme, port) in (('http', 80), ('https', 443)): + netloc = hostname # default port for scheme: exclude port num + elif port: + netloc = f"{hostname}:{port}" # use hostname:port else: - # Does not contain a colon, so entire value must be the hostname - - if len(netloc) == 0: - raise ValueError('missing host') # error: netloc was empty string + netloc = hostname v = urlparse.urlunparse((scheme, netloc, path, params, '', '')) diff --git a/oauthlib/uri_validate.py b/oauthlib/uri_validate.py index 8a6d9c2..a6fe0fb 100644 --- a/oauthlib/uri_validate.py +++ b/oauthlib/uri_validate.py @@ -66,7 +66,7 @@ IPv4address = r"%(dec_octet)s \. %(dec_octet)s \. %(dec_octet)s \. %(dec_octet)s ) # IPv6address -IPv6address = r"([A-Fa-f0-9:]+:+)+[A-Fa-f0-9]+" +IPv6address = r"([A-Fa-f0-9:]+[:$])[A-Fa-f0-9]{1,4}" # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" ) IPvFuture = r"v %(HEXDIG)s+ \. (?: %(unreserved)s | %(sub_delims)s | : )+" % locals() diff --git a/tests/oauth1/rfc5849/test_signatures.py b/tests/oauth1/rfc5849/test_signatures.py index 4e6d962..2d4735e 100644 --- a/tests/oauth1/rfc5849/test_signatures.py +++ b/tests/oauth1/rfc5849/test_signatures.py @@ -229,6 +229,26 @@ class SignatureTests(TestCase): base_string_uri('http:///path', 'OVERRIDE.example.com')) # ---------------- + # Host: valid host allows for IPv4 and IPv6 + + self.assertEqual( + 'https://192.168.0.1/', + base_string_uri('https://192.168.0.1') + ) + self.assertEqual( + 'https://192.168.0.1:13000/', + base_string_uri('https://192.168.0.1:13000') + ) + self.assertEqual( + 'https://[123:db8:fd00:1000::5]:13000/', + base_string_uri('https://[123:db8:fd00:1000::5]:13000') + ) + self.assertEqual( + 'https://[123:db8:fd00:1000::5]/', + base_string_uri('https://[123:db8:fd00:1000::5]') + ) + + # ---------------- # Port: default ports always excluded; non-default ports always included self.assertEqual( diff --git a/tests/test_uri_validate.py b/tests/test_uri_validate.py index 3489d95..6a9f8ea 100644 --- a/tests/test_uri_validate.py +++ b/tests/test_uri_validate.py @@ -1,4 +1,4 @@ -import oauthlib +import unittest from oauthlib.uri_validate import is_absolute_uri from tests.unittest import TestCase @@ -7,7 +7,6 @@ from tests.unittest import TestCase class UriValidateTest(TestCase): def test_is_absolute_uri(self): - self.assertIsNotNone(is_absolute_uri('schema://example.com/path')) self.assertIsNotNone(is_absolute_uri('https://example.com/path')) self.assertIsNotNone(is_absolute_uri('https://example.com')) @@ -17,17 +16,69 @@ class UriValidateTest(TestCase): self.assertIsNotNone(is_absolute_uri('http://example.com')) self.assertIsNotNone(is_absolute_uri('http://example.com/path')) self.assertIsNotNone(is_absolute_uri('http://example.com:80/path')) - self.assertIsNotNone(is_absolute_uri('com.example.bundle.id:/')) + + def test_query(self): + self.assertIsNotNone(is_absolute_uri('http://example.com:80/path?foo')) + self.assertIsNotNone(is_absolute_uri('http://example.com:80/path?foo=bar')) + self.assertIsNotNone(is_absolute_uri('http://example.com:80/path?foo=bar&fruit=banana')) + + def test_fragment_forbidden(self): + self.assertIsNone(is_absolute_uri('http://example.com:80/path#foo')) + self.assertIsNone(is_absolute_uri('http://example.com:80/path#foo=bar')) + self.assertIsNone(is_absolute_uri('http://example.com:80/path#foo=bar&fruit=banana')) + + def test_combined_forbidden(self): + self.assertIsNone(is_absolute_uri('http://example.com:80/path?foo#bar')) + self.assertIsNone(is_absolute_uri('http://example.com:80/path?foo&bar#fruit')) + self.assertIsNone(is_absolute_uri('http://example.com:80/path?foo=1&bar#fruit=banana')) + self.assertIsNone(is_absolute_uri('http://example.com:80/path?foo=1&bar=2#fruit=banana&bar=foo')) + + def test_custom_scheme(self): + self.assertIsNotNone(is_absolute_uri('com.example.bundle.id://')) + + def test_ipv6_bracket(self): self.assertIsNotNone(is_absolute_uri('http://[::1]:38432/path')) self.assertIsNotNone(is_absolute_uri('http://[::1]/path')) self.assertIsNotNone(is_absolute_uri('http://[fd01:0001::1]/path')) self.assertIsNotNone(is_absolute_uri('http://[fd01:1::1]/path')) self.assertIsNotNone(is_absolute_uri('http://[0123:4567:89ab:cdef:0123:4567:89ab:cdef]/path')) + self.assertIsNotNone(is_absolute_uri('http://[0123:4567:89ab:cdef:0123:4567:89ab:cdef]:8080/path')) + + @unittest.skip("ipv6 edge-cases not supported") + def test_ipv6_edge_cases(self): + self.assertIsNotNone(is_absolute_uri('http://2001:db8::')) + self.assertIsNotNone(is_absolute_uri('http://::1234:5678')) + self.assertIsNotNone(is_absolute_uri('http://2001:db8::1234:5678')) + self.assertIsNotNone(is_absolute_uri('http://2001:db8:3333:4444:5555:6666:7777:8888')) + self.assertIsNotNone(is_absolute_uri('http://2001:db8:3333:4444:CCCC:DDDD:EEEE:FFFF')) + self.assertIsNotNone(is_absolute_uri('http://0123:4567:89ab:cdef:0123:4567:89ab:cdef/path')) + self.assertIsNotNone(is_absolute_uri('http://::')) + self.assertIsNotNone(is_absolute_uri('http://2001:0db8:0001:0000:0000:0ab9:C0A8:0102')) + + @unittest.skip("ipv6 dual ipv4 not supported") + def test_ipv6_dual(self): + self.assertIsNotNone(is_absolute_uri('http://2001:db8:3333:4444:5555:6666:1.2.3.4')) + self.assertIsNotNone(is_absolute_uri('http://::11.22.33.44')) + self.assertIsNotNone(is_absolute_uri('http://2001:db8::123.123.123.123')) + self.assertIsNotNone(is_absolute_uri('http://::1234:5678:91.123.4.56')) + self.assertIsNotNone(is_absolute_uri('http://::1234:5678:1.2.3.4')) + self.assertIsNotNone(is_absolute_uri('http://2001:db8::1234:5678:5.6.7.8')) + + def test_ipv4(self): self.assertIsNotNone(is_absolute_uri('http://127.0.0.1:38432/')) self.assertIsNotNone(is_absolute_uri('http://127.0.0.1:38432/')) self.assertIsNotNone(is_absolute_uri('http://127.1:38432/')) + def test_failures(self): self.assertIsNone(is_absolute_uri('http://example.com:notaport/path')) self.assertIsNone(is_absolute_uri('wrong')) self.assertIsNone(is_absolute_uri('http://[:1]:38432/path')) self.assertIsNone(is_absolute_uri('http://[abcd:efgh::1]/')) + + def test_recursive_regex(self): + from datetime import datetime + t0 = datetime.now() + is_absolute_uri('http://[::::::::::::::::::::::::::]/path') + t1 = datetime.now() + spent = t1 - t0 + self.assertGreater(0.1, spent.total_seconds(), "possible recursive loop detected") |