diff options
Diffstat (limited to 'keystone/tests/unit/test_v3_oauth2.py')
-rw-r--r-- | keystone/tests/unit/test_v3_oauth2.py | 550 |
1 files changed, 550 insertions, 0 deletions
diff --git a/keystone/tests/unit/test_v3_oauth2.py b/keystone/tests/unit/test_v3_oauth2.py new file mode 100644 index 000000000..5d01e8953 --- /dev/null +++ b/keystone/tests/unit/test_v3_oauth2.py @@ -0,0 +1,550 @@ +# Copyright 2022 openStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from base64 import b64encode +from http import client +from oslo_log import log +from oslo_serialization import jsonutils +from unittest import mock +from urllib import parse + +from keystone import conf +from keystone import exception +from keystone.tests.unit import test_v3 + +LOG = log.getLogger(__name__) +CONF = conf.CONF + + +class FakeUserAppCredListCreateResource(mock.Mock): + pass + + +class OAuth2Tests(test_v3.OAuth2RestfulTestCase): + APP_CRED_CREATE_URL = '/users/%(user_id)s/application_credentials' + APP_CRED_LIST_URL = '/users/%(user_id)s/application_credentials' + APP_CRED_DELETE_URL = '/users/%(user_id)s/application_credentials/' \ + '%(app_cred_id)s' + APP_CRED_SHOW_URL = '/users/%(user_id)s/application_credentials/' \ + '%(app_cred_id)s' + ACCESS_TOKEN_URL = '/OS-OAUTH2/token' + + def setUp(self): + super(OAuth2Tests, self).setUp() + log.set_defaults( + logging_context_format_string='%(asctime)s.%(msecs)03d %(' + 'color)s%(levelname)s %(name)s [^[[' + '01;36m%(request_id)s ^[[00;36m%(' + 'project_name)s %(user_name)s%(' + 'color)s] ^[[01;35m%(instance)s%(' + 'color)s%(message)s^[[00m', + default_log_levels=log.DEBUG) + CONF.log_opt_values(LOG, log.DEBUG) + LOG.debug(f'is_debug_enabled: {log.is_debug_enabled(CONF)}') + LOG.debug(f'get_default_log_levels: {log.get_default_log_levels()}') + + def _assert_error_resp(self, error_resp, error_msg, error_description): + resp_keys = ( + 'error', 'error_description' + ) + for key in resp_keys: + self.assertIsNotNone(error_resp.get(key, None)) + self.assertEqual(error_msg, error_resp.get('error')) + self.assertEqual(error_description, + error_resp.get('error_description')) + + def _create_app_cred(self, user_id, app_cred_name): + resp = self.post( + self.APP_CRED_CREATE_URL % {'user_id': user_id}, + body={'application_credential': {'name': app_cred_name}} + ) + LOG.debug(f'resp: {resp}') + app_ref = resp.result['application_credential'] + return app_ref + + def _delete_app_cred(self, user_id, app_cred_id): + resp = self.delete( + self.APP_CRED_CREATE_URL % {'user_id': user_id, + 'app_cred_id': app_cred_id}) + LOG.debug(f'resp: {resp}') + + def _get_access_token(self, app_cred, b64str, headers, data, + expected_status): + if b64str is None: + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + if headers is None: + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + if data is None: + data = { + 'grant_type': 'client_credentials' + } + data = parse.urlencode(data).encode() + resp = self.post( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + body=data, + expected_status=expected_status) + return resp + + +class AccessTokenTests(OAuth2Tests): + + def setUp(self): + super(AccessTokenTests, self).setUp() + + def _create_access_token(self, client): + pass + + def _get_access_token_method_not_allowed(self, app_cred, + http_func): + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + data = { + 'grant_type': 'client_credentials' + } + data = parse.urlencode(data).encode() + resp = http_func( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + body=data, + expected_status=client.METHOD_NOT_ALLOWED) + LOG.debug(f'response: {resp}') + json_resp = jsonutils.loads(resp.body) + return json_resp + + def test_get_access_token(self): + """Test case when an access token can be successfully obtain.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + resp = self._get_access_token( + app_cred, + b64str=None, + headers=None, + data=None, + expected_status=client.OK) + json_resp = jsonutils.loads(resp.body) + self.assertIn('access_token', json_resp) + self.assertEqual('Bearer', json_resp['token_type']) + self.assertEqual(3600, json_resp['expires_in']) + + def test_get_access_token_without_client_auth(self): + """Test case when there is no client authorization.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + error = 'invalid_client' + error_description = 'OAuth2.0 client authorization is required.' + resp = self._get_access_token(app_cred, + b64str=None, + headers=headers, + data=None, + expected_status=client.UNAUTHORIZED) + self.assertNotEmpty(resp.headers.get("WWW-Authenticate")) + self.assertEqual('Keystone uri="http://localhost/v3"', + resp.headers.get("WWW-Authenticate")) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_auth_type_is_not_basic(self): + """Test case when auth_type is not basic.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + client_id = app_cred.get('id') + + base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \ + 'response="%s"' % ( + client_id, 'realm', 'nonce', 'path', 'responding') + + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Digest {base}' + } + error = 'invalid_client' + error_description = 'OAuth2.0 client authorization type ' \ + 'digest is not supported.' + resp = self._get_access_token(app_cred, + b64str=None, + headers=headers, + data=None, + expected_status=client.UNAUTHORIZED) + self.assertNotEmpty(resp.headers.get("WWW-Authenticate")) + self.assertEqual('Keystone uri="http://localhost/v3"', + resp.headers.get("WWW-Authenticate")) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_without_client_id(self): + """Test case when there is no client_id.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + client_secret = app_cred.get('secret') + b64str = b64encode( + f':{client_secret}'.encode()).decode().strip() + error = 'invalid_client' + error_description = 'OAuth2.0 client authorization is invalid.' + resp = self._get_access_token(app_cred, + b64str=b64str, + headers=None, + data=None, + expected_status=client.UNAUTHORIZED) + self.assertNotEmpty(resp.headers.get("WWW-Authenticate")) + self.assertEqual('Keystone uri="http://localhost/v3"', + resp.headers.get("WWW-Authenticate")) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_without_client_secret(self): + """Test case when there is no client_secret.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + client_id = app_cred.get('id') + b64str = b64encode( + f'{client_id}:'.encode()).decode().strip() + error = 'invalid_client' + error_description = 'OAuth2.0 client authorization is invalid.' + resp = self._get_access_token(app_cred, + b64str=b64str, + headers=None, + data=None, + expected_status=client.UNAUTHORIZED) + self.assertNotEmpty(resp.headers.get("WWW-Authenticate")) + self.assertEqual('Keystone uri="http://localhost/v3"', + resp.headers.get("WWW-Authenticate")) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_without_grant_type(self): + """Test case when there is no grant_type.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + data = {} + error = 'invalid_request' + error_description = 'The parameter grant_type is required.' + resp = self._get_access_token(app_cred, + b64str=None, + headers=None, + data=data, + expected_status=client.BAD_REQUEST) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_blank_grant_type(self): + """Test case when grant_type is blank.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + data = { + 'grant_type': '' + } + error = 'unsupported_grant_type' + error_description = 'The parameter grant_type ' \ + ' is not supported.' + resp = self._get_access_token(app_cred, + b64str=None, + headers=None, + data=data, + expected_status=client.BAD_REQUEST) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_grant_type_is_not_client_credentials(self): + """Test case when grant_type is not client_credentials.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + data = { + 'grant_type': 'not_client_credentials' + } + error = 'unsupported_grant_type' + error_description = 'The parameter grant_type ' \ + 'not_client_credentials is not supported.' + resp = self._get_access_token(app_cred, + b64str=None, + headers=None, + data=data, + expected_status=client.BAD_REQUEST) + json_resp = jsonutils.loads(resp.body) + LOG.debug(f'error: {json_resp.get("error")}') + LOG.debug(f'error_description: {json_resp.get("error_description")}') + self.assertEqual(error, + json_resp.get('error')) + self.assertEqual(error_description, + json_resp.get('error_description')) + + def test_get_access_token_failed_401(self): + """Test case when client authentication failed.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + error = 'invalid_client' + + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + data = { + 'grant_type': 'client_credentials' + } + data = parse.urlencode(data).encode() + with mock.patch( + 'keystone.api._shared.authentication.' + 'authenticate_for_token') as co_mock: + co_mock.side_effect = exception.Unauthorized( + 'client is unauthorized') + resp = self.post( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + body=data, + noauth=True, + expected_status=client.UNAUTHORIZED) + self.assertNotEmpty(resp.headers.get("WWW-Authenticate")) + self.assertEqual('Keystone uri="http://localhost/v3"', + resp.headers.get("WWW-Authenticate")) + LOG.debug(f'response: {resp}') + json_resp = jsonutils.loads(resp.body) + self.assertEqual(error, + json_resp.get('error')) + LOG.debug(f'error: {json_resp.get("error")}') + + def test_get_access_token_failed_400(self): + """Test case when the called API is incorrect.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + error = 'invalid_request' + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + data = { + 'grant_type': 'client_credentials' + } + data = parse.urlencode(data).encode() + with mock.patch( + 'keystone.api._shared.authentication.' + 'authenticate_for_token') as co_mock: + co_mock.side_effect = exception.ValidationError( + 'Auth method is invalid') + resp = self.post( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + body=data, + noauth=True, + expected_status=client.BAD_REQUEST) + LOG.debug(f'response: {resp}') + json_resp = jsonutils.loads(resp.body) + self.assertEqual(error, + json_resp.get('error')) + LOG.debug(f'error: {json_resp.get("error")}') + + def test_get_access_token_failed_500_other(self): + """Test case when unexpected error.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + error = 'other_error' + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + data = { + 'grant_type': 'client_credentials' + } + data = parse.urlencode(data).encode() + with mock.patch( + 'keystone.api._shared.authentication.' + 'authenticate_for_token') as co_mock: + co_mock.side_effect = exception.UnexpectedError( + 'unexpected error.') + resp = self.post( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + body=data, + noauth=True, + expected_status=client.INTERNAL_SERVER_ERROR) + + LOG.debug(f'response: {resp}') + json_resp = jsonutils.loads(resp.body) + self.assertEqual(error, + json_resp.get('error')) + + def test_get_access_token_failed_500(self): + """Test case when internal server error.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + error = 'other_error' + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + data = { + 'grant_type': 'client_credentials' + } + data = parse.urlencode(data).encode() + with mock.patch( + 'keystone.api._shared.authentication.' + 'authenticate_for_token') as co_mock: + co_mock.side_effect = Exception( + 'Internal server is invalid') + resp = self.post( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + body=data, + noauth=True, + expected_status=client.INTERNAL_SERVER_ERROR) + + LOG.debug(f'response: {resp}') + json_resp = jsonutils.loads(resp.body) + self.assertEqual(error, + json_resp.get('error')) + + def test_get_access_token_method_get_not_allowed(self): + """Test case when the request is get method that is not allowed.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + json_resp = self._get_access_token_method_not_allowed( + app_cred, self.get) + self.assertEqual('other_error', + json_resp.get('error')) + self.assertEqual('The method is not allowed for the requested URL.', + json_resp.get('error_description')) + + def test_get_access_token_method_patch_not_allowed(self): + """Test case when the request is patch method that is not allowed.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + json_resp = self._get_access_token_method_not_allowed( + app_cred, self.patch) + self.assertEqual('other_error', + json_resp.get('error')) + self.assertEqual('The method is not allowed for the requested URL.', + json_resp.get('error_description')) + + def test_get_access_token_method_put_not_allowed(self): + """Test case when the request is put method that is not allowed.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + json_resp = self._get_access_token_method_not_allowed( + app_cred, self.put) + self.assertEqual('other_error', + json_resp.get('error')) + self.assertEqual('The method is not allowed for the requested URL.', + json_resp.get('error_description')) + + def test_get_access_token_method_delete_not_allowed(self): + """Test case when the request is delete method that is not allowed.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + json_resp = self._get_access_token_method_not_allowed( + app_cred, self.delete) + self.assertEqual('other_error', + json_resp.get('error')) + self.assertEqual('The method is not allowed for the requested URL.', + json_resp.get('error_description')) + + def test_get_access_token_method_head_not_allowed(self): + """Test case when the request is head method that is not allowed.""" + + client_name = 'client_name_test' + app_cred = self._create_app_cred(self.user_id, client_name) + client_id = app_cred.get('id') + client_secret = app_cred.get('secret') + b64str = b64encode( + f'{client_id}:{client_secret}'.encode()).decode().strip() + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Authorization': f'Basic {b64str}' + } + self.head( + self.ACCESS_TOKEN_URL, + headers=headers, + convert=False, + expected_status=client.METHOD_NOT_ALLOWED) |