summaryrefslogtreecommitdiff
path: root/keystone/tests/unit/test_v3_oauth2.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/tests/unit/test_v3_oauth2.py')
-rw-r--r--keystone/tests/unit/test_v3_oauth2.py1607
1 files changed, 1573 insertions, 34 deletions
diff --git a/keystone/tests/unit/test_v3_oauth2.py b/keystone/tests/unit/test_v3_oauth2.py
index 3dcee1a04..6eaa8560f 100644
--- a/keystone/tests/unit/test_v3_oauth2.py
+++ b/keystone/tests/unit/test_v3_oauth2.py
@@ -13,16 +13,26 @@
# under the License.
from base64 import b64encode
+from cryptography.hazmat.primitives.serialization import Encoding
+import fixtures
+import http
from http import client
from oslo_log import log
from oslo_serialization import jsonutils
from unittest import mock
from urllib import parse
+from keystone.api.os_oauth2 import AccessTokenResource
+from keystone.common import provider_api
+from keystone.common import utils
from keystone import conf
from keystone import exception
+from keystone.federation.utils import RuleProcessor
+from keystone.tests import unit
from keystone.tests.unit import test_v3
+from keystone.token.provider import Manager
+PROVIDERS = provider_api.ProviderAPIs
LOG = log.getLogger(__name__)
CONF = conf.CONF
@@ -31,7 +41,169 @@ class FakeUserAppCredListCreateResource(mock.Mock):
pass
-class OAuth2Tests(test_v3.OAuth2RestfulTestCase):
+class OAuth2AuthnMethodsTests(test_v3.OAuth2RestfulTestCase):
+ ACCESS_TOKEN_URL = '/OS-OAUTH2/token'
+
+ def setUp(self):
+ super(OAuth2AuthnMethodsTests, self).setUp()
+ self.config_fixture.config(
+ group='oauth2',
+ oauth2_authn_methods=['client_secret_basic', 'tls_client_auth'],
+ )
+
+ def _get_access_token(
+ self,
+ headers,
+ data,
+ expected_status,
+ client_cert_content=None):
+ data = parse.urlencode(data).encode()
+ kwargs = {
+ 'headers': headers,
+ 'noauth': True,
+ 'convert': False,
+ 'body': data,
+ 'expected_status': expected_status
+ }
+ if client_cert_content:
+ kwargs.update({'environ': {
+ 'SSL_CLIENT_CERT': client_cert_content
+ }})
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ **kwargs)
+ return resp
+
+ def _create_certificates(self):
+ return unit.create_certificate(
+ subject_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='tokyo',
+ locality_name='musashino',
+ organizational_unit_name='test'
+ )
+ )
+
+ def _get_cert_content(self, cert):
+ return cert.public_bytes(Encoding.PEM).decode('ascii')
+
+ @mock.patch.object(AccessTokenResource, '_client_secret_basic')
+ def test_secret_basic_header(self, mock_client_secret_basic):
+ """client_secret_basic is used if a client sercret is found."""
+ client_id = 'client_id'
+ client_secret = 'client_secret'
+ b64str = b64encode(
+ f'{client_id}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+
+ _ = self._get_access_token(
+ headers=headers,
+ data=data,
+ expected_status=client.OK)
+ mock_client_secret_basic.assert_called_once_with(
+ client_id, client_secret)
+
+ @mock.patch.object(AccessTokenResource, '_client_secret_basic')
+ def test_secret_basic_form(self, mock_client_secret_basic):
+ """client_secret_basic is used if a client sercret is found."""
+ client_id = 'client_id'
+ client_secret = 'client_secret'
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ data = {
+ 'grant_type': 'client_credentials',
+ 'client_id': client_id,
+ 'client_secret': client_secret
+ }
+
+ _ = self._get_access_token(
+ headers=headers,
+ data=data,
+ expected_status=client.OK)
+ mock_client_secret_basic.assert_called_once_with(
+ client_id, client_secret)
+
+ @mock.patch.object(AccessTokenResource, '_client_secret_basic')
+ def test_secret_basic_header_and_form(self, mock_client_secret_basic):
+ """A header is used if secrets are found in a header and body."""
+ client_id_h = 'client_id_h'
+ client_secret_h = 'client_secret_h'
+ client_id_d = 'client_id_d'
+ client_secret_d = 'client_secret_d'
+ b64str = b64encode(
+ f'{client_id_h}:{client_secret_h}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials',
+ 'client_id': client_id_d,
+ 'client_secret': client_secret_d
+ }
+
+ _ = self._get_access_token(
+ headers=headers,
+ data=data,
+ expected_status=client.OK)
+ mock_client_secret_basic.assert_called_once_with(
+ client_id_h, client_secret_h)
+
+ @mock.patch.object(AccessTokenResource, '_tls_client_auth')
+ def test_client_cert(self, mock_tls_client_auth):
+ """tls_client_auth is used if a certificate is found."""
+ client_id = 'client_id'
+ client_cert, _ = self._create_certificates()
+ cert_content = self._get_cert_content(client_cert)
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ data = {
+ 'grant_type': 'client_credentials',
+ 'client_id': client_id
+ }
+ _ = self._get_access_token(
+ headers=headers,
+ data=data,
+ expected_status=client.OK,
+ client_cert_content=cert_content)
+ mock_tls_client_auth.assert_called_once_with(client_id, cert_content)
+
+ @mock.patch.object(AccessTokenResource, '_tls_client_auth')
+ def test_secret_basic_and_client_cert(self, mock_tls_client_auth):
+ """tls_client_auth is used if a certificate and secret are found."""
+ client_id_s = 'client_id_s'
+ client_secret = 'client_secret'
+ client_id_c = 'client_id_c'
+ client_cert, _ = self._create_certificates()
+ cert_content = self._get_cert_content(client_cert)
+ b64str = b64encode(
+ f'{client_id_s}:{client_secret}'.encode()).decode().strip()
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Authorization': f'Basic {b64str}'
+ }
+ data = {
+ 'grant_type': 'client_credentials',
+ 'client_id': client_id_c,
+ }
+
+ _ = self._get_access_token(
+ headers=headers,
+ data=data,
+ expected_status=client.OK,
+ client_cert_content=cert_content)
+ mock_tls_client_auth.assert_called_once_with(client_id_c, cert_content)
+
+
+class OAuth2SecretBasicTests(test_v3.OAuth2RestfulTestCase):
APP_CRED_CREATE_URL = '/users/%(user_id)s/application_credentials'
APP_CRED_LIST_URL = '/users/%(user_id)s/application_credentials'
APP_CRED_DELETE_URL = '/users/%(user_id)s/application_credentials/' \
@@ -41,7 +213,7 @@ class OAuth2Tests(test_v3.OAuth2RestfulTestCase):
ACCESS_TOKEN_URL = '/OS-OAUTH2/token'
def setUp(self):
- super(OAuth2Tests, self).setUp()
+ super(OAuth2SecretBasicTests, self).setUp()
log.set_defaults(
logging_context_format_string='%(asctime)s.%(msecs)03d %('
'color)s%(levelname)s %(name)s [^[['
@@ -53,6 +225,10 @@ class OAuth2Tests(test_v3.OAuth2RestfulTestCase):
CONF.log_opt_values(LOG, log.DEBUG)
LOG.debug(f'is_debug_enabled: {log.is_debug_enabled(CONF)}')
LOG.debug(f'get_default_log_levels: {log.get_default_log_levels()}')
+ self.config_fixture.config(
+ group='oauth2',
+ oauth2_authn_methods=['client_secret_basic'],
+ )
def _assert_error_resp(self, error_resp, error_msg, error_description):
resp_keys = (
@@ -104,16 +280,8 @@ class OAuth2Tests(test_v3.OAuth2RestfulTestCase):
expected_status=expected_status)
return resp
-
-class AccessTokenTests(OAuth2Tests):
-
- def setUp(self):
- super(AccessTokenTests, self).setUp()
-
- def _create_access_token(self, client):
- pass
-
- def _get_access_token_method_not_allowed(self, app_cred, http_func):
+ def _get_access_token_method_not_allowed(self, app_cred,
+ http_func):
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
@@ -151,30 +319,28 @@ class AccessTokenTests(OAuth2Tests):
self.assertEqual('Bearer', json_resp['token_type'])
self.assertEqual(3600, json_resp['expires_in'])
- def test_get_access_token_without_client_auth(self):
+ def test_get_access_token_form(self):
"""Test case when there is no client authorization."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
headers = {
- 'Content-Type': 'application/x-www-form-urlencoded'
+ 'Content-Type': 'application/x-www-form-urlencoded',
}
- error = 'invalid_client'
- error_description = 'OAuth2.0 client authorization is required.'
- resp = self._get_access_token(app_cred,
- b64str=None,
- headers=headers,
- data=None,
- expected_status=client.UNAUTHORIZED)
- self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
- self.assertEqual('Keystone uri="http://localhost/v3"',
- resp.headers.get("WWW-Authenticate"))
+ data = {
+ 'grant_type': 'client_credentials',
+ 'client_id': app_cred.get('id'),
+ 'client_secret': app_cred.get('secret'),
+ }
+ resp = self._get_access_token(
+ app_cred,
+ b64str=None,
+ headers=headers,
+ data=data,
+ expected_status=client.OK)
json_resp = jsonutils.loads(resp.body)
- LOG.debug(f'error: {json_resp.get("error")}')
- LOG.debug(f'error_description: {json_resp.get("error_description")}')
- self.assertEqual(error,
- json_resp.get('error'))
- self.assertEqual(error_description,
- json_resp.get('error_description'))
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
def test_get_access_token_auth_type_is_not_basic(self):
"""Test case when auth_type is not basic."""
@@ -191,8 +357,7 @@ class AccessTokenTests(OAuth2Tests):
'Authorization': f'Digest {base}'
}
error = 'invalid_client'
- error_description = 'OAuth2.0 client authorization type ' \
- 'digest is not supported.'
+ error_description = 'Client authentication failed.'
resp = self._get_access_token(app_cred,
b64str=None,
headers=headers,
@@ -217,7 +382,7 @@ class AccessTokenTests(OAuth2Tests):
b64str = b64encode(
f':{client_secret}'.encode()).decode().strip()
error = 'invalid_client'
- error_description = 'OAuth2.0 client authorization is invalid.'
+ error_description = 'Client authentication failed.'
resp = self._get_access_token(app_cred,
b64str=b64str,
headers=None,
@@ -242,7 +407,7 @@ class AccessTokenTests(OAuth2Tests):
b64str = b64encode(
f'{client_id}:'.encode()).decode().strip()
error = 'invalid_client'
- error_description = 'OAuth2.0 client authorization is invalid.'
+ error_description = 'Client authentication failed.'
resp = self._get_access_token(app_cred,
b64str=b64str,
headers=None,
@@ -530,3 +695,1377 @@ class AccessTokenTests(OAuth2Tests):
headers=headers,
convert=False,
expected_status=client.METHOD_NOT_ALLOWED)
+
+
+class OAuth2CertificateTests(test_v3.OAuth2RestfulTestCase):
+ ACCESS_TOKEN_URL = '/OS-OAUTH2/token'
+
+ def setUp(self):
+ super(OAuth2CertificateTests, self).setUp()
+ self.log_fix = self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
+ self.config_fixture.config(group='oauth2',
+ oauth2_authn_methods=['tls_client_auth'])
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_mapping')
+ (
+ self.oauth2_user,
+ self.oauth2_user_domain,
+ _,
+ ) = self._create_project_user()
+ *_, self.client_cert, self.client_key = self._create_certificates(
+ client_dn=unit.create_dn(
+ user_id=self.oauth2_user.get('id'),
+ common_name=self.oauth2_user.get('name'),
+ email_address=self.oauth2_user.get('email'),
+ domain_component=self.oauth2_user_domain.get('id'),
+ organization_name=self.oauth2_user_domain.get('name')
+ )
+ )
+
+ def _create_project_user(self, no_roles=False):
+ new_domain_ref = unit.new_domain_ref()
+ PROVIDERS.resource_api.create_domain(
+ new_domain_ref['id'], new_domain_ref
+ )
+ new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
+ PROVIDERS.resource_api.create_project(
+ new_project_ref['id'], new_project_ref
+ )
+ new_user = unit.create_user(PROVIDERS.identity_api,
+ domain_id=new_domain_ref['id'],
+ project_id=new_project_ref['id'])
+ if not no_roles:
+ PROVIDERS.assignment_api.create_grant(
+ self.role['id'],
+ user_id=new_user['id'],
+ project_id=new_project_ref['id'])
+ return new_user, new_domain_ref, new_project_ref
+
+ def _create_certificates(self,
+ root_dn=None,
+ server_dn=None,
+ client_dn=None):
+ root_subj = unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='root'
+ )
+ if root_dn:
+ root_subj = unit.update_dn(root_subj, root_dn)
+
+ root_cert, root_key = unit.create_certificate(root_subj)
+ keystone_subj = unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='keystone.local'
+ )
+ if server_dn:
+ keystone_subj = unit.update_dn(keystone_subj, server_dn)
+
+ ks_cert, ks_key = unit.create_certificate(
+ keystone_subj, ca=root_cert, ca_key=root_key)
+ client_subj = unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test'
+ )
+ if client_dn:
+ client_subj = unit.update_dn(client_subj, client_dn)
+
+ client_cert, client_key = unit.create_certificate(
+ client_subj, ca=root_cert, ca_key=root_key)
+ return root_cert, root_key, ks_cert, ks_key, client_cert, client_key
+
+ def _create_mapping(self, id='oauth2_mapping', dn_rules=None):
+ rules = []
+ if not dn_rules:
+ dn_rules = [
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.id': 'SSL_CLIENT_SUBJECT_DN_UID',
+ 'user.email': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS',
+ 'user.domain.id': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_O',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }
+ ]
+ for info in dn_rules:
+ index = 0
+ local_user = {}
+ remote = []
+ for k in info:
+ if k == 'user.name':
+ local_user['name'] = '{%s}' % index
+ remote.append({'type': info.get(k)})
+ index += 1
+ elif k == 'user.id':
+ local_user['id'] = '{%s}' % index
+ remote.append({'type': info.get(k)})
+ index += 1
+ elif k == 'user.email':
+ local_user['email'] = '{%s}' % index
+ remote.append({'type': info.get(k)})
+ index += 1
+ elif k == 'user.domain.name' or k == 'user.domain.id':
+ if not local_user.get('domain'):
+ local_user['domain'] = {}
+ if k == 'user.domain.name':
+ local_user['domain']['name'] = '{%s}' % index
+ remote.append({'type': info.get(k)})
+ index += 1
+ else:
+ local_user['domain']['id'] = '{%s}' % index
+ remote.append({'type': info.get(k)})
+ index += 1
+ else:
+ remote.append({
+ 'type': k,
+ 'any_one_of': info.get(k)
+ })
+ rule = {
+ 'local': [
+ {
+ 'user': local_user
+ }
+ ],
+ 'remote': remote
+ }
+ rules.append(rule)
+
+ mapping = {
+ 'id': id,
+ 'rules': rules
+ }
+
+ PROVIDERS.federation_api.create_mapping(mapping['id'], mapping)
+
+ def _get_access_token(self, client_id=None, client_cert_content=None,
+ expected_status=http.client.OK):
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ data = {
+ 'grant_type': 'client_credentials'
+ }
+ if client_id:
+ data.update({'client_id': client_id})
+ data = parse.urlencode(data).encode()
+ kwargs = {
+ 'headers': headers,
+ 'noauth': True,
+ 'convert': False,
+ 'body': data,
+ 'expected_status': expected_status
+ }
+ if client_cert_content:
+ kwargs.update({'environ': {
+ 'SSL_CLIENT_CERT': client_cert_content
+ }})
+ resp = self.post(
+ self.ACCESS_TOKEN_URL,
+ **kwargs)
+ return resp
+
+ def _get_cert_content(self, cert):
+ return cert.public_bytes(Encoding.PEM).decode('ascii')
+
+ def assertUnauthorizedResp(self, resp):
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual('invalid_client', json_resp['error'])
+ self.assertEqual(
+ 'Client authentication failed.',
+ json_resp['error_description'])
+
+ def test_get_access_token_project_scope(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping()
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_mapping_config(self):
+ """Test case when an access token can be successfully obtain."""
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_custom')
+ self._create_mapping(
+ id='oauth2_custom',
+ dn_rules=[
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }
+ ])
+
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id='test_UID',
+ common_name=user.get('name'),
+ domain_component=user_domain.get('name'),
+ organization_name='test_O'
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_mapping')
+
+ def test_get_access_token_mapping_multi_ca(self):
+ """Test case when an access token can be successfully obtain."""
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_custom')
+ self._create_mapping(
+ id='oauth2_custom',
+ dn_rules=[
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.id': 'SSL_CLIENT_SUBJECT_DN_UID',
+ 'user.email': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS',
+ 'user.domain.id': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_O',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['rootA', 'rootB']
+ },
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['rootC']
+ }
+ ])
+
+ # CA rootA OK
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ root_dn=unit.create_dn(
+ common_name='rootA'
+ ),
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ # CA rootB OK
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ root_dn=unit.create_dn(
+ common_name='rootB'
+ ),
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ # CA rootC OK
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ root_dn=unit.create_dn(
+ common_name='rootC'
+ ),
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id='test_UID',
+ common_name=user.get('name'),
+ domain_component=user_domain.get('name'),
+ organization_name='test_O'
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ # CA not found NG
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ root_dn=unit.create_dn(
+ common_name='root_other'
+ ),
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)
+
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_mapping')
+
+ def test_get_access_token_no_default_mapping(self):
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping id %s is not found. ' % 'oauth2_mapping',
+ self.log_fix.output)
+
+ def test_get_access_token_no_custom_mapping(self):
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_custom')
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping id %s is not found. ' % 'oauth2_custom',
+ self.log_fix.output)
+ self.config_fixture.config(group='oauth2',
+ oauth2_cert_dn_mapping_id='oauth2_mapping')
+
+ def test_get_access_token_ignore_userid(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping(dn_rules=[
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.email': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS',
+ 'user.domain.id': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_O',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }])
+
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id') + "_diff",
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_ignore_username(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping(dn_rules=[
+ {
+ 'user.id': 'SSL_CLIENT_SUBJECT_DN_UID',
+ 'user.email': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS',
+ 'user.domain.id': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_O',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }])
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_ignore_email(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping(dn_rules=[
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.id': 'SSL_CLIENT_SUBJECT_DN_UID',
+ 'user.domain.id': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_O',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }])
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_ignore_domain_id(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping(dn_rules=[
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.id': 'SSL_CLIENT_SUBJECT_DN_UID',
+ 'user.email': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS',
+ 'user.domain.name': 'SSL_CLIENT_SUBJECT_DN_O',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }])
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id') + "_diff",
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_ignore_domain_name(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping(dn_rules=[
+ {
+ 'user.name': 'SSL_CLIENT_SUBJECT_DN_CN',
+ 'user.id': 'SSL_CLIENT_SUBJECT_DN_UID',
+ 'user.email': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS',
+ 'user.domain.id': 'SSL_CLIENT_SUBJECT_DN_DC',
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }])
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_ignore_all(self):
+ """Test case when an access token can be successfully obtain."""
+ self._create_mapping(dn_rules=[
+ {
+ 'SSL_CLIENT_ISSUER_DN_CN': ['root']
+ }])
+ user, user_domain, user_project = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id') + "_diff",
+ common_name=user.get('name') + "_diff",
+ email_address=user.get('email') + "_diff",
+ domain_component=user_domain.get('id') + "_diff"
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content)
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ }
+ )
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ self.assertEqual(user_project.get('id'),
+ verify_resp.result['token']['project']['id'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+
+ def test_get_access_token_no_roles_project_scope(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user(no_roles=True)
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED)
+ LOG.debug(resp)
+
+ def test_get_access_token_no_default_project_id(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user(no_roles=True)
+ user['default_project_id'] = None
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ _ = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED)
+
+ def test_get_access_token_without_client_id(self):
+ self._create_mapping()
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn('Get OAuth2.0 Access Token API: '
+ 'failed to get a client_id from the request.',
+ self.log_fix.output)
+
+ def test_get_access_token_without_client_cert(self):
+ self._create_mapping()
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn('Get OAuth2.0 Access Token API: '
+ 'failed to get client credentials from the request.',
+ self.log_fix.output)
+
+ @mock.patch.object(utils, 'get_certificate_subject_dn')
+ def test_get_access_token_failed_to_get_cert_subject_dn(
+ self, mock_get_certificate_subject_dn):
+ self._create_mapping()
+ mock_get_certificate_subject_dn.side_effect = \
+ exception.ValidationError('Boom!')
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn('Get OAuth2.0 Access Token API: '
+ 'failed to get the subject DN from the certificate.',
+ self.log_fix.output)
+
+ @mock.patch.object(utils, 'get_certificate_issuer_dn')
+ def test_get_access_token_failed_to_get_cert_issuer_dn(
+ self, mock_get_certificate_issuer_dn):
+ self._create_mapping()
+ mock_get_certificate_issuer_dn.side_effect = \
+ exception.ValidationError('Boom!')
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn('Get OAuth2.0 Access Token API: '
+ 'failed to get the issuer DN from the certificate.',
+ self.log_fix.output)
+
+ def test_get_access_token_user_not_exist(self):
+ self._create_mapping()
+ cert_content = self._get_cert_content(self.client_cert)
+ user_id_not_exist = 'user_id_not_exist'
+ resp = self._get_access_token(
+ client_id=user_id_not_exist,
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'the user does not exist. user id: %s'
+ % user_id_not_exist,
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_not_match_user_id(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id') + "_diff",
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: %s check failed. '
+ 'DN value: %s, DB value: %s.' % (
+ 'user id',
+ user.get('id') + '_diff',
+ user.get('id')),
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_not_match_user_name(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name') + "_diff",
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: %s check failed. '
+ 'DN value: %s, DB value: %s.' % (
+ 'user name',
+ user.get('name') + '_diff',
+ user.get('name')),
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_not_match_email(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email') + "_diff",
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: %s check failed. '
+ 'DN value: %s, DB value: %s.' % (
+ 'user email',
+ user.get('email') + '_diff',
+ user.get('email')),
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_not_match_domain_id(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id') + "_diff",
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: %s check failed. '
+ 'DN value: %s, DB value: %s.' % (
+ 'user domain id',
+ user_domain.get('id') + '_diff',
+ user_domain.get('id')),
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_not_match_domain_name(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name') + "_diff"
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: %s check failed. '
+ 'DN value: %s, DB value: %s.' % (
+ 'user domain name',
+ user_domain.get('name') + '_diff',
+ user_domain.get('name')),
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_missing_user_id(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_missing_user_name(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_missing_email(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ domain_component=user_domain.get('id'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_missing_domain_id(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ organization_name=user_domain.get('name')
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)
+
+ def test_get_access_token_cert_dn_missing_domain_name(self):
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+ *_, client_cert, _ = self._create_certificates(
+ client_dn=unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organizational_unit_name='test',
+ user_id=user.get('id'),
+ common_name=user.get('name'),
+ email_address=user.get('email'),
+ domain_component=user_domain.get('id'),
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ resp = self._get_access_token(
+ client_id=user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ self.assertUnauthorizedResp(resp)
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)
+
+ @mock.patch.object(Manager, 'issue_token')
+ def test_get_access_token_issue_token_ks_error_400(self, mock_issue_token):
+ self._create_mapping()
+ err_msg = 'Boom!'
+ mock_issue_token.side_effect = exception.ValidationError(err_msg)
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.BAD_REQUEST
+ )
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual('invalid_request', json_resp['error'])
+ self.assertEqual(err_msg, json_resp['error_description'])
+ self.assertIn(err_msg, self.log_fix.output)
+
+ @mock.patch.object(Manager, 'issue_token')
+ def test_get_access_token_issue_token_ks_error_401(self, mock_issue_token):
+ self._create_mapping()
+ err_msg = 'Boom!'
+ mock_issue_token.side_effect = exception.Unauthorized(err_msg)
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.UNAUTHORIZED
+ )
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual('invalid_client', json_resp['error'])
+ self.assertEqual(
+ 'The request you have made requires authentication.',
+ json_resp['error_description'])
+
+ @mock.patch.object(Manager, 'issue_token')
+ def test_get_access_token_issue_token_ks_error_other(
+ self, mock_issue_token):
+ self._create_mapping()
+ err_msg = 'Boom!'
+ mock_issue_token.side_effect = exception.NotImplemented(err_msg)
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=exception.NotImplemented.code
+ )
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual('other_error', json_resp['error'])
+ self.assertEqual(
+ 'An unknown error occurred and failed to get an OAuth2.0 '
+ 'access token.',
+ json_resp['error_description'])
+
+ @mock.patch.object(Manager, 'issue_token')
+ def test_get_access_token_issue_token_other_exception(
+ self, mock_issue_token):
+ self._create_mapping()
+ err_msg = 'Boom!'
+ mock_issue_token.side_effect = Exception(err_msg)
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.INTERNAL_SERVER_ERROR
+ )
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual('other_error', json_resp['error'])
+ self.assertEqual(err_msg, json_resp['error_description'])
+
+ @mock.patch.object(RuleProcessor, 'process')
+ def test_get_access_token_process_other_exception(
+ self, mock_process):
+ self._create_mapping()
+ err_msg = 'Boom!'
+ mock_process.side_effect = Exception(err_msg)
+ cert_content = self._get_cert_content(self.client_cert)
+ resp = self._get_access_token(
+ client_id=self.oauth2_user.get('id'),
+ client_cert_content=cert_content,
+ expected_status=http.client.INTERNAL_SERVER_ERROR
+ )
+ LOG.debug(resp)
+ json_resp = jsonutils.loads(resp.body)
+ self.assertEqual('other_error', json_resp['error'])
+ self.assertEqual(err_msg, json_resp['error_description'])
+ self.assertIn(
+ 'Get OAuth2.0 Access Token API: '
+ 'mapping rule process failed.',
+ self.log_fix.output)