diff options
Diffstat (limited to 'keystone/tests/unit/test_v3_credential.py')
-rw-r--r-- | keystone/tests/unit/test_v3_credential.py | 427 |
1 files changed, 393 insertions, 34 deletions
diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index 81f2b70b7..4cce2d1bf 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -21,12 +21,14 @@ import http.client from keystoneclient.contrib.ec2 import utils as ec2_utils from oslo_db import exception as oslo_db_exception from testtools import matchers +import urllib from keystone.api import ec2tokens from keystone.common import provider_api from keystone.common import utils from keystone.credential.providers import fernet as credential_fernet from keystone import exception +from keystone import oauth1 from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_v3 @@ -63,6 +65,33 @@ class CredentialBaseTestCase(test_v3.RestfulTestCase): return json.dumps(blob), credential_id + def _test_get_token(self, access, secret): + """Test signature validation with the access/secret provided.""" + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + # Now make a request to validate the signed dummy request via the + # ec2tokens API. This proves the v3 ec2 credentials actually work. + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + r = self.post( + '/ec2tokens', + body={'ec2Credentials': sig_ref}, + expected_status=http.client.OK) + self.assertValidTokenResponse(r) + return r.result['token'] + class CredentialTestCase(CredentialBaseTestCase): """Test credential CRUD.""" @@ -258,6 +287,126 @@ class CredentialTestCase(CredentialBaseTestCase): 'credential_id': credential_id}, body={'credential': update_ref}) + def test_update_credential_non_owner(self): + """Call ``PATCH /credentials/{credential_id}``.""" + alt_user = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id) + alt_user_id = alt_user['id'] + alt_project = unit.new_project_ref(domain_id=self.domain_id) + alt_project_id = alt_project['id'] + PROVIDERS.resource_api.create_project( + alt_project['id'], alt_project) + alt_role = unit.new_role_ref(name='reader') + alt_role_id = alt_role['id'] + PROVIDERS.role_api.create_role(alt_role_id, alt_role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + alt_user_id, alt_project_id, alt_role_id) + auth = self.build_authentication_request( + user_id=alt_user_id, + password=alt_user['password'], + project_id=alt_project_id) + ref = unit.new_credential_ref(user_id=alt_user_id, + project_id=alt_project_id) + r = self.post( + '/credentials', + auth=auth, + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + + # Cannot change the credential to be owned by another user + update_ref = {'user_id': self.user_id, 'project_id': self.project_id} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + expected_status=403, + auth=auth, + body={'credential': update_ref}) + + def test_update_ec2_credential_change_trust_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['trust_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different trust + blob['trust_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + # Try removing the trust + del blob['trust_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + + def test_update_ec2_credential_change_app_cred_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['app_cred_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different app cred + blob['app_cred_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + # Try removing the app cred + del blob['app_cred_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + + def test_update_ec2_credential_change_access_token_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['access_token_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different access token + blob['access_token_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + # Try removing the access token + del blob['access_token_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + def test_delete_credential(self): """Call ``DELETE /credentials/{credential_id}``.""" self.delete( @@ -393,7 +542,7 @@ class CredentialTestCase(CredentialBaseTestCase): self.assertValidCredentialResponse(r, ref) -class TestCredentialTrustScoped(test_v3.RestfulTestCase): +class TestCredentialTrustScoped(CredentialBaseTestCase): """Test credential with trust scoped token.""" def setUp(self): @@ -446,7 +595,7 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): token_id = r.headers.get('X-Subject-Token') # Create the credential with the trust scoped token - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + blob, ref = unit.new_ec2_credential(user_id=self.user_id, project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}, token=token_id) @@ -463,6 +612,21 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) + # Create a role assignment to ensure that it is ignored and only the + # trust-delegated roles are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + # Create second ec2 credential with the same access key id and check # for conflict. self.post( @@ -472,34 +636,229 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): expected_status=http.client.CONFLICT) -class TestCredentialEc2(CredentialBaseTestCase): - """Test v3 credential compatibility with ec2tokens.""" +class TestCredentialAppCreds(CredentialBaseTestCase): + """Test credential with application credential token.""" - def _validate_signature(self, access, secret): - """Test signature validation with the access/secret provided.""" - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) + def setUp(self): + super(TestCredentialAppCreds, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) - # Now make a request to validate the signed dummy request via the - # ec2tokens API. This proves the v3 ec2 credentials actually work. - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - r = self.post( - '/ec2tokens', - body={'ec2Credentials': sig_ref}, - expected_status=http.client.OK) - self.assertValidTokenResponse(r) + def test_app_cred_ec2_credential(self): + """Test creating ec2 credential from an application credential. + + Call ``POST /credentials``. + """ + # Create the app cred + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + r = self.post('/users/%s/application_credentials' % self.user_id, + body={'application_credential': ref}) + app_cred = r.result['application_credential'] + + # Get an application credential token + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], + secret=app_cred['secret']) + r = self.v3_create_token(auth_data) + token_id = r.headers.get('X-Subject-Token') + + # Create the credential with the app cred token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the app_cred_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['app_cred_id'] = app_cred['id'] + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the app cred are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + # Create second ec2 credential with the same access key id and check + # for conflict. + self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http.client.CONFLICT) + + +class TestCredentialAccessToken(CredentialBaseTestCase): + """Test credential with access token.""" + + def setUp(self): + super(TestCredentialAccessToken, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) + self.base_url = 'http://localhost/v3' + + def _urllib_parse_qs_text_keys(self, content): + results = urllib.parse.parse_qs(content) + return {key.decode('utf-8'): value for key, value in results.items()} + + def _create_single_consumer(self): + endpoint = '/OS-OAUTH1/consumers' + + ref = {'description': uuid.uuid4().hex} + resp = self.post(endpoint, body={'consumer': ref}) + return resp.result['consumer'] + + def _create_request_token(self, consumer, project_id, base_url=None): + endpoint = '/OS-OAUTH1/request_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + signature_method=oauth1.SIG_HMAC, + callback_uri="oob") + headers = {'requested_project_id': project_id} + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST', + headers=headers) + return endpoint, headers + + def _create_access_token(self, consumer, token, base_url=None): + endpoint = '/OS-OAUTH1/access_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC, + verifier=token.verifier) + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + return endpoint, headers + + def _get_oauth_token(self, consumer, token): + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC) + endpoint = '/auth/tokens' + url, headers, body = client.sign(self.base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}} + return endpoint, headers, ref + + def _authorize_request_token(self, request_id): + if isinstance(request_id, bytes): + request_id = request_id.decode() + return '/OS-OAUTH1/authorize/%s' % (request_id) + + def _get_access_token(self): + consumer = self._create_single_consumer() + consumer_id = consumer['id'] + consumer_secret = consumer['secret'] + consumer = {'key': consumer_id, 'secret': consumer_secret} + + url, headers = self._create_request_token(consumer, self.project_id) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + request_key = credentials['oauth_token'][0] + request_secret = credentials['oauth_token_secret'][0] + request_token = oauth1.Token(request_key, request_secret) + + url = self._authorize_request_token(request_key) + body = {'roles': [{'id': self.role_id}]} + resp = self.put(url, body=body, expected_status=http.client.OK) + verifier = resp.result['token']['oauth_verifier'] + + request_token.set_verifier(verifier) + url, headers = self._create_access_token(consumer, request_token) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + access_key = credentials['oauth_token'][0] + access_secret = credentials['oauth_token_secret'][0] + access_token = oauth1.Token(access_key, access_secret) + + url, headers, body = self._get_oauth_token(consumer, access_token) + content = self.post(url, headers=headers, body=body) + return access_key, content.headers['X-Subject-Token'] + + def test_access_token_ec2_credential(self): + """Test creating ec2 credential from an oauth access token. + + Call ``POST /credentials``. + """ + access_key, token_id = self._get_access_token() + + # Create the credential with the access token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the access_token_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['access_token_id'] = access_key.decode('utf-8') + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the access token are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + +class TestCredentialEc2(CredentialBaseTestCase): + """Test v3 credential compatibility with ec2tokens.""" def test_ec2_credential_signature_validate(self): """Test signature validation with a v3 ec2 credential.""" @@ -514,15 +873,15 @@ class TestCredentialEc2(CredentialBaseTestCase): cred_blob = json.loads(r.result['credential']['blob']) self.assertEqual(blob, cred_blob) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def test_ec2_credential_signature_validate_legacy(self): """Test signature validation with a legacy v3 ec2 credential.""" cred_json, _ = self._create_dict_blob_credential() cred_blob = json.loads(cred_json) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def _get_ec2_cred_uri(self): return '/users/%s/credentials/OS-EC2' % self.user_id @@ -538,8 +897,8 @@ class TestCredentialEc2(CredentialBaseTestCase): self.assertEqual(self.user_id, ec2_cred['user_id']) self.assertEqual(self.project_id, ec2_cred['tenant_id']) self.assertIsNone(ec2_cred['trust_id']) - self._validate_signature(access=ec2_cred['access'], - secret=ec2_cred['secret']) + self._test_get_token(access=ec2_cred['access'], + secret=ec2_cred['secret']) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri)) |