summaryrefslogtreecommitdiff
path: root/tests/oauth2/rfc6749/endpoints
diff options
context:
space:
mode:
authorIb Lundgren <ib.lundgren@gmail.com>2013-09-12 10:05:33 +0100
committerIb Lundgren <ib.lundgren@gmail.com>2013-09-12 10:05:33 +0100
commit62058f2d031d91bb6173fe06a1f6f11e22a9f03e (patch)
tree91f947a737da32e53ef5bd16500285440499fbc8 /tests/oauth2/rfc6749/endpoints
parent1122945efbf3d1be6fed0e2279dfb81f785ad706 (diff)
downloadoauthlib-62058f2d031d91bb6173fe06a1f6f11e22a9f03e.tar.gz
Restructure OAuth2 tests.
Diffstat (limited to 'tests/oauth2/rfc6749/endpoints')
-rw-r--r--tests/oauth2/rfc6749/endpoints/__init__.py0
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_base_endpoint.py70
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_client_authentication.py103
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py116
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_error_responses.py372
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_extra_credentials.py69
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py106
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_scope_handling.py182
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_utils.py14
9 files changed, 1032 insertions, 0 deletions
diff --git a/tests/oauth2/rfc6749/endpoints/__init__.py b/tests/oauth2/rfc6749/endpoints/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/__init__.py
diff --git a/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py
new file mode 100644
index 0000000..79124e3
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_base_endpoint.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from ....unittest import TestCase
+
+from oauthlib.oauth2.rfc6749 import BaseEndpoint, catch_errors_and_unavailability
+from oauthlib.oauth2 import Server, RequestValidator, FatalClientError, OAuth2Error
+
+
+class BaseEndpointTest(TestCase):
+
+ def test_default_config(self):
+ endpoint = BaseEndpoint()
+ self.assertFalse(endpoint.catch_errors)
+ self.assertTrue(endpoint.available)
+ endpoint.catch_errors = True
+ self.assertTrue(endpoint.catch_errors)
+ endpoint.available = False
+ self.assertFalse(endpoint.available)
+
+ def test_error_catching(self):
+ validator = RequestValidator()
+ server = Server(validator)
+ server.catch_errors = True
+ h, b, s = server.create_authorization_response('https://example.com')
+ self.assertIn("server_error", b)
+ self.assertEqual(s, 500)
+
+ def test_unavailability(self):
+ validator = RequestValidator()
+ server = Server(validator)
+ server.available = False
+ h, b, s = server.create_authorization_response('https://example.com')
+ self.assertIn("temporarily_unavailable", b)
+ self.assertEqual(s, 503)
+
+ def test_wrapper(self):
+
+ class TestServer(Server):
+
+ @catch_errors_and_unavailability
+ def throw_error(self, uri):
+ raise ValueError()
+
+ @catch_errors_and_unavailability
+ def throw_oauth_error(self, uri):
+ raise OAuth2Error()
+
+ @catch_errors_and_unavailability
+ def throw_fatal_oauth_error(self, uri):
+ raise FatalClientError()
+
+ validator = RequestValidator()
+ server = TestServer(validator)
+
+ server.catch_errors = True
+ h, b, s = server.throw_error('a')
+ self.assertIn("server_error", b)
+ self.assertEqual(s, 500)
+
+ server.available = False
+ h, b, s = server.throw_error('a')
+ self.assertIn("temporarily_unavailable", b)
+ self.assertEqual(s, 503)
+
+ server.available = True
+ self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a')
+ self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a')
+ server.catch_errors = False
+ self.assertRaises(OAuth2Error, server.throw_oauth_error, 'a')
+ self.assertRaises(FatalClientError, server.throw_fatal_oauth_error, 'a')
diff --git a/tests/oauth2/rfc6749/endpoints/test_client_authentication.py b/tests/oauth2/rfc6749/endpoints/test_client_authentication.py
new file mode 100644
index 0000000..fdd9665
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_client_authentication.py
@@ -0,0 +1,103 @@
+"""Client authentication tests across all endpoints.
+
+Client authentication in OAuth2 serve two purposes, to authenticate
+confidential clients and to ensure public clients are in fact public. The
+latter is achieved with authenticate_client_id and the former with
+authenticate_client.
+
+We make sure authentication is done by requiring a client object to be set
+on the request object with a client_id parameter. The client_id attribute
+prevents this check from being circumvented with a client form parameter.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class ClientAuthenticationTest(TestCase):
+
+ def inspect_client(self, request, refresh_token=False):
+ if not request.client or not request.client.client_id:
+ raise ValueError()
+ return 'abc'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
+ self.web = WebApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.mobile = MobileApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.legacy = LegacyApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.backend = BackendApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client_id(self, client_id, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_username(self, username, password, client, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_client_id_authentication(self):
+ token_uri = 'http://example.com/path'
+
+ # authorization code grant
+ self.validator.authenticate_client.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=mock')
+ self.assertEqual(json.loads(body)['error'], 'invalid_client')
+
+ self.validator.authenticate_client_id.return_value = True
+ self.validator.authenticate_client.side_effect = self.set_client
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=mock')
+ self.assertIn('access_token', json.loads(body))
+
+ # implicit grant
+ auth_uri = 'http://example.com/path?client_id=abc&response_type=token'
+ self.assertRaises(ValueError, self.mobile.create_authorization_response,
+ auth_uri, scopes=['random'])
+
+ self.validator.validate_client_id.side_effect = self.set_client_id
+ h, _, s = self.mobile.create_authorization_response(auth_uri, scopes=['random'])
+ self.assertEqual(302, s)
+ self.assertIn('Location', h)
+ self.assertIn('access_token', get_fragment_credentials(h['Location']))
+
+ def test_custom_authentication(self):
+ token_uri = 'http://example.com/path'
+
+ # authorization code grant
+ self.assertRaises(NotImplementedError,
+ self.web.create_token_response, token_uri,
+ body='grant_type=authorization_code&code=mock')
+
+ # password grant
+ self.validator.authenticate_client.return_value = True
+ self.assertRaises(NotImplementedError,
+ self.legacy.create_token_response, token_uri,
+ body='grant_type=password&username=abc&password=secret')
+
+ # client credentials grant
+ self.validator.authenticate_client.return_value = True
+ self.assertRaises(NotImplementedError,
+ self.backend.create_token_response, token_uri,
+ body='grant_type=client_credentials')
diff --git a/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py b/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py
new file mode 100644
index 0000000..98a132a
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_credentials_preservation.py
@@ -0,0 +1,116 @@
+"""Ensure credentials are preserved through the authorization.
+
+The Authorization Code Grant will need to preserve state as well as redirect
+uri and the Implicit Grant will need to preserve state.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_query_credentials, get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2.rfc6749 import errors
+
+
+class PreservationTest(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = self.DEFAULT_REDIRECT_URI
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+
+ def set_state(self, state):
+ def set_request_state(client_id, code, client, request):
+ request.state = state
+ return True
+ return set_request_state
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def test_state_preservation(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.validator.validate_code.side_effect = self.set_state('xyz')
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['state'], 'xyz')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['state'][0], 'xyz')
+
+ def test_redirect_uri_preservation(self):
+ auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
+ redirect_uri = 'http://i.b/path'
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + '&response_type=code', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(redirect_uri))
+
+ # confirm_redirect_uri should return false if the redirect uri
+ # was given in the authorization but not in the token request.
+ self.validator.confirm_redirect_uri.return_value = False
+ code = get_query_credentials(h['Location'])['code'][0]
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['error'], 'access_denied')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + '&response_type=token', scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertTrue(h['Location'].startswith(redirect_uri))
+
+ def test_invalid_redirect_uri(self):
+ auth_uri = 'http://example.com/path?redirect_uri=http%3A%2F%2Fi.b%2Fpath&client_id=abc'
+ self.validator.validate_redirect_uri.return_value = False
+
+ # authorization grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.create_authorization_response,
+ auth_uri + '&response_type=code', scopes=['random'])
+
+ # implicit grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.create_authorization_response,
+ auth_uri + '&response_type=token', scopes=['random'])
+
+ def test_default_uri(self):
+ auth_uri = 'http://example.com/path?state=xyz&client_id=abc'
+
+ self.validator.get_default_redirect_uri.return_value = None
+
+ # authorization grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.create_authorization_response,
+ auth_uri + '&response_type=code', scopes=['random'])
+
+ # implicit grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.create_authorization_response,
+ auth_uri + '&response_type=token', scopes=['random'])
diff --git a/tests/oauth2/rfc6749/endpoints/test_error_responses.py b/tests/oauth2/rfc6749/endpoints/test_error_responses.py
new file mode 100644
index 0000000..5f65de3
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_error_responses.py
@@ -0,0 +1,372 @@
+"""Ensure the correct error responses are returned for all defined error types.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+from oauthlib.oauth2.rfc6749 import errors
+
+
+class ErrorResponseTest(TestCase):
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = None
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_invalid_redirect_uri(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=wrong'
+ # Authorization code grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidRedirectURIError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_missing_redirect_uri(self):
+ uri = 'https://example.com/authorize?client_id=foo'
+ # Authorization code grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingRedirectURIError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_mismatching_redirect_uri(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+ # Authorization code grant
+ self.validator.validate_redirect_uri.return_value = False
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.MismatchingRedirectURIError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_missing_client_id(self):
+ uri = 'https://example.com/authorize?redirect_uri=https%3A%2F%2Fi.b%2Fback'
+ # Authorization code grant
+ self.validator.validate_redirect_uri.return_value = False
+ self.assertRaises(errors.MissingClientIdError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingClientIdError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.MissingClientIdError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.MissingClientIdError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_invalid_client_id(self):
+ uri = 'https://example.com/authorize?client_id=foo&redirect_uri=https%3A%2F%2Fi.b%2Fback'
+ # Authorization code grant
+ self.validator.validate_client_id.return_value = False
+ self.assertRaises(errors.InvalidClientIdError,
+ self.web.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidClientIdError,
+ self.web.create_authorization_response, uri, scopes=['foo'])
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidClientIdError,
+ self.mobile.validate_authorization_request, uri)
+ self.assertRaises(errors.InvalidClientIdError,
+ self.mobile.create_authorization_response, uri, scopes=['foo'])
+
+ def test_invalid_request(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ token_uri = 'https://i.b/token'
+ invalid_uris = [
+ # Duplicate parameters
+ 'https://i.b/auth?client_id=foo&client_id=bar&response_type={0}',
+ # Missing response type
+ 'https://i.b/auth?client_id=foo',
+ ]
+
+ # Authorization code grant
+ for uri in invalid_uris:
+ self.assertRaises(errors.InvalidRequestError,
+ self.web.validate_authorization_request,
+ uri.format('code'))
+ h, _, s = self.web.create_authorization_response(
+ uri.format('code'), scopes=['foo'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertIn('error=invalid_request', h['Location'])
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=authorization_code&client_id=nope&client_id=nope&code=foo'
+ ]
+ for body in invalid_bodies:
+ _, body, _ = self.web.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ # Implicit grant
+ for uri in invalid_uris:
+ self.assertRaises(errors.InvalidRequestError,
+ self.mobile.validate_authorization_request,
+ uri.format('token'))
+ h, _, s = self.mobile.create_authorization_response(
+ uri.format('token'), scopes=['foo'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertIn('error=invalid_request', h['Location'])
+
+ # Password credentials grant
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=password&username=foo&username=bar&password=baz'
+ # missing username
+ 'grant_type=password&password=baz'
+ # missing password
+ 'grant_type=password&username=foo'
+ ]
+ self.validator.authenticate_client.side_effect = self.set_client
+ for body in invalid_bodies:
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ # Client credentials grant
+ invalid_bodies = [
+ # duplicate params
+ 'grant_type=client_credentials&scope=foo&scope=bar'
+ ]
+ for body in invalid_bodies:
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body)
+ self.assertEqual('invalid_request', json.loads(body)['error'])
+
+ def test_unauthorized_client(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.validate_grant_type.return_value = False
+ self.validator.validate_response_type.return_value = False
+ self.validator.authenticate_client.side_effect = self.set_client
+ token_uri = 'https://i.b/token'
+
+ # Authorization code grant
+ self.assertRaises(errors.UnauthorizedClientError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=code&client_id=foo')
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ # Implicit grant
+ self.assertRaises(errors.UnauthorizedClientError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=token&client_id=foo')
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body='grant_type=client_credentials')
+ self.assertEqual('unauthorized_client', json.loads(body)['error'])
+
+ def test_access_denied(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.validator.confirm_redirect_uri.return_value = False
+ token_uri = 'https://i.b/token'
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('access_denied', json.loads(body)['error'])
+
+ def test_unsupported_response_type(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ # Authorization code grant
+ self.assertRaises(errors.UnsupportedResponseTypeError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=foo&client_id=foo')
+
+ # Implicit grant
+ self.assertRaises(errors.UnsupportedResponseTypeError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=foo&client_id=foo')
+
+ def test_invalid_scope(self):
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.validator.validate_scopes.return_value = False
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.assertRaises(errors.InvalidScopeError,
+ self.web.validate_authorization_request,
+ 'https://i.b/auth?response_type=code&client_id=foo')
+
+ # Implicit grant
+ self.assertRaises(errors.InvalidScopeError,
+ self.mobile.validate_authorization_request,
+ 'https://i.b/auth?response_type=token&client_id=foo')
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_scope', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual('invalid_scope', json.loads(body)['error'])
+
+ def test_server_error(self):
+ def raise_error(*args, **kwargs):
+ raise ValueError()
+
+ self.validator.validate_client_id.side_effect = raise_error
+ self.validator.authenticate_client.side_effect = raise_error
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+
+ # Authorization code grant
+ self.web.catch_errors = True
+ _, _, s = self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+ _, _, s = self.web.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+
+ # Implicit grant
+ self.mobile.catch_errors = True
+ _, _, s = self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'])
+ self.assertEqual(s, 500)
+
+ # Password credentials grant
+ self.legacy.catch_errors = True
+ _, _, s = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=foo')
+ self.assertEqual(s, 500)
+
+ # Client credentials grant
+ self.backend.catch_errors = True
+ _, _, s = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual(s, 500)
+
+ def test_temporarily_unavailable(self):
+ # Authorization code grant
+ self.web.available = False
+ _, _, s = self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+ _, _, s = self.web.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+
+ # Implicit grant
+ self.mobile.available = False
+ _, _, s = self.mobile.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'])
+ self.assertEqual(s, 503)
+
+ # Password credentials grant
+ self.legacy.available = False
+ _, _, s = self.legacy.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=password&username=foo&password=foo')
+ self.assertEqual(s, 503)
+
+ # Client credentials grant
+ self.backend.available = False
+ _, _, s = self.backend.create_token_response(
+ 'https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual(s, 503)
+
+ def test_invalid_client(self):
+ self.validator.authenticate_client.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=client_credentials')
+ self.assertEqual('invalid_client', json.loads(body)['error'])
+
+ def test_invalid_grant(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.validator.validate_code.return_value = False
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo')
+ self.assertEqual('invalid_grant', json.loads(body)['error'])
+
+ # Password credentials grant
+ self.validator.validate_user.return_value = False
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar')
+ self.assertEqual('invalid_grant', json.loads(body)['error'])
+
+ def test_unsupported_grant_type(self):
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ _, body, _ = self.web.create_token_response('https://i.b/token',
+ body='grant_type=bar&code=foo')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ # Password credentials grant
+ _, body, _ = self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=bar&username=foo&password=bar')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
+
+ # Client credentials grant
+ _, body, _ = self.backend.create_token_response('https://i.b/token',
+ body='grant_type=bar')
+ self.assertEqual('unsupported_grant_type', json.loads(body)['error'])
diff --git a/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py b/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py
new file mode 100644
index 0000000..b43fa02
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_extra_credentials.py
@@ -0,0 +1,69 @@
+"""Ensure extra credentials can be supplied for inclusion in tokens.
+"""
+from __future__ import absolute_import, unicode_literals
+import mock
+
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class ExtraCredentialsTest(TestCase):
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'https://i.b/cb'
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_post_authorization_request(self):
+ def save_code(client_id, token, request):
+ self.assertEqual('creds', request.extra)
+
+ def save_token(token, request):
+ self.assertEqual('creds', request.extra)
+
+ # Authorization code grant
+ self.validator.save_authorization_code.side_effect = save_code
+ self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=code',
+ scopes=['foo'],
+ credentials={'extra': 'creds'})
+
+ # Implicit grant
+ self.validator.save_bearer_token.side_effect = save_token
+ self.web.create_authorization_response(
+ 'https://i.b/auth?client_id=foo&response_type=token',
+ scopes=['foo'],
+ credentials={'extra': 'creds'})
+
+ def test_token_request(self):
+ def save_token(token, request):
+ self.assertIn('extra', token)
+
+ self.validator.save_bearer_token.side_effect = save_token
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ # Authorization code grant
+ self.web.create_token_response('https://i.b/token',
+ body='grant_type=authorization_code&code=foo',
+ credentials={'extra': 'creds'})
+
+ # Password credentials grant
+ self.legacy.create_token_response('https://i.b/token',
+ body='grant_type=password&username=foo&password=bar',
+ credentials={'extra': 'creds'})
+
+ # Client credentials grant
+ self.backend.create_token_response('https://i.b/token',
+ body='grant_type=client_credentials',
+ credentials={'extra': 'creds'})
diff --git a/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py b/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py
new file mode 100644
index 0000000..6b3137a
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_resource_owner_association.py
@@ -0,0 +1,106 @@
+"""Ensure all tokens are associated with a resource owner.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_query_credentials, get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class ResourceOwnerAssociationTest(TestCase):
+
+ auth_uri = 'http://example.com/path?client_id=abc'
+ token_uri = 'http://example.com/path'
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_user(self, client_id, code, client, request):
+ request.user = 'test'
+ return True
+
+ def set_user_from_username(self, username, password, client, request):
+ request.user = 'test'
+ return True
+
+ def set_user_from_credentials(self, request):
+ request.user = 'test'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def inspect_client(self, request, refresh_token=False):
+ if not request.user:
+ raise ValueError()
+ return 'abc'
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = 'http://i.b./path'
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.mobile = MobileApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.legacy = LegacyApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+ self.backend = BackendApplicationServer(self.validator,
+ token_generator=self.inspect_client)
+
+ def test_web_application(self):
+ # TODO: code generator + intercept test
+ h, _, s = self.web.create_authorization_response(
+ self.auth_uri + '&response_type=code',
+ credentials={'user': 'test'}, scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.assertRaises(ValueError,
+ self.web.create_token_response, self.token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+
+ self.validator.validate_code.side_effect = self.set_user
+ _, body, _ = self.web.create_token_response(self.token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
+
+ def test_mobile_application(self):
+ self.assertRaises(ValueError,
+ self.mobile.create_authorization_response,
+ self.auth_uri + '&response_type=token')
+
+ h, _, s = self.mobile.create_authorization_response(
+ self.auth_uri + '&response_type=token',
+ credentials={'user': 'test'}, scopes=['random'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['access_token'][0], 'abc')
+
+ def test_legacy_application(self):
+ body = 'grant_type=password&username=abc&password=secret'
+ self.assertRaises(ValueError,
+ self.legacy.create_token_response,
+ self.token_uri, body=body)
+
+ self.validator.validate_user.side_effect = self.set_user_from_username
+ _, body, _ = self.legacy.create_token_response(
+ self.token_uri, body=body)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
+
+ def test_backend_application(self):
+ body = 'grant_type=client_credentials'
+ self.assertRaises(ValueError,
+ self.backend.create_token_response,
+ self.token_uri, body=body)
+
+ self.validator.authenticate_client.side_effect = self.set_user_from_credentials
+ _, body, _ = self.backend.create_token_response(
+ self.token_uri, body=body)
+ self.assertEqual(json.loads(body)['access_token'], 'abc')
diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
new file mode 100644
index 0000000..f48a4f9
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
@@ -0,0 +1,182 @@
+"""Ensure scope is preserved across authorization.
+
+Fairly trivial in all grants except the Authorization Code Grant where scope
+need to be persisted temporarily in an authorization code.
+"""
+from __future__ import absolute_import, unicode_literals
+import json
+import mock
+
+from .test_utils import get_query_credentials, get_fragment_credentials
+from ....unittest import TestCase
+
+from oauthlib.oauth2 import RequestValidator
+from oauthlib.oauth2 import WebApplicationServer, MobileApplicationServer
+from oauthlib.oauth2 import LegacyApplicationServer, BackendApplicationServer
+
+
+class TestScopeHandling(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def set_scopes(self, scopes):
+ def set_request_scopes(client_id, code, client, request):
+ request.scopes = scopes
+ return True
+ return set_request_scopes
+
+ def set_user(self, request):
+ request.user = 'foo'
+ request.client_id = 'bar'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = TestScopeHandling.DEFAULT_REDIRECT_URI
+ self.validator.authenticate_client.side_effect = self.set_client
+ self.web = WebApplicationServer(self.validator)
+ self.mobile = MobileApplicationServer(self.validator)
+ self.legacy = LegacyApplicationServer(self.validator)
+ self.backend = BackendApplicationServer(self.validator)
+
+ def test_scope_extraction(self):
+ scopes = (
+ ('images', ['images']),
+ ('images+videos', ['images', 'videos']),
+ ('http%3A%2f%2fa.b%2fvideos', ['http://a.b/videos']),
+ ('http%3A%2f%2fa.b%2fvideos+pics', ['http://a.b/videos', 'pics']),
+ ('pics+http%3A%2f%2fa.b%2fvideos', ['pics', 'http://a.b/videos']),
+ ('http%3A%2f%2fa.b%2fvideos+https%3A%2f%2fc.d%2Fsecret', ['http://a.b/videos', 'https://c.d/secret']),
+ )
+
+ uri = 'http://example.com/path?client_id=abc&scope=%s&response_type=%s'
+ for scope, correct_scopes in scopes:
+ scopes, _ = self.web.validate_authorization_request(
+ uri % (scope, 'code'))
+ self.assertItemsEqual(scopes, correct_scopes)
+ scopes, _ = self.mobile.validate_authorization_request(
+ uri % (scope, 'token'))
+ self.assertItemsEqual(scopes, correct_scopes)
+
+ def test_scope_preservation(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ decoded_scope = 'pics http://a.b/videos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=decoded_scope.split(' '))
+ self.validator.validate_code.side_effect = self.set_scopes(decoded_scope.split(' '))
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=decoded_scope.split(' '))
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
+
+ # resource owner password credentials grant
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # client credentials grant
+ body = 'grant_type=client_credentials&scope=%s'
+ self.validator.authenticate_client.side_effect = self.set_user
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ def test_scope_changed(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ scopes = ['images', 'http://a.b/videos']
+ decoded_scope = 'images http://a.b/videos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=scopes)
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ code = get_query_credentials(h['Location'])['code'][0]
+ self.validator.validate_code.side_effect = self.set_scopes(scopes)
+ _, body, _ = self.web.create_token_response(token_uri,
+ body='grant_type=authorization_code&code=%s' % code)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # implicit grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=scopes)
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ self.assertEqual(get_fragment_credentials(h['Location'])['scope'][0], decoded_scope)
+
+ # resource owner password credentials grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ # client credentials grant
+ self.validator.validate_scopes.side_effect = self.set_scopes(scopes)
+ self.validator.authenticate_client.side_effect = self.set_user
+ body = 'grant_type=client_credentials&scope=%s'
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+
+ self.assertEqual(json.loads(body)['scope'], decoded_scope)
+
+ def test_invalid_scope(self):
+ scope = 'pics+http%3A%2f%2fa.b%2fvideos'
+ auth_uri = 'http://example.com/path?client_id=abc&response_type='
+ token_uri = 'http://example.com/path'
+
+ self.validator.validate_scopes.return_value = False
+
+ # authorization grant
+ h, _, s = self.web.create_authorization_response(
+ auth_uri + 'code', scopes=['invalid'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ error = get_query_credentials(h['Location'])['error'][0]
+ self.assertEqual(error, 'invalid_scope')
+
+ # implicit grant
+ h, _, s = self.mobile.create_authorization_response(
+ auth_uri + 'token', scopes=['invalid'])
+ self.assertEqual(s, 302)
+ self.assertIn('Location', h)
+ error = get_fragment_credentials(h['Location'])['error'][0]
+ self.assertEqual(error, 'invalid_scope')
+
+ # resource owner password credentials grant
+ body = 'grant_type=password&username=abc&password=secret&scope=%s'
+ _, body, _ = self.legacy.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['error'], 'invalid_scope')
+
+ # client credentials grant
+ self.validator.authenticate_client.side_effect = self.set_user
+ body = 'grant_type=client_credentials&scope=%s'
+ _, body, _ = self.backend.create_token_response(token_uri,
+ body=body % scope)
+ self.assertEqual(json.loads(body)['error'], 'invalid_scope')
diff --git a/tests/oauth2/rfc6749/endpoints/test_utils.py b/tests/oauth2/rfc6749/endpoints/test_utils.py
new file mode 100644
index 0000000..6b7cff8
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_utils.py
@@ -0,0 +1,14 @@
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+
+def get_query_credentials(uri):
+ return urlparse.parse_qs(urlparse.urlparse(uri).query,
+ keep_blank_values=True)
+
+
+def get_fragment_credentials(uri):
+ return urlparse.parse_qs(urlparse.urlparse(uri).fragment,
+ keep_blank_values=True)