summaryrefslogtreecommitdiff
path: root/keystone/tests/unit/test_v3_oauth2.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/tests/unit/test_v3_oauth2.py')
-rw-r--r--keystone/tests/unit/test_v3_oauth2.py550
1 files changed, 550 insertions, 0 deletions
diff --git a/keystone/tests/unit/test_v3_oauth2.py b/keystone/tests/unit/test_v3_oauth2.py
new file mode 100644
index 000000000..5d01e8953
--- /dev/null
+++ b/keystone/tests/unit/test_v3_oauth2.py
@@ -0,0 +1,550 @@
+# Copyright 2022 openStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from base64 import b64encode
+from http import client
+from oslo_log import log
+from oslo_serialization import jsonutils
+from unittest import mock
+from urllib import parse
+
+from keystone import conf
+from keystone import exception
+from keystone.tests.unit import test_v3
+
+LOG = log.getLogger(__name__)
+CONF = conf.CONF
+
+
+class FakeUserAppCredListCreateResource(mock.Mock):
+ pass
+
+
+class OAuth2Tests(test_v3.OAuth2RestfulTestCase):
+ APP_CRED_CREATE_URL = '/users/%(user_id)s/application_credentials'
+ APP_CRED_LIST_URL = '/users/%(user_id)s/application_credentials'
+ APP_CRED_DELETE_URL = '/users/%(user_id)s/application_credentials/' \
+ '%(app_cred_id)s'
+ APP_CRED_SHOW_URL = '/users/%(user_id)s/application_credentials/' \
+ '%(app_cred_id)s'
+ ACCESS_TOKEN_URL = '/OS-OAUTH2/token'
+
+ def setUp(self):
+ super(OAuth2Tests, self).setUp()
+ log.set_defaults(
+ logging_context_format_string='%(asctime)s.%(msecs)03d %('
+ 'color)s%(levelname)s %(name)s [^[['
+ '01;36m%(request_id)s ^[[00;36m%('
+ 'project_name)s %(user_name)s%('
+ 'color)s] ^[[01;35m%(instance)s%('
+ 'color)s%(message)s^[[00m',
+ default_log_levels=log.DEBUG)
+ CONF.log_opt_values(LOG, log.DEBUG)
+ LOG.debug(f'is_debug_enabled: {log.is_debug_enabled(CONF)}')
+ LOG.debug(f'get_default_log_levels: {log.get_default_log_levels()}')
+
+ def _assert_error_resp(self, error_resp, error_msg, error_description):
+ resp_keys = (
+ 'error', 'error_description'
+ )
+ for key in resp_keys:
+ self.assertIsNotNone(error_resp.get(key, None))
+ self.assertEqual(error_msg, error_resp.get('error'))
+ self.assertEqual(error_description,
+ error_resp.get('error_description'))
+
+ def _create_app_cred(self, user_id, app_cred_name):
+ resp = self.post(
+ self.APP_CRED_CREATE_URL % {'user_id': user_id},
+ body={'application_credential': {'name': app_cred_name}}
+ )
+ LOG.debug(f'resp: {resp}')
+ app_ref = resp.result['application_credential']
+ return app_ref
+
+ def _delete_app_cred(self, user_id, app_cred_id):
+ resp = self.delete(
+ self.APP_CRED_CREATE_URL % {'user_id': user_id,
+ 'app_cred_id': app_cred_id})
+ LOG.debug(f'resp: {resp}')
+
+ def _get_access_token(self, app_cred, b64str, headers, data,
+ expected_status):
+ if b64str is None:
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ if headers is None:
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ if data is None:
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ data = parse.urlencode(data).encode()
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ body=data,
+ expected_status=expected_status)
+ return resp
+
+
+class AccessTokenTests(OAuth2Tests):
+
+ def setUp(self):
+ super(AccessTokenTests, self).setUp()
+
+ def _create_access_token(self, client):
+ pass
+
+ def _get_access_token_method_not_allowed(self, app_cred,
+ http_func):
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ data = parse.urlencode(data).encode()
+ resp = http_func(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ body=data,
+ expected_status=client.METHOD_NOT_ALLOWED)
+ LOG.debug(f'response: {resp}')
+ json_resp = jsonutils.loads(resp.body)
+ return json_resp
+
+ def test_get_access_token(self):
+ """Test case when an access token can be successfully obtain."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ resp = self._get_access_token(
+ app_cred,
+ b64str=None,
+ headers=None,
+ data=None,
+ expected_status=client.OK)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ def test_get_access_token_without_client_auth(self):
+ """Test case when there is no client authorization."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded'
+ }
+ error = 'invalid_client'
+ error_description = 'OAuth2.0 client authorization is required.'
+ resp = self._get_access_token(app_cred,
+ b64str=None,
+ headers=headers,
+ data=None,
+ expected_status=client.UNAUTHORIZED)
+ self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
+ self.assertEqual('Keystone uri="http://localhost/v3"',
+ resp.headers.get("WWW-Authenticate"))
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_auth_type_is_not_basic(self):
+ """Test case when auth_type is not basic."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ client_id = app_cred.get('id')
+
+ base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
+ 'response="%s"' % (
+ client_id, 'realm', 'nonce', 'path', 'responding')
+
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Digest {base}'
+ }
+ error = 'invalid_client'
+ error_description = 'OAuth2.0 client authorization type ' \
+ 'digest is not supported.'
+ resp = self._get_access_token(app_cred,
+ b64str=None,
+ headers=headers,
+ data=None,
+ expected_status=client.UNAUTHORIZED)
+ self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
+ self.assertEqual('Keystone uri="http://localhost/v3"',
+ resp.headers.get("WWW-Authenticate"))
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_without_client_id(self):
+ """Test case when there is no client_id."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f':{client_secret}'.encode()).decode().strip()
+ error = 'invalid_client'
+ error_description = 'OAuth2.0 client authorization is invalid.'
+ resp = self._get_access_token(app_cred,
+ b64str=b64str,
+ headers=None,
+ data=None,
+ expected_status=client.UNAUTHORIZED)
+ self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
+ self.assertEqual('Keystone uri="http://localhost/v3"',
+ resp.headers.get("WWW-Authenticate"))
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_without_client_secret(self):
+ """Test case when there is no client_secret."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ client_id = app_cred.get('id')
+ b64str = b64encode(
+ f'{client_id}:'.encode()).decode().strip()
+ error = 'invalid_client'
+ error_description = 'OAuth2.0 client authorization is invalid.'
+ resp = self._get_access_token(app_cred,
+ b64str=b64str,
+ headers=None,
+ data=None,
+ expected_status=client.UNAUTHORIZED)
+ self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
+ self.assertEqual('Keystone uri="http://localhost/v3"',
+ resp.headers.get("WWW-Authenticate"))
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_without_grant_type(self):
+ """Test case when there is no grant_type."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ data = {}
+ error = 'invalid_request'
+ error_description = 'The parameter grant_type is required.'
+ resp = self._get_access_token(app_cred,
+ b64str=None,
+ headers=None,
+ data=data,
+ expected_status=client.BAD_REQUEST)
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_blank_grant_type(self):
+ """Test case when grant_type is blank."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ data = {
+ 'grant_type': ''
+ }
+ error = 'unsupported_grant_type'
+ error_description = 'The parameter grant_type ' \
+ ' is not supported.'
+ resp = self._get_access_token(app_cred,
+ b64str=None,
+ headers=None,
+ data=data,
+ expected_status=client.BAD_REQUEST)
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_grant_type_is_not_client_credentials(self):
+ """Test case when grant_type is not client_credentials."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ data = {
+ 'grant_type': 'not_client_credentials'
+ }
+ error = 'unsupported_grant_type'
+ error_description = 'The parameter grant_type ' \
+ 'not_client_credentials is not supported.'
+ resp = self._get_access_token(app_cred,
+ b64str=None,
+ headers=None,
+ data=data,
+ expected_status=client.BAD_REQUEST)
+ json_resp = jsonutils.loads(resp.body)
+ LOG.debug(f'error: {json_resp.get("error")}')
+ LOG.debug(f'error_description: {json_resp.get("error_description")}')
+ self.assertEqual(error,
+ json_resp.get('error'))
+ self.assertEqual(error_description,
+ json_resp.get('error_description'))
+
+ def test_get_access_token_failed_401(self):
+ """Test case when client authentication failed."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ error = 'invalid_client'
+
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ data = parse.urlencode(data).encode()
+ with mock.patch(
+ 'keystone.api._shared.authentication.'
+ 'authenticate_for_token') as co_mock:
+ co_mock.side_effect = exception.Unauthorized(
+ 'client is unauthorized')
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ body=data,
+ noauth=True,
+ expected_status=client.UNAUTHORIZED)
+ self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
+ self.assertEqual('Keystone uri="http://localhost/v3"',
+ resp.headers.get("WWW-Authenticate"))
+ LOG.debug(f'response: {resp}')
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual(error,
+ json_resp.get('error'))
+ LOG.debug(f'error: {json_resp.get("error")}')
+
+ def test_get_access_token_failed_400(self):
+ """Test case when the called API is incorrect."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ error = 'invalid_request'
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ data = parse.urlencode(data).encode()
+ with mock.patch(
+ 'keystone.api._shared.authentication.'
+ 'authenticate_for_token') as co_mock:
+ co_mock.side_effect = exception.ValidationError(
+ 'Auth method is invalid')
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ body=data,
+ noauth=True,
+ expected_status=client.BAD_REQUEST)
+ LOG.debug(f'response: {resp}')
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual(error,
+ json_resp.get('error'))
+ LOG.debug(f'error: {json_resp.get("error")}')
+
+ def test_get_access_token_failed_500_other(self):
+ """Test case when unexpected error."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ error = 'other_error'
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ data = parse.urlencode(data).encode()
+ with mock.patch(
+ 'keystone.api._shared.authentication.'
+ 'authenticate_for_token') as co_mock:
+ co_mock.side_effect = exception.UnexpectedError(
+ 'unexpected error.')
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ body=data,
+ noauth=True,
+ expected_status=client.INTERNAL_SERVER_ERROR)
+
+ LOG.debug(f'response: {resp}')
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual(error,
+ json_resp.get('error'))
+
+ def test_get_access_token_failed_500(self):
+ """Test case when internal server error."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ error = 'other_error'
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ data = parse.urlencode(data).encode()
+ with mock.patch(
+ 'keystone.api._shared.authentication.'
+ 'authenticate_for_token') as co_mock:
+ co_mock.side_effect = Exception(
+ 'Internal server is invalid')
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ body=data,
+ noauth=True,
+ expected_status=client.INTERNAL_SERVER_ERROR)
+
+ LOG.debug(f'response: {resp}')
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual(error,
+ json_resp.get('error'))
+
+ def test_get_access_token_method_get_not_allowed(self):
+ """Test case when the request is get method that is not allowed."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ json_resp = self._get_access_token_method_not_allowed(
+ app_cred, self.get)
+ self.assertEqual('other_error',
+ json_resp.get('error'))
+ self.assertEqual('The method is not allowed for the requested URL.',
+ json_resp.get('error_description'))
+
+ def test_get_access_token_method_patch_not_allowed(self):
+ """Test case when the request is patch method that is not allowed."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ json_resp = self._get_access_token_method_not_allowed(
+ app_cred, self.patch)
+ self.assertEqual('other_error',
+ json_resp.get('error'))
+ self.assertEqual('The method is not allowed for the requested URL.',
+ json_resp.get('error_description'))
+
+ def test_get_access_token_method_put_not_allowed(self):
+ """Test case when the request is put method that is not allowed."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ json_resp = self._get_access_token_method_not_allowed(
+ app_cred, self.put)
+ self.assertEqual('other_error',
+ json_resp.get('error'))
+ self.assertEqual('The method is not allowed for the requested URL.',
+ json_resp.get('error_description'))
+
+ def test_get_access_token_method_delete_not_allowed(self):
+ """Test case when the request is delete method that is not allowed."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ json_resp = self._get_access_token_method_not_allowed(
+ app_cred, self.delete)
+ self.assertEqual('other_error',
+ json_resp.get('error'))
+ self.assertEqual('The method is not allowed for the requested URL.',
+ json_resp.get('error_description'))
+
+ def test_get_access_token_method_head_not_allowed(self):
+ """Test case when the request is head method that is not allowed."""
+
+ client_name = 'client_name_test'
+ app_cred = self._create_app_cred(self.user_id, client_name)
+ client_id = app_cred.get('id')
+ client_secret = app_cred.get('secret')
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ self.head(
+ self.ACCESS_TOKEN_URL,
+ headers=headers,
+ convert=False,
+ expected_status=client.METHOD_NOT_ALLOWED)