diff options
Diffstat (limited to 'keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py')
-rw-r--r-- | keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py | 699 |
1 files changed, 123 insertions, 576 deletions
diff --git a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py index 25fbf73..67b60de 100644 --- a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py +++ b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py @@ -14,9 +14,6 @@ import datetime import os -import shutil -import stat -import tempfile import time import uuid @@ -25,8 +22,6 @@ from keystoneauth1 import exceptions as ksa_exceptions from keystoneauth1 import fixture from keystoneauth1 import loading from keystoneauth1 import session -from keystoneclient.common import cms -from keystoneclient import exceptions as ksc_exceptions import mock import oslo_cache from oslo_log import log as logging @@ -55,9 +50,6 @@ EXPECTED_V2_DEFAULT_ENV_RESPONSE = { 'HTTP_X_USER_NAME': 'user_name1', 'HTTP_X_ROLES': 'role1,role2', 'HTTP_X_IS_ADMIN_PROJECT': 'True', - 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat) - 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat) - 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat) } EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = { @@ -90,7 +82,8 @@ BASE_URI = '%s/testadmin' % BASE_HOST FAKE_ADMIN_TOKEN_ID = 'admin_token2' FAKE_ADMIN_TOKEN = jsonutils.dumps( {'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID, - 'expires': '2022-10-03T16:58:01Z'}}}) + 'expires': '%i-10-03T16:58:01Z' % + (1 + time.gmtime().tm_year)}}}) VERSION_LIST_v3 = fixture.DiscoveryList(href=BASE_URI) VERSION_LIST_v2 = fixture.DiscoveryList(v3=False, href=BASE_URI) @@ -285,11 +278,8 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): self.fake_app = fake_app or FakeApp self.middleware = None - signing_dir = self._setup_signing_directory() - self.conf = { 'identity_uri': 'https://keystone.example.com:1234/testadmin/', - 'signing_dir': signing_dir, 'auth_version': auth_version, 'www_authenticate_uri': 'https://keystone.example.com:1234', 'admin_user': uuid.uuid4().hex, @@ -302,16 +292,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): def call_middleware(self, **kwargs): return self.call(self.middleware, **kwargs) - def _setup_signing_directory(self): - directory_name = self.useFixture(fixtures.TempDir()).path - - # Copy the sample certificate files into the temporary directory. - for filename in ['cacert.pem', 'signing_cert.pem', ]: - shutil.copy2(os.path.join(client_fixtures.CERTDIR, filename), - os.path.join(directory_name, filename)) - - return directory_name - def set_middleware(self, expected_env=None, conf=None): """Configure the class ready to call the auth_token middleware. @@ -348,44 +328,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): self.assertIsNone(self.requests_mock.last_request) -class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, - testresources.ResourcedTestCase): - - resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - - """Auth Token middleware should understand Diablo keystone responses.""" - def setUp(self): - # pre-diablo only had Tenant ID, which was also the Name - expected_env = { - 'HTTP_X_TENANT_ID': 'tenant_id1', - 'HTTP_X_TENANT_NAME': 'tenant_id1', - # now deprecated (diablo-compat) - 'HTTP_X_TENANT': 'tenant_id1', - } - - super(DiabloAuthTokenMiddlewareTest, self).setUp( - expected_env=expected_env) - - self.requests_mock.get(BASE_URI, - json=VERSION_LIST_v2, - status_code=300) - - self.requests_mock.post("%s/v2.0/tokens" % BASE_URI, - text=FAKE_ADMIN_TOKEN) - - self.token_id = self.examples.VALID_DIABLO_TOKEN - token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id] - - url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id) - self.requests_mock.get(url, text=token_response) - - self.set_middleware() - - def test_valid_diablo_response(self): - resp = self.call_middleware(headers={'X-Auth-Token': self.token_id}) - self.assertIn('keystone.token_info', resp.request.environ) - - class CachePoolTest(BaseAuthTokenMiddlewareTest): def test_use_cache_from_env(self): # If `swift.cache` is set in the environment and `cache` is set in the @@ -569,10 +511,7 @@ class CommonAuthTokenMiddlewareTest(object): """These tests are run once using v2 tokens and again using v3 tokens.""" def test_init_does_not_call_http(self): - conf = { - 'revocation_cache_time': '1' - } - self.create_simple_middleware(conf=conf) + self.create_simple_middleware(conf={}) self.assertLastPath(None) def test_auth_with_no_token_does_not_call_http(self): @@ -619,40 +558,6 @@ class CommonAuthTokenMiddlewareTest(object): self.assert_valid_request_200(self.token_dict['uuid_token_default']) self.assert_valid_last_url(self.token_dict['uuid_token_default']) - def test_valid_signed_request(self): - for _ in range(2): # Do it twice because first result was cached. - self.assert_valid_request_200( - self.token_dict['signed_token_scoped']) - # ensure that signed requests do not generate HTTP traffic - self.assertLastPath(None) - - def test_valid_signed_compressed_request(self): - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - # ensure that signed requests do not generate HTTP traffic - self.assertLastPath(None) - - def test_validate_offline_succeeds_for_unrevoked_token(self): - token = self.middleware._validate_offline( - self.token_dict['signed_token_scoped'], - [self.token_dict['signed_token_scoped_hash']]) - self.assertIsInstance(token, dict) - - def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self): - token = self.middleware._validate_offline( - self.token_dict['signed_token_scoped_pkiz'], - [self.token_dict['signed_token_scoped_hash']]) - self.assertIsInstance(token, dict) - - def test_validate_offline_token_succeeds_for_unrevoked_token_sha256(self): - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.set_middleware() - token = self.middleware._validate_offline( - self.token_dict['signed_token_scoped'], - [self.token_dict['signed_token_scoped_hash_sha256'], - self.token_dict['signed_token_scoped_hash']]) - self.assertIsInstance(token, dict) - def test_request_invalid_uuid_token(self): # remember because we are testing the middleware we stub the connection # to the keystone server, but this is not what gets returned @@ -664,20 +569,6 @@ class CommonAuthTokenMiddlewareTest(object): self.assertEqual('Keystone uri="https://keystone.example.com:1234"', resp.headers['WWW-Authenticate']) - def test_request_invalid_signed_token(self): - token = self.examples.INVALID_SIGNED_TOKEN - resp = self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - self.assertEqual('Keystone uri="https://keystone.example.com:1234"', - resp.headers['WWW-Authenticate']) - - def test_request_invalid_signed_pkiz_token(self): - token = self.examples.INVALID_SIGNED_PKIZ_TOKEN - resp = self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - self.assertEqual('Keystone uri="https://keystone.example.com:1234"', - resp.headers['WWW-Authenticate']) - def test_request_no_token(self): resp = self.call_middleware(expected_status=401) self.assertEqual('Keystone uri="https://keystone.example.com:1234"', @@ -694,22 +585,11 @@ class CommonAuthTokenMiddlewareTest(object): self.assertEqual('Keystone uri="https://keystone.example.com:1234"', resp.headers['WWW-Authenticate']) - def _get_cached_token(self, token, mode='md5'): - token_id = cms.cms_hash_token(token, mode=mode) - return self.middleware._token_cache.get(token_id) - - def test_memcache(self): - token = self.token_dict['signed_token_scoped'] - self.call_middleware(headers={'X-Auth-Token': token}) - self.assertIsNotNone(self._get_cached_token(token)) - - def test_expired(self): - token = self.token_dict['signed_token_scoped_expired'] - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) + def _get_cached_token(self, token): + return self.middleware._token_cache.get(token) def test_memcache_set_invalid_uuid(self): - invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI + invalid_uri = "%s/v3/tokens/invalid-token" % BASE_URI self.requests_mock.get(invalid_uri, status_code=404) token = 'invalid-token' @@ -720,7 +600,7 @@ class CommonAuthTokenMiddlewareTest(object): def test_memcache_hit_invalid_token(self): token = 'invalid-token' - invalid_uri = '%s/v2.0/tokens/invalid-token' % BASE_URI + invalid_uri = '%s/v3/tokens/invalid-token' % BASE_URI self.requests_mock.get(invalid_uri, status_code=404) # Call once to cache token's invalid state; verify it cached as such @@ -743,7 +623,7 @@ class CommonAuthTokenMiddlewareTest(object): conf.update(extra_conf) self.set_middleware(conf=conf) - token = self.token_dict['signed_token_scoped'] + token = self.token_dict['uuid_token_default'] self.call_middleware(headers={'X-Auth-Token': token}) req = webob.Request.blank('/') @@ -989,7 +869,7 @@ class CommonAuthTokenMiddlewareTest(object): orig_cache_set = cache.set cache.set = mock.Mock(side_effect=orig_cache_set) - token = self.token_dict['signed_token_scoped'] + token = self.token_dict['uuid_token_default'] self.call_middleware(headers={'X-Auth-Token': token}) @@ -1126,144 +1006,6 @@ class CommonAuthTokenMiddlewareTest(object): resp.request.headers['X-Service-Identity-Status']) -class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, - testresources.ResourcedTestCase): - - resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - - def __init__(self, *args, **kwargs): - super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs) - self.auth_version = 'v2.0' - self.fake_app = None - self.ca_path = '/v2.0/certificates/ca' - self.signing_path = '/v2.0/certificates/signing' - - def setUp(self): - super(V2CertDownloadMiddlewareTest, self).setUp( - auth_version=self.auth_version, - fake_app=self.fake_app) - self.logger = self.useFixture(fixtures.FakeLogger()) - self.base_dir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.base_dir) - self.cert_dir = os.path.join(self.base_dir, 'certs') - os.makedirs(self.cert_dir, stat.S_IRWXU) - conf = { - 'signing_dir': self.cert_dir, - 'auth_version': self.auth_version, - } - - self.requests_mock.get(BASE_URI, - json=VERSION_LIST_v3, - status_code=300) - - self.set_middleware(conf=conf) - - # Usually we supply a signed_dir with pre-installed certificates, - # so invocation of /usr/bin/openssl succeeds. This time we give it - # an empty directory, so it fails. - def test_request_no_token_dummy(self): - cms._ensure_subprocess() - - self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path), - status_code=404) - self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path), - status_code=404) - - token = self.middleware._validate_offline( - self.examples.SIGNED_TOKEN_SCOPED, - [self.examples.SIGNED_TOKEN_SCOPED_HASH]) - - self.assertIsNone(token) - - self.assertIn('Fetch certificate config failed', self.logger.output) - self.assertIn('fallback to online validation', self.logger.output) - - def test_fetch_signing_cert(self): - data = 'FAKE CERT' - url = "%s%s" % (BASE_URI, self.signing_path) - self.requests_mock.get(url, text=data) - self.middleware._fetch_signing_cert() - - signing_cert_path = self.middleware._signing_directory.calc_path( - self.middleware._SIGNING_CERT_FILE_NAME) - with open(signing_cert_path, 'r') as f: - self.assertEqual(f.read(), data) - - self.assertEqual(url, self.requests_mock.last_request.url) - - def test_fetch_signing_ca(self): - data = 'FAKE CA' - url = "%s%s" % (BASE_URI, self.ca_path) - self.requests_mock.get(url, text=data) - self.middleware._fetch_ca_cert() - - ca_file_path = self.middleware._signing_directory.calc_path( - self.middleware._SIGNING_CA_FILE_NAME) - with open(ca_file_path, 'r') as f: - self.assertEqual(f.read(), data) - - self.assertEqual(url, self.requests_mock.last_request.url) - - def test_prefix_trailing_slash(self): - del self.conf['identity_uri'] - self.conf['auth_protocol'] = 'https' - self.conf['auth_host'] = 'keystone.example.com' - self.conf['auth_port'] = '1234' - self.conf['auth_admin_prefix'] = '/newadmin/' - - base_url = '%s/newadmin' % BASE_HOST - ca_url = "%s%s" % (base_url, self.ca_path) - signing_url = "%s%s" % (base_url, self.signing_path) - - self.requests_mock.get(base_url, - json=VERSION_LIST_v3, - status_code=300) - self.requests_mock.get(ca_url, text='FAKECA') - self.requests_mock.get(signing_url, text='FAKECERT') - - self.set_middleware(conf=self.conf) - - self.middleware._fetch_ca_cert() - self.assertEqual(ca_url, self.requests_mock.last_request.url) - - self.middleware._fetch_signing_cert() - self.assertEqual(signing_url, self.requests_mock.last_request.url) - - def test_without_prefix(self): - del self.conf['identity_uri'] - self.conf['auth_protocol'] = 'https' - self.conf['auth_host'] = 'keystone.example.com' - self.conf['auth_port'] = '1234' - self.conf['auth_admin_prefix'] = '' - - ca_url = "%s%s" % (BASE_HOST, self.ca_path) - signing_url = "%s%s" % (BASE_HOST, self.signing_path) - - self.requests_mock.get(BASE_HOST, - json=VERSION_LIST_v3, - status_code=300) - self.requests_mock.get(ca_url, text='FAKECA') - self.requests_mock.get(signing_url, text='FAKECERT') - - self.set_middleware(conf=self.conf) - - self.middleware._fetch_ca_cert() - self.assertEqual(ca_url, self.requests_mock.last_request.url) - - self.middleware._fetch_signing_cert() - self.assertEqual(signing_url, self.requests_mock.last_request.url) - - -class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): - - def __init__(self, *args, **kwargs): - super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs) - self.auth_version = 'v3.0' - self.fake_app = v3FakeApp - self.ca_path = '/v3/OS-SIMPLE-CERT/ca' - self.signing_path = '/v3/OS-SIMPLE-CERT/certificates' - - def network_error_response(request, context): raise ksa_exceptions.ConnectFailure("Network connection refused.") @@ -1273,190 +1015,6 @@ def request_timeout_response(request, context): "Request to https://host/token/path timed out") -class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, - CommonAuthTokenMiddlewareTest, - testresources.ResourcedTestCase): - """v2 token specific tests. - - There are some differences between how the auth-token middleware handles - v2 and v3 tokens over and above the token formats, namely: - - - A v3 keystone server will auto scope a token to a user's default project - if no scope is specified. A v2 server assumes that the auth-token - middleware will do that. - - A v2 keystone server may issue a token without a catalog, even with a - tenant - - The tests below were originally part of the generic AuthTokenMiddlewareTest - class, but now, since they really are v2 specific, they are included here. - - """ - - resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - - def setUp(self): - super(v2AuthTokenMiddlewareTest, self).setUp() - - self.token_dict = { - 'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT, - 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED, - 'uuid_token_bind': self.examples.UUID_TOKEN_BIND, - 'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND, - 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED, - 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ, - 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH, - 'signed_token_scoped_hash_sha256': - self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256, - 'signed_token_scoped_expired': - self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, - 'uuid_service_token_default': - self.examples.UUID_SERVICE_TOKEN_DEFAULT, - } - - self.requests_mock.get(BASE_URI, - json=VERSION_LIST_v2, - status_code=300) - - self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) - - for token in (self.examples.UUID_TOKEN_DEFAULT, - self.examples.UUID_TOKEN_UNSCOPED, - self.examples.UUID_TOKEN_BIND, - self.examples.UUID_TOKEN_UNKNOWN_BIND, - self.examples.UUID_TOKEN_NO_SERVICE_CATALOG, - self.examples.UUID_SERVICE_TOKEN_DEFAULT, - self.examples.SIGNED_TOKEN_SCOPED_KEY, - self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,): - url = "%s/v2.0/tokens/%s" % (BASE_URI, token) - text = self.examples.JSON_TOKEN_RESPONSES[token] - self.requests_mock.get(url, text=text) - - url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN) - self.requests_mock.get(url, text=network_error_response) - - url = '%s/v2.0/tokens/%s' % (BASE_URI, TIMEOUT_TOKEN) - self.requests_mock.get(url, text=request_timeout_response) - - self.set_middleware() - - def assert_unscoped_default_tenant_auto_scopes(self, token): - """Unscoped v2 requests with a default tenant should ``auto-scope``. - - The implied scope is the user's tenant ID. - - """ - resp = self.call_middleware(headers={'X-Auth-Token': token}) - self.assertEqual(FakeApp.SUCCESS, resp.body) - self.assertIn('keystone.token_info', resp.request.environ) - - def assert_valid_last_url(self, token_id): - self.assertLastPath("/v2.0/tokens/%s" % token_id) - - def test_default_tenant_uuid_token(self): - self.assert_unscoped_default_tenant_auto_scopes( - self.examples.UUID_TOKEN_DEFAULT) - - def test_default_tenant_signed_token(self): - self.assert_unscoped_default_tenant_auto_scopes( - self.examples.SIGNED_TOKEN_SCOPED) - - def assert_unscoped_token_receives_401(self, token): - """Unscoped requests with no default tenant ID should be rejected.""" - resp = self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - self.assertEqual('Keystone uri="https://keystone.example.com:1234"', - resp.headers['WWW-Authenticate']) - - def test_unscoped_uuid_token_receives_401(self): - self.assert_unscoped_token_receives_401( - self.examples.UUID_TOKEN_UNSCOPED) - - def test_unscoped_pki_token_receives_401(self): - self.assert_unscoped_token_receives_401( - self.examples.SIGNED_TOKEN_UNSCOPED) - - def test_request_prevent_service_catalog_injection(self): - token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG - resp = self.call_middleware(headers={'X-Service-Catalog': '[]', - 'X-Auth-Token': token}) - - self.assertFalse(resp.request.headers.get('X-Service-Catalog')) - self.assertEqual(FakeApp.SUCCESS, resp.body) - - def test_user_plugin_token_properties(self): - token = self.examples.UUID_TOKEN_DEFAULT - token_data = self.examples.TOKEN_RESPONSES[token] - service = self.examples.UUID_SERVICE_TOKEN_DEFAULT - - resp = self.call_middleware(headers={'X-Service-Catalog': '[]', - 'X-Auth-Token': token, - 'X-Service-Token': service}) - - self.assertEqual(FakeApp.SUCCESS, resp.body) - - token_auth = resp.request.environ['keystone.token_auth'] - - self.assertTrue(token_auth.has_user_token) - self.assertTrue(token_auth.has_service_token) - - self.assertEqual(token_data.user_id, token_auth.user.user_id) - self.assertEqual(token_data.tenant_id, token_auth.user.project_id) - - self.assertThat(token_auth.user.role_names, matchers.HasLength(2)) - self.assertIn('role1', token_auth.user.role_names) - self.assertIn('role2', token_auth.user.role_names) - - self.assertIsNone(token_auth.user.trust_id) - self.assertIsNone(token_auth.user.user_domain_id) - self.assertIsNone(token_auth.user.project_domain_id) - - self.assertThat(token_auth.service.role_names, matchers.HasLength(2)) - self.assertIn('service', token_auth.service.role_names) - self.assertIn('service_role2', token_auth.service.role_names) - - self.assertIsNone(token_auth.service.trust_id) - - -class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, - testresources.ResourcedTestCase): - - resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - - def test_valid_uuid_request_forced_to_2_0(self): - """Test forcing auth_token to use lower api version. - - By installing the v3 http hander, auth_token will be get - a version list that looks like a v3 server - from which it - would normally chose v3.0 as the auth version. However, here - we specify v2.0 in the configuration - which should force - auth_token to use that version instead. - - """ - conf = { - 'auth_version': 'v2.0' - } - - self.requests_mock.get(BASE_URI, - json=VERSION_LIST_v3, - status_code=300) - - self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) - - token = self.examples.UUID_TOKEN_DEFAULT - url = "%s/v2.0/tokens/%s" % (BASE_URI, token) - text = self.examples.JSON_TOKEN_RESPONSES[token] - self.requests_mock.get(url, text=text) - - self.set_middleware(conf=conf) - - # This tests will only work is auth_token has chosen to use the - # lower, v2, api version - self.call_middleware(headers={'X-Auth-Token': token}) - self.assertEqual(url, self.requests_mock.last_request.url) - - class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, CommonAuthTokenMiddlewareTest, testresources.ResourcedTestCase): @@ -1468,19 +1026,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, This is done by configuring the AuthTokenMiddlewareTest class via its Setup(), passing in v3 style data that will then be used by - the tests themselves. This approach has been used to ensure we - really are running the same tests for both v2 and v3 tokens. - - There a few additional specific test for v3 only: - - - We allow an unscoped token to be validated (as unscoped), where - as for v2 tokens, the auth_token middleware is expected to try and - auto-scope it (and fail if there is no default tenant) - - Domain scoped tokens - - Since we don't specify an auth version for auth_token to use, by - definition we are thefore implicitely testing that it will use - the highest available auth version, i.e. v3.0 + the tests themselves. """ @@ -1497,15 +1043,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, 'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND, 'uuid_token_unknown_bind': self.examples.v3_UUID_TOKEN_UNKNOWN_BIND, - 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED, - 'signed_token_scoped_pkiz': - self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ, - 'signed_token_scoped_hash': - self.examples.SIGNED_v3_TOKEN_SCOPED_HASH, - 'signed_token_scoped_hash_sha256': - self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256, - 'signed_token_scoped_expired': - self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, 'uuid_service_token_default': self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT, } @@ -1600,39 +1137,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED) self.assertLastPath('/v3/auth/tokens') - def test_gives_v2_catalog(self): - self.set_middleware() - req = self.assert_valid_request_200( - self.examples.SIGNED_v3_TOKEN_SCOPED) - - catalog = jsonutils.loads(req.headers['X-Service-Catalog']) - - for service in catalog: - for endpoint in service['endpoints']: - # no point checking everything, just that it's in v2 format - self.assertIn('adminURL', endpoint) - self.assertIn('publicURL', endpoint) - self.assertIn('internalURL', endpoint) - - def test_fallback_to_online_validation_with_signing_error(self): - self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI, - status_code=404) - self.assert_valid_request_200(self.token_dict['signed_token_scoped']) - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - - def test_fallback_to_online_validation_with_ca_error(self): - self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI, - status_code=404) - self.assert_valid_request_200(self.token_dict['signed_token_scoped']) - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - - def test_fallback_to_online_validation_with_revocation_list_error(self): - self.assert_valid_request_200(self.token_dict['signed_token_scoped']) - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - def test_user_plugin_token_properties(self): token = self.examples.v3_UUID_TOKEN_DEFAULT token_data = self.examples.TOKEN_RESPONSES[token] @@ -1734,6 +1238,110 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, e = self.requests_mock.request_history[3].qs.get('allow_expired') self.assertIsNone(e) + def test_app_cred_token_without_access_rules(self): + self.set_middleware(conf={'service_type': 'compute'}) + token = self.examples.v3_APP_CRED_TOKEN + token_data = self.examples.TOKEN_RESPONSES[token] + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(FakeApp.SUCCESS, resp.body) + token_auth = resp.request.environ['keystone.token_auth'] + self.assertEqual(token_data.application_credential_id, + token_auth.user.application_credential_id) + + def test_app_cred_access_rules_token(self): + self.set_middleware(conf={'service_type': 'compute'}) + token = self.examples.v3_APP_CRED_ACCESS_RULES + token_data = self.examples.TOKEN_RESPONSES[token] + resp = self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=200, + method='GET', path='/v2.1/servers') + token_auth = resp.request.environ['keystone.token_auth'] + self.assertEqual(token_data.application_credential_id, + token_auth.user.application_credential_id) + self.assertEqual(token_data.application_credential_access_rules, + token_auth.user.application_credential_access_rules) + resp = self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', + path='/v2.1/servers/someuuid') + token_auth = resp.request.environ['keystone.token_auth'] + self.assertEqual(token_data.application_credential_id, + token_auth.user.application_credential_id) + self.assertEqual(token_data.application_credential_access_rules, + token_auth.user.application_credential_access_rules) + + def test_app_cred_access_rules_service_request(self): + self.set_middleware(conf={'service_type': 'image'}) + token = self.examples.v3_APP_CRED_ACCESS_RULES + headers = {'X-Auth-Token': token} + self.call_middleware(headers=headers, + expected_status=401, + method='GET', path='/v2/images') + service_token = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT + headers['X-Service-Token'] = service_token + self.call_middleware(headers=headers, + expected_status=200, + method='GET', path='/v2/images') + + def test_app_cred_no_access_rules_token(self): + self.set_middleware(conf={'service_type': 'compute'}) + token = self.examples.v3_APP_CRED_EMPTY_ACCESS_RULES + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v2.1/servers') + service_token = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT + headers = { + 'X-Auth-Token': token, + 'X-Service-Token': service_token + } + self.call_middleware(headers=headers, expected_status=401, + method='GET', path='/v2.1/servers') + + def test_app_cred_matching_rules(self): + self.set_middleware(conf={'service_type': 'compute'}) + token = self.examples.v3_APP_CRED_MATCHING_RULES + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=200, + method='GET', path='/v2.1/servers/foobar') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v2.1/servers/foobar/barfoo') + self.set_middleware(conf={'service_type': 'image'}) + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=200, + method='GET', path='/v2/images/foobar') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v2/images/foobar/barfoo') + self.set_middleware(conf={'service_type': 'identity'}) + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=200, + method='GET', + path='/v3/projects/123/users/456/roles/member') + self.set_middleware(conf={'service_type': 'block-storage'}) + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=200, + method='GET', path='/v3/123/types/456') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v3/123/types') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v2/123/types/456') + self.set_middleware(conf={'service_type': 'object-store'}) + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=200, + method='GET', path='/v1/1/2/3') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v1/1/2') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/v2/1/2') + self.call_middleware(headers={'X-Auth-Token': token}, + expected_status=401, + method='GET', path='/info') + class DelayedAuthTests(BaseAuthTokenMiddlewareTest): @@ -1743,7 +1351,7 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest): if request.headers.get('X-Subject-Token') == ERROR_TOKEN: msg = 'Network connection refused.' - raise ksc_exceptions.ConnectionRefused(msg) + raise ksa_exceptions.ConnectFailure(msg) # All others just fail context.status_code = 404 @@ -1753,7 +1361,7 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest): body = uuid.uuid4().hex www_authenticate_uri = 'http://local.test' conf = {'delay_auth_decision': 'True', - 'auth_version': 'v3.0', + 'auth_version': 'v3', 'www_authenticate_uri': www_authenticate_uri} middleware = self.create_simple_middleware(status='401 Unauthorized', @@ -2057,59 +1665,6 @@ class CommonCompositeAuthTests(object): bind_level='required') -class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, - CommonCompositeAuthTests, - testresources.ResourcedTestCase): - """Test auth_token middleware with v2 token based composite auth. - - Execute the Composite auth class tests, but with the - auth_token middleware configured to expect v2 tokens back from - a keystone server. - """ - - resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - - def setUp(self): - super(v2CompositeAuthTests, self).setUp( - expected_env=EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE, - fake_app=CompositeFakeApp) - - uuid_token_default = self.examples.UUID_TOKEN_DEFAULT - uuid_service_token_default = self.examples.UUID_SERVICE_TOKEN_DEFAULT - uuid_token_bind = self.examples.UUID_TOKEN_BIND - uuid_service_token_bind = self.examples.UUID_SERVICE_TOKEN_BIND - self.token_dict = { - 'uuid_token_default': uuid_token_default, - 'uuid_service_token_default': uuid_service_token_default, - 'uuid_token_bind': uuid_token_bind, - 'uuid_service_token_bind': uuid_service_token_bind, - } - - self.requests_mock.get(BASE_URI, - json=VERSION_LIST_v2, - status_code=300) - - self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) - - for token in (self.examples.UUID_TOKEN_DEFAULT, - self.examples.UUID_SERVICE_TOKEN_DEFAULT, - self.examples.UUID_TOKEN_BIND, - self.examples.UUID_SERVICE_TOKEN_BIND): - text = self.examples.JSON_TOKEN_RESPONSES[token] - self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token), - text=text) - - for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI, - "%s/v2.0/tokens/invalid-service-token" % BASE_URI): - self.requests_mock.get(invalid_uri, text='', status_code=404) - - self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) - self.service_token_expected_env = dict( - EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) - self.set_middleware() - - class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, CommonCompositeAuthTests, testresources.ResourcedTestCase): @@ -2124,7 +1679,7 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, def setUp(self): super(v3CompositeAuthTests, self).setUp( - auth_version='v3.0', + auth_version='v3', fake_app=v3CompositeFakeApp) uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT @@ -2167,7 +1722,7 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, if token_id == ERROR_TOKEN: msg = "Network connection refused." - raise ksc_exceptions.ConnectionRefused(msg) + raise ksa_exceptions.ConnectFailure(msg) elif token_id == TIMEOUT_TOKEN: request_timeout_response(request, context) @@ -2195,7 +1750,7 @@ class OtherTests(BaseAuthTokenMiddlewareTest): self.call_middleware(headers={'X-Auth-Token': uuid.uuid4().hex}, expected_status=503) - self.assertIn('versions [v3.0, v2.0]', self.logger.output) + self.assertIn('versions [v3.0]', self.logger.output) def _assert_auth_version(self, conf_version, identity_server_version): self.set_middleware(conf={'auth_version': conf_version}) @@ -2204,8 +1759,6 @@ class OtherTests(BaseAuthTokenMiddlewareTest): identity_server.auth_version) def test_micro_version(self): - self._assert_auth_version('v2', (2, 0)) - self._assert_auth_version('v2.0', (2, 0)) self._assert_auth_version('v3', (3, 0)) self._assert_auth_version('v3.0', (3, 0)) self._assert_auth_version('v3.1', (3, 0)) @@ -2219,14 +1772,10 @@ class OtherTests(BaseAuthTokenMiddlewareTest): self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) self._assert_auth_version(None, (3, 0)) - # VERSION_LIST_v2 contains only v2 version elements - self.requests_mock.get(BASE_URI, json=VERSION_LIST_v2, status_code=300) - self._assert_auth_version(None, (2, 0)) - def test_unsupported_auth_version(self): - # If the requested version isn't supported we will use v2 - self._assert_auth_version('v1', (2, 0)) - self._assert_auth_version('v10', (2, 0)) + # If the requested version isn't supported we will use v3 + self._assert_auth_version('v1', (3, 0)) + self._assert_auth_version('v10', (3, 0)) class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): @@ -2236,9 +1785,7 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): KEYSTONE_BASE_URL = 'http://keystone.url/prefix' CRUD_URL = 'http://crud.url/prefix' - # NOTE(jamielennox): use the /v2.0 prefix here because this is what's most - # likely to be in the service catalog and we should be able to ignore it. - KEYSTONE_URL = KEYSTONE_BASE_URL + '/v2.0' + KEYSTONE_URL = KEYSTONE_BASE_URL + '/v3' def setUp(self): super(AuthProtocolLoadingTests, self).setUp() |