summaryrefslogtreecommitdiff
path: root/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py')
-rw-r--r--keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py699
1 files changed, 123 insertions, 576 deletions
diff --git a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
index 25fbf73..67b60de 100644
--- a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
@@ -14,9 +14,6 @@
import datetime
import os
-import shutil
-import stat
-import tempfile
import time
import uuid
@@ -25,8 +22,6 @@ from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1 import fixture
from keystoneauth1 import loading
from keystoneauth1 import session
-from keystoneclient.common import cms
-from keystoneclient import exceptions as ksc_exceptions
import mock
import oslo_cache
from oslo_log import log as logging
@@ -55,9 +50,6 @@ EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
'HTTP_X_USER_NAME': 'user_name1',
'HTTP_X_ROLES': 'role1,role2',
'HTTP_X_IS_ADMIN_PROJECT': 'True',
- 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
- 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
}
EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = {
@@ -90,7 +82,8 @@ BASE_URI = '%s/testadmin' % BASE_HOST
FAKE_ADMIN_TOKEN_ID = 'admin_token2'
FAKE_ADMIN_TOKEN = jsonutils.dumps(
{'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID,
- 'expires': '2022-10-03T16:58:01Z'}}})
+ 'expires': '%i-10-03T16:58:01Z' %
+ (1 + time.gmtime().tm_year)}}})
VERSION_LIST_v3 = fixture.DiscoveryList(href=BASE_URI)
VERSION_LIST_v2 = fixture.DiscoveryList(v3=False, href=BASE_URI)
@@ -285,11 +278,8 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
self.fake_app = fake_app or FakeApp
self.middleware = None
- signing_dir = self._setup_signing_directory()
-
self.conf = {
'identity_uri': 'https://keystone.example.com:1234/testadmin/',
- 'signing_dir': signing_dir,
'auth_version': auth_version,
'www_authenticate_uri': 'https://keystone.example.com:1234',
'admin_user': uuid.uuid4().hex,
@@ -302,16 +292,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
def call_middleware(self, **kwargs):
return self.call(self.middleware, **kwargs)
- def _setup_signing_directory(self):
- directory_name = self.useFixture(fixtures.TempDir()).path
-
- # Copy the sample certificate files into the temporary directory.
- for filename in ['cacert.pem', 'signing_cert.pem', ]:
- shutil.copy2(os.path.join(client_fixtures.CERTDIR, filename),
- os.path.join(directory_name, filename))
-
- return directory_name
-
def set_middleware(self, expected_env=None, conf=None):
"""Configure the class ready to call the auth_token middleware.
@@ -348,44 +328,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
self.assertIsNone(self.requests_mock.last_request)
-class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- """Auth Token middleware should understand Diablo keystone responses."""
- def setUp(self):
- # pre-diablo only had Tenant ID, which was also the Name
- expected_env = {
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_id1',
- # now deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_id1',
- }
-
- super(DiabloAuthTokenMiddlewareTest, self).setUp(
- expected_env=expected_env)
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.token_id = self.examples.VALID_DIABLO_TOKEN
- token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
-
- url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id)
- self.requests_mock.get(url, text=token_response)
-
- self.set_middleware()
-
- def test_valid_diablo_response(self):
- resp = self.call_middleware(headers={'X-Auth-Token': self.token_id})
- self.assertIn('keystone.token_info', resp.request.environ)
-
-
class CachePoolTest(BaseAuthTokenMiddlewareTest):
def test_use_cache_from_env(self):
# If `swift.cache` is set in the environment and `cache` is set in the
@@ -569,10 +511,7 @@ class CommonAuthTokenMiddlewareTest(object):
"""These tests are run once using v2 tokens and again using v3 tokens."""
def test_init_does_not_call_http(self):
- conf = {
- 'revocation_cache_time': '1'
- }
- self.create_simple_middleware(conf=conf)
+ self.create_simple_middleware(conf={})
self.assertLastPath(None)
def test_auth_with_no_token_does_not_call_http(self):
@@ -619,40 +558,6 @@ class CommonAuthTokenMiddlewareTest(object):
self.assert_valid_request_200(self.token_dict['uuid_token_default'])
self.assert_valid_last_url(self.token_dict['uuid_token_default'])
- def test_valid_signed_request(self):
- for _ in range(2): # Do it twice because first result was cached.
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_valid_signed_compressed_request(self):
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_validate_offline_succeeds_for_unrevoked_token(self):
- token = self.middleware._validate_offline(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsInstance(token, dict)
-
- def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
- token = self.middleware._validate_offline(
- self.token_dict['signed_token_scoped_pkiz'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsInstance(token, dict)
-
- def test_validate_offline_token_succeeds_for_unrevoked_token_sha256(self):
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- token = self.middleware._validate_offline(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash_sha256'],
- self.token_dict['signed_token_scoped_hash']])
- self.assertIsInstance(token, dict)
-
def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
@@ -664,20 +569,6 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
resp.headers['WWW-Authenticate'])
- def test_request_invalid_signed_token(self):
- token = self.examples.INVALID_SIGNED_TOKEN
- resp = self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
- self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
- resp.headers['WWW-Authenticate'])
-
- def test_request_invalid_signed_pkiz_token(self):
- token = self.examples.INVALID_SIGNED_PKIZ_TOKEN
- resp = self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
- self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
- resp.headers['WWW-Authenticate'])
-
def test_request_no_token(self):
resp = self.call_middleware(expected_status=401)
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
@@ -694,22 +585,11 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
resp.headers['WWW-Authenticate'])
- def _get_cached_token(self, token, mode='md5'):
- token_id = cms.cms_hash_token(token, mode=mode)
- return self.middleware._token_cache.get(token_id)
-
- def test_memcache(self):
- token = self.token_dict['signed_token_scoped']
- self.call_middleware(headers={'X-Auth-Token': token})
- self.assertIsNotNone(self._get_cached_token(token))
-
- def test_expired(self):
- token = self.token_dict['signed_token_scoped_expired']
- self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
+ def _get_cached_token(self, token):
+ return self.middleware._token_cache.get(token)
def test_memcache_set_invalid_uuid(self):
- invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
+ invalid_uri = "%s/v3/tokens/invalid-token" % BASE_URI
self.requests_mock.get(invalid_uri, status_code=404)
token = 'invalid-token'
@@ -720,7 +600,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_memcache_hit_invalid_token(self):
token = 'invalid-token'
- invalid_uri = '%s/v2.0/tokens/invalid-token' % BASE_URI
+ invalid_uri = '%s/v3/tokens/invalid-token' % BASE_URI
self.requests_mock.get(invalid_uri, status_code=404)
# Call once to cache token's invalid state; verify it cached as such
@@ -743,7 +623,7 @@ class CommonAuthTokenMiddlewareTest(object):
conf.update(extra_conf)
self.set_middleware(conf=conf)
- token = self.token_dict['signed_token_scoped']
+ token = self.token_dict['uuid_token_default']
self.call_middleware(headers={'X-Auth-Token': token})
req = webob.Request.blank('/')
@@ -989,7 +869,7 @@ class CommonAuthTokenMiddlewareTest(object):
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
- token = self.token_dict['signed_token_scoped']
+ token = self.token_dict['uuid_token_default']
self.call_middleware(headers={'X-Auth-Token': token})
@@ -1126,144 +1006,6 @@ class CommonAuthTokenMiddlewareTest(object):
resp.request.headers['X-Service-Identity-Status'])
-class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def __init__(self, *args, **kwargs):
- super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v2.0'
- self.fake_app = None
- self.ca_path = '/v2.0/certificates/ca'
- self.signing_path = '/v2.0/certificates/signing'
-
- def setUp(self):
- super(V2CertDownloadMiddlewareTest, self).setUp(
- auth_version=self.auth_version,
- fake_app=self.fake_app)
- self.logger = self.useFixture(fixtures.FakeLogger())
- self.base_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.base_dir)
- self.cert_dir = os.path.join(self.base_dir, 'certs')
- os.makedirs(self.cert_dir, stat.S_IRWXU)
- conf = {
- 'signing_dir': self.cert_dir,
- 'auth_version': self.auth_version,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
-
- self.set_middleware(conf=conf)
-
- # Usually we supply a signed_dir with pre-installed certificates,
- # so invocation of /usr/bin/openssl succeeds. This time we give it
- # an empty directory, so it fails.
- def test_request_no_token_dummy(self):
- cms._ensure_subprocess()
-
- self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path),
- status_code=404)
- self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path),
- status_code=404)
-
- token = self.middleware._validate_offline(
- self.examples.SIGNED_TOKEN_SCOPED,
- [self.examples.SIGNED_TOKEN_SCOPED_HASH])
-
- self.assertIsNone(token)
-
- self.assertIn('Fetch certificate config failed', self.logger.output)
- self.assertIn('fallback to online validation', self.logger.output)
-
- def test_fetch_signing_cert(self):
- data = 'FAKE CERT'
- url = "%s%s" % (BASE_URI, self.signing_path)
- self.requests_mock.get(url, text=data)
- self.middleware._fetch_signing_cert()
-
- signing_cert_path = self.middleware._signing_directory.calc_path(
- self.middleware._SIGNING_CERT_FILE_NAME)
- with open(signing_cert_path, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertEqual(url, self.requests_mock.last_request.url)
-
- def test_fetch_signing_ca(self):
- data = 'FAKE CA'
- url = "%s%s" % (BASE_URI, self.ca_path)
- self.requests_mock.get(url, text=data)
- self.middleware._fetch_ca_cert()
-
- ca_file_path = self.middleware._signing_directory.calc_path(
- self.middleware._SIGNING_CA_FILE_NAME)
- with open(ca_file_path, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertEqual(url, self.requests_mock.last_request.url)
-
- def test_prefix_trailing_slash(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = '/newadmin/'
-
- base_url = '%s/newadmin' % BASE_HOST
- ca_url = "%s%s" % (base_url, self.ca_path)
- signing_url = "%s%s" % (base_url, self.signing_path)
-
- self.requests_mock.get(base_url,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests_mock.get(ca_url, text='FAKECA')
- self.requests_mock.get(signing_url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests_mock.last_request.url)
-
- self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests_mock.last_request.url)
-
- def test_without_prefix(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = ''
-
- ca_url = "%s%s" % (BASE_HOST, self.ca_path)
- signing_url = "%s%s" % (BASE_HOST, self.signing_path)
-
- self.requests_mock.get(BASE_HOST,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests_mock.get(ca_url, text='FAKECA')
- self.requests_mock.get(signing_url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests_mock.last_request.url)
-
- self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests_mock.last_request.url)
-
-
-class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
-
- def __init__(self, *args, **kwargs):
- super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v3.0'
- self.fake_app = v3FakeApp
- self.ca_path = '/v3/OS-SIMPLE-CERT/ca'
- self.signing_path = '/v3/OS-SIMPLE-CERT/certificates'
-
-
def network_error_response(request, context):
raise ksa_exceptions.ConnectFailure("Network connection refused.")
@@ -1273,190 +1015,6 @@ def request_timeout_response(request, context):
"Request to https://host/token/path timed out")
-class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- CommonAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """v2 token specific tests.
-
- There are some differences between how the auth-token middleware handles
- v2 and v3 tokens over and above the token formats, namely:
-
- - A v3 keystone server will auto scope a token to a user's default project
- if no scope is specified. A v2 server assumes that the auth-token
- middleware will do that.
- - A v2 keystone server may issue a token without a catalog, even with a
- tenant
-
- The tests below were originally part of the generic AuthTokenMiddlewareTest
- class, but now, since they really are v2 specific, they are included here.
-
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v2AuthTokenMiddlewareTest, self).setUp()
-
- self.token_dict = {
- 'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT,
- 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED,
- 'uuid_token_bind': self.examples.UUID_TOKEN_BIND,
- 'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
- 'uuid_service_token_default':
- self.examples.UUID_SERVICE_TOKEN_DEFAULT,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- for token in (self.examples.UUID_TOKEN_DEFAULT,
- self.examples.UUID_TOKEN_UNSCOPED,
- self.examples.UUID_TOKEN_BIND,
- self.examples.UUID_TOKEN_UNKNOWN_BIND,
- self.examples.UUID_TOKEN_NO_SERVICE_CATALOG,
- self.examples.UUID_SERVICE_TOKEN_DEFAULT,
- self.examples.SIGNED_TOKEN_SCOPED_KEY,
- self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
- url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get(url, text=text)
-
- url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN)
- self.requests_mock.get(url, text=network_error_response)
-
- url = '%s/v2.0/tokens/%s' % (BASE_URI, TIMEOUT_TOKEN)
- self.requests_mock.get(url, text=request_timeout_response)
-
- self.set_middleware()
-
- def assert_unscoped_default_tenant_auto_scopes(self, token):
- """Unscoped v2 requests with a default tenant should ``auto-scope``.
-
- The implied scope is the user's tenant ID.
-
- """
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(FakeApp.SUCCESS, resp.body)
- self.assertIn('keystone.token_info', resp.request.environ)
-
- def assert_valid_last_url(self, token_id):
- self.assertLastPath("/v2.0/tokens/%s" % token_id)
-
- def test_default_tenant_uuid_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.UUID_TOKEN_DEFAULT)
-
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.SIGNED_TOKEN_SCOPED)
-
- def assert_unscoped_token_receives_401(self, token):
- """Unscoped requests with no default tenant ID should be rejected."""
- resp = self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
- self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
- resp.headers['WWW-Authenticate'])
-
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.SIGNED_TOKEN_UNSCOPED)
-
- def test_request_prevent_service_catalog_injection(self):
- token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG
- resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
- 'X-Auth-Token': token})
-
- self.assertFalse(resp.request.headers.get('X-Service-Catalog'))
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- def test_user_plugin_token_properties(self):
- token = self.examples.UUID_TOKEN_DEFAULT
- token_data = self.examples.TOKEN_RESPONSES[token]
- service = self.examples.UUID_SERVICE_TOKEN_DEFAULT
-
- resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
- 'X-Auth-Token': token,
- 'X-Service-Token': service})
-
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- token_auth = resp.request.environ['keystone.token_auth']
-
- self.assertTrue(token_auth.has_user_token)
- self.assertTrue(token_auth.has_service_token)
-
- self.assertEqual(token_data.user_id, token_auth.user.user_id)
- self.assertEqual(token_data.tenant_id, token_auth.user.project_id)
-
- self.assertThat(token_auth.user.role_names, matchers.HasLength(2))
- self.assertIn('role1', token_auth.user.role_names)
- self.assertIn('role2', token_auth.user.role_names)
-
- self.assertIsNone(token_auth.user.trust_id)
- self.assertIsNone(token_auth.user.user_domain_id)
- self.assertIsNone(token_auth.user.project_domain_id)
-
- self.assertThat(token_auth.service.role_names, matchers.HasLength(2))
- self.assertIn('service', token_auth.service.role_names)
- self.assertIn('service_role2', token_auth.service.role_names)
-
- self.assertIsNone(token_auth.service.trust_id)
-
-
-class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def test_valid_uuid_request_forced_to_2_0(self):
- """Test forcing auth_token to use lower api version.
-
- By installing the v3 http hander, auth_token will be get
- a version list that looks like a v3 server - from which it
- would normally chose v3.0 as the auth version. However, here
- we specify v2.0 in the configuration - which should force
- auth_token to use that version instead.
-
- """
- conf = {
- 'auth_version': 'v2.0'
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- token = self.examples.UUID_TOKEN_DEFAULT
- url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get(url, text=text)
-
- self.set_middleware(conf=conf)
-
- # This tests will only work is auth_token has chosen to use the
- # lower, v2, api version
- self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(url, self.requests_mock.last_request.url)
-
-
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
CommonAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
@@ -1468,19 +1026,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
This is done by configuring the AuthTokenMiddlewareTest class via
its Setup(), passing in v3 style data that will then be used by
- the tests themselves. This approach has been used to ensure we
- really are running the same tests for both v2 and v3 tokens.
-
- There a few additional specific test for v3 only:
-
- - We allow an unscoped token to be validated (as unscoped), where
- as for v2 tokens, the auth_token middleware is expected to try and
- auto-scope it (and fail if there is no default tenant)
- - Domain scoped tokens
-
- Since we don't specify an auth version for auth_token to use, by
- definition we are thefore implicitely testing that it will use
- the highest available auth version, i.e. v3.0
+ the tests themselves.
"""
@@ -1497,15 +1043,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND,
'uuid_token_unknown_bind':
self.examples.v3_UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz':
- self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'uuid_service_token_default':
self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT,
}
@@ -1600,39 +1137,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertLastPath('/v3/auth/tokens')
- def test_gives_v2_catalog(self):
- self.set_middleware()
- req = self.assert_valid_request_200(
- self.examples.SIGNED_v3_TOKEN_SCOPED)
-
- catalog = jsonutils.loads(req.headers['X-Service-Catalog'])
-
- for service in catalog:
- for endpoint in service['endpoints']:
- # no point checking everything, just that it's in v2 format
- self.assertIn('adminURL', endpoint)
- self.assertIn('publicURL', endpoint)
- self.assertIn('internalURL', endpoint)
-
- def test_fallback_to_online_validation_with_signing_error(self):
- self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
- status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_fallback_to_online_validation_with_ca_error(self):
- self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
- status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_fallback_to_online_validation_with_revocation_list_error(self):
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
def test_user_plugin_token_properties(self):
token = self.examples.v3_UUID_TOKEN_DEFAULT
token_data = self.examples.TOKEN_RESPONSES[token]
@@ -1734,6 +1238,110 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
e = self.requests_mock.request_history[3].qs.get('allow_expired')
self.assertIsNone(e)
+ def test_app_cred_token_without_access_rules(self):
+ self.set_middleware(conf={'service_type': 'compute'})
+ token = self.examples.v3_APP_CRED_TOKEN
+ token_data = self.examples.TOKEN_RESPONSES[token]
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
+ token_auth = resp.request.environ['keystone.token_auth']
+ self.assertEqual(token_data.application_credential_id,
+ token_auth.user.application_credential_id)
+
+ def test_app_cred_access_rules_token(self):
+ self.set_middleware(conf={'service_type': 'compute'})
+ token = self.examples.v3_APP_CRED_ACCESS_RULES
+ token_data = self.examples.TOKEN_RESPONSES[token]
+ resp = self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=200,
+ method='GET', path='/v2.1/servers')
+ token_auth = resp.request.environ['keystone.token_auth']
+ self.assertEqual(token_data.application_credential_id,
+ token_auth.user.application_credential_id)
+ self.assertEqual(token_data.application_credential_access_rules,
+ token_auth.user.application_credential_access_rules)
+ resp = self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET',
+ path='/v2.1/servers/someuuid')
+ token_auth = resp.request.environ['keystone.token_auth']
+ self.assertEqual(token_data.application_credential_id,
+ token_auth.user.application_credential_id)
+ self.assertEqual(token_data.application_credential_access_rules,
+ token_auth.user.application_credential_access_rules)
+
+ def test_app_cred_access_rules_service_request(self):
+ self.set_middleware(conf={'service_type': 'image'})
+ token = self.examples.v3_APP_CRED_ACCESS_RULES
+ headers = {'X-Auth-Token': token}
+ self.call_middleware(headers=headers,
+ expected_status=401,
+ method='GET', path='/v2/images')
+ service_token = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT
+ headers['X-Service-Token'] = service_token
+ self.call_middleware(headers=headers,
+ expected_status=200,
+ method='GET', path='/v2/images')
+
+ def test_app_cred_no_access_rules_token(self):
+ self.set_middleware(conf={'service_type': 'compute'})
+ token = self.examples.v3_APP_CRED_EMPTY_ACCESS_RULES
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v2.1/servers')
+ service_token = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT
+ headers = {
+ 'X-Auth-Token': token,
+ 'X-Service-Token': service_token
+ }
+ self.call_middleware(headers=headers, expected_status=401,
+ method='GET', path='/v2.1/servers')
+
+ def test_app_cred_matching_rules(self):
+ self.set_middleware(conf={'service_type': 'compute'})
+ token = self.examples.v3_APP_CRED_MATCHING_RULES
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=200,
+ method='GET', path='/v2.1/servers/foobar')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v2.1/servers/foobar/barfoo')
+ self.set_middleware(conf={'service_type': 'image'})
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=200,
+ method='GET', path='/v2/images/foobar')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v2/images/foobar/barfoo')
+ self.set_middleware(conf={'service_type': 'identity'})
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=200,
+ method='GET',
+ path='/v3/projects/123/users/456/roles/member')
+ self.set_middleware(conf={'service_type': 'block-storage'})
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=200,
+ method='GET', path='/v3/123/types/456')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v3/123/types')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v2/123/types/456')
+ self.set_middleware(conf={'service_type': 'object-store'})
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=200,
+ method='GET', path='/v1/1/2/3')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v1/1/2')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/v2/1/2')
+ self.call_middleware(headers={'X-Auth-Token': token},
+ expected_status=401,
+ method='GET', path='/info')
+
class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
@@ -1743,7 +1351,7 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
if request.headers.get('X-Subject-Token') == ERROR_TOKEN:
msg = 'Network connection refused.'
- raise ksc_exceptions.ConnectionRefused(msg)
+ raise ksa_exceptions.ConnectFailure(msg)
# All others just fail
context.status_code = 404
@@ -1753,7 +1361,7 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
body = uuid.uuid4().hex
www_authenticate_uri = 'http://local.test'
conf = {'delay_auth_decision': 'True',
- 'auth_version': 'v3.0',
+ 'auth_version': 'v3',
'www_authenticate_uri': www_authenticate_uri}
middleware = self.create_simple_middleware(status='401 Unauthorized',
@@ -2057,59 +1665,6 @@ class CommonCompositeAuthTests(object):
bind_level='required')
-class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
- CommonCompositeAuthTests,
- testresources.ResourcedTestCase):
- """Test auth_token middleware with v2 token based composite auth.
-
- Execute the Composite auth class tests, but with the
- auth_token middleware configured to expect v2 tokens back from
- a keystone server.
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v2CompositeAuthTests, self).setUp(
- expected_env=EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE,
- fake_app=CompositeFakeApp)
-
- uuid_token_default = self.examples.UUID_TOKEN_DEFAULT
- uuid_service_token_default = self.examples.UUID_SERVICE_TOKEN_DEFAULT
- uuid_token_bind = self.examples.UUID_TOKEN_BIND
- uuid_service_token_bind = self.examples.UUID_SERVICE_TOKEN_BIND
- self.token_dict = {
- 'uuid_token_default': uuid_token_default,
- 'uuid_service_token_default': uuid_service_token_default,
- 'uuid_token_bind': uuid_token_bind,
- 'uuid_service_token_bind': uuid_service_token_bind,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- for token in (self.examples.UUID_TOKEN_DEFAULT,
- self.examples.UUID_SERVICE_TOKEN_DEFAULT,
- self.examples.UUID_TOKEN_BIND,
- self.examples.UUID_SERVICE_TOKEN_BIND):
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
- text=text)
-
- for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
- "%s/v2.0/tokens/invalid-service-token" % BASE_URI):
- self.requests_mock.get(invalid_uri, text='', status_code=404)
-
- self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
- self.service_token_expected_env = dict(
- EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
- self.set_middleware()
-
-
class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
CommonCompositeAuthTests,
testresources.ResourcedTestCase):
@@ -2124,7 +1679,7 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
def setUp(self):
super(v3CompositeAuthTests, self).setUp(
- auth_version='v3.0',
+ auth_version='v3',
fake_app=v3CompositeFakeApp)
uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT
@@ -2167,7 +1722,7 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
if token_id == ERROR_TOKEN:
msg = "Network connection refused."
- raise ksc_exceptions.ConnectionRefused(msg)
+ raise ksa_exceptions.ConnectFailure(msg)
elif token_id == TIMEOUT_TOKEN:
request_timeout_response(request, context)
@@ -2195,7 +1750,7 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
self.call_middleware(headers={'X-Auth-Token': uuid.uuid4().hex},
expected_status=503)
- self.assertIn('versions [v3.0, v2.0]', self.logger.output)
+ self.assertIn('versions [v3.0]', self.logger.output)
def _assert_auth_version(self, conf_version, identity_server_version):
self.set_middleware(conf={'auth_version': conf_version})
@@ -2204,8 +1759,6 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
identity_server.auth_version)
def test_micro_version(self):
- self._assert_auth_version('v2', (2, 0))
- self._assert_auth_version('v2.0', (2, 0))
self._assert_auth_version('v3', (3, 0))
self._assert_auth_version('v3.0', (3, 0))
self._assert_auth_version('v3.1', (3, 0))
@@ -2219,14 +1772,10 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
self._assert_auth_version(None, (3, 0))
- # VERSION_LIST_v2 contains only v2 version elements
- self.requests_mock.get(BASE_URI, json=VERSION_LIST_v2, status_code=300)
- self._assert_auth_version(None, (2, 0))
-
def test_unsupported_auth_version(self):
- # If the requested version isn't supported we will use v2
- self._assert_auth_version('v1', (2, 0))
- self._assert_auth_version('v10', (2, 0))
+ # If the requested version isn't supported we will use v3
+ self._assert_auth_version('v1', (3, 0))
+ self._assert_auth_version('v10', (3, 0))
class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
@@ -2236,9 +1785,7 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
KEYSTONE_BASE_URL = 'http://keystone.url/prefix'
CRUD_URL = 'http://crud.url/prefix'
- # NOTE(jamielennox): use the /v2.0 prefix here because this is what's most
- # likely to be in the service catalog and we should be able to ignore it.
- KEYSTONE_URL = KEYSTONE_BASE_URL + '/v2.0'
+ KEYSTONE_URL = KEYSTONE_BASE_URL + '/v3'
def setUp(self):
super(AuthProtocolLoadingTests, self).setUp()