summaryrefslogtreecommitdiff
path: root/keystone/tests/unit/test_v3_auth.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/tests/unit/test_v3_auth.py')
-rw-r--r--keystone/tests/unit/test_v3_auth.py183
1 files changed, 183 insertions, 0 deletions
diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py
index cf456518a..eb7ea0e29 100644
--- a/keystone/tests/unit/test_v3_auth.py
+++ b/keystone/tests/unit/test_v3_auth.py
@@ -19,8 +19,10 @@ import itertools
import operator
import re
from unittest import mock
+from urllib import parse
import uuid
+from cryptography.hazmat.primitives.serialization import Encoding
import freezegun
import http.client
from oslo_serialization import jsonutils as json
@@ -2645,6 +2647,187 @@ class TokenAPITests(object):
r = self._validate_token(token, allow_expired=True)
self.assertValidProjectScopedTokenResponse(r)
+ def _create_project_user(self):
+ new_domain_ref = unit.new_domain_ref()
+ PROVIDERS.resource_api.create_domain(
+ new_domain_ref['id'], new_domain_ref
+ )
+ new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
+ PROVIDERS.resource_api.create_project(
+ new_project_ref['id'], new_project_ref
+ )
+ new_user = unit.create_user(PROVIDERS.identity_api,
+ domain_id=new_domain_ref['id'],
+ project_id=new_project_ref['id'])
+ PROVIDERS.assignment_api.create_grant(
+ self.role['id'],
+ user_id=new_user['id'],
+ project_id=new_project_ref['id'])
+ return new_user, new_domain_ref, new_project_ref
+
+ def _create_certificates(self,
+ root_dn=None,
+ server_dn=None,
+ client_dn=None):
+ root_subj = unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='root'
+ )
+ if root_dn:
+ root_subj = unit.update_dn(root_subj, root_dn)
+
+ root_cert, root_key = unit.create_certificate(root_subj)
+ keystone_subj = unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='keystone.local'
+ )
+ if server_dn:
+ keystone_subj = unit.update_dn(keystone_subj, server_dn)
+
+ ks_cert, ks_key = unit.create_certificate(
+ keystone_subj, ca=root_cert, ca_key=root_key)
+ client_subj = unit.create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='client'
+ )
+ if client_dn:
+ client_subj = unit.update_dn(client_subj, client_dn)
+
+ client_cert, client_key = unit.create_certificate(
+ client_subj, ca=root_cert, ca_key=root_key)
+ return root_cert, root_key, ks_cert, ks_key, client_cert, client_key
+
+ def _get_cert_content(self, cert):
+ return cert.public_bytes(Encoding.PEM).decode('ascii')
+
+ def _get_oauth2_access_token(self, client_id, client_cert_content,
+ expected_status=http.client.OK):
+ headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ data = {
+ 'grant_type': 'client_credentials',
+ 'client_id': client_id
+ }
+ extra_environ = {
+ 'SSL_CLIENT_CERT': client_cert_content
+ }
+ data = parse.urlencode(data).encode()
+ resp = self.post(
+ '/OS-OAUTH2/token',
+ headers=headers,
+ noauth=True,
+ convert=False,
+ body=data,
+ environ=extra_environ,
+ expected_status=expected_status)
+ return resp
+
+ def _create_mapping(self):
+ mapping = {
+ 'id': 'oauth2_mapping',
+ 'rules': [
+ {
+ 'local': [
+ {
+ 'user': {
+ 'name': '{0}',
+ 'id': '{1}',
+ 'email': '{2}',
+ 'domain': {
+ 'name': '{3}',
+ 'id': '{4}'
+ }
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'SSL_CLIENT_SUBJECT_DN_CN'
+ },
+ {
+ 'type': 'SSL_CLIENT_SUBJECT_DN_UID'
+ },
+ {
+ 'type': 'SSL_CLIENT_SUBJECT_DN_EMAILADDRESS'
+ },
+ {
+ 'type': 'SSL_CLIENT_SUBJECT_DN_O'
+ },
+ {
+ 'type': 'SSL_CLIENT_SUBJECT_DN_DC'
+ },
+ {
+ 'type': 'SSL_CLIENT_ISSUER_DN_CN',
+ 'any_one_of': [
+ 'root'
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ PROVIDERS.federation_api.create_mapping(mapping['id'], mapping)
+
+ def test_verify_oauth2_token_project_scope_ok(self):
+ cache_on_issue = CONF.token.cache_on_issue
+ caching = CONF.token.caching
+ self._create_mapping()
+ user, user_domain, _ = self._create_project_user()
+
+ *_, client_cert, _ = self._create_certificates(
+ root_dn=unit.create_dn(
+ common_name='root'
+ ),
+ client_dn=unit.create_dn(
+ common_name=user['name'],
+ user_id=user['id'],
+ email_address=user['email'],
+ organization_name=user_domain['name'],
+ domain_component=user_domain['id']
+ )
+ )
+
+ cert_content = self._get_cert_content(client_cert)
+ CONF.token.cache_on_issue = False
+ CONF.token.caching = False
+ resp = self._get_oauth2_access_token(user['id'], cert_content)
+
+ json_resp = json.loads(resp.body)
+ self.assertIn('access_token', json_resp)
+ self.assertEqual('Bearer', json_resp['token_type'])
+ self.assertEqual(3600, json_resp['expires_in'])
+
+ verify_resp = self.get(
+ '/auth/tokens',
+ headers={
+ 'X-Subject-Token': json_resp['access_token'],
+ 'X-Auth-Token': json_resp['access_token']
+ },
+ expected_status=http.client.OK)
+ self.assertIn('token', verify_resp.result)
+ self.assertIn('oauth2_credential', verify_resp.result['token'])
+ self.assertIn('roles', verify_resp.result['token'])
+ self.assertIn('project', verify_resp.result['token'])
+ self.assertIn('catalog', verify_resp.result['token'])
+ check_oauth2 = verify_resp.result['token']['oauth2_credential']
+ self.assertEqual(utils.get_certificate_thumbprint(cert_content),
+ check_oauth2['x5t#S256'])
+ CONF.token.cache_on_issue = cache_on_issue
+ CONF.token.caching = caching
+
class TokenDataTests(object):
"""Test the data in specific token types."""