diff options
Diffstat (limited to 'keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py')
-rw-r--r-- | keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py | 292 |
1 files changed, 6 insertions, 286 deletions
diff --git a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py index 9ea8077..558a10e 100644 --- a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py +++ b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py @@ -14,9 +14,6 @@ import datetime import os -import shutil -import stat -import tempfile import time import uuid @@ -25,7 +22,6 @@ from keystoneauth1 import exceptions as ksa_exceptions from keystoneauth1 import fixture from keystoneauth1 import loading from keystoneauth1 import session -from keystoneclient.common import cms from keystoneclient import exceptions as ksc_exceptions import mock import oslo_cache @@ -285,11 +281,8 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): self.fake_app = fake_app or FakeApp self.middleware = None - signing_dir = self._setup_signing_directory() - self.conf = { 'identity_uri': 'https://keystone.example.com:1234/testadmin/', - 'signing_dir': signing_dir, 'auth_version': auth_version, 'www_authenticate_uri': 'https://keystone.example.com:1234', 'admin_user': uuid.uuid4().hex, @@ -302,16 +295,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): def call_middleware(self, **kwargs): return self.call(self.middleware, **kwargs) - def _setup_signing_directory(self): - directory_name = self.useFixture(fixtures.TempDir()).path - - # Copy the sample certificate files into the temporary directory. - for filename in ['cacert.pem', 'signing_cert.pem', ]: - shutil.copy2(os.path.join(client_fixtures.CERTDIR, filename), - os.path.join(directory_name, filename)) - - return directory_name - def set_middleware(self, expected_env=None, conf=None): """Configure the class ready to call the auth_token middleware. @@ -569,10 +552,7 @@ class CommonAuthTokenMiddlewareTest(object): """These tests are run once using v2 tokens and again using v3 tokens.""" def test_init_does_not_call_http(self): - conf = { - 'revocation_cache_time': '1' - } - self.create_simple_middleware(conf=conf) + self.create_simple_middleware(conf={}) self.assertLastPath(None) def test_auth_with_no_token_does_not_call_http(self): @@ -619,40 +599,6 @@ class CommonAuthTokenMiddlewareTest(object): self.assert_valid_request_200(self.token_dict['uuid_token_default']) self.assert_valid_last_url(self.token_dict['uuid_token_default']) - def test_valid_signed_request(self): - for _ in range(2): # Do it twice because first result was cached. - self.assert_valid_request_200( - self.token_dict['signed_token_scoped']) - # ensure that signed requests do not generate HTTP traffic - self.assertLastPath(None) - - def test_valid_signed_compressed_request(self): - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - # ensure that signed requests do not generate HTTP traffic - self.assertLastPath(None) - - def test_validate_offline_succeeds_for_unrevoked_token(self): - token = self.middleware._validate_offline( - self.token_dict['signed_token_scoped'], - [self.token_dict['signed_token_scoped_hash']]) - self.assertIsInstance(token, dict) - - def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self): - token = self.middleware._validate_offline( - self.token_dict['signed_token_scoped_pkiz'], - [self.token_dict['signed_token_scoped_hash']]) - self.assertIsInstance(token, dict) - - def test_validate_offline_token_succeeds_for_unrevoked_token_sha256(self): - self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) - self.set_middleware() - token = self.middleware._validate_offline( - self.token_dict['signed_token_scoped'], - [self.token_dict['signed_token_scoped_hash_sha256'], - self.token_dict['signed_token_scoped_hash']]) - self.assertIsInstance(token, dict) - def test_request_invalid_uuid_token(self): # remember because we are testing the middleware we stub the connection # to the keystone server, but this is not what gets returned @@ -664,20 +610,6 @@ class CommonAuthTokenMiddlewareTest(object): self.assertEqual('Keystone uri="https://keystone.example.com:1234"', resp.headers['WWW-Authenticate']) - def test_request_invalid_signed_token(self): - token = self.examples.INVALID_SIGNED_TOKEN - resp = self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - self.assertEqual('Keystone uri="https://keystone.example.com:1234"', - resp.headers['WWW-Authenticate']) - - def test_request_invalid_signed_pkiz_token(self): - token = self.examples.INVALID_SIGNED_PKIZ_TOKEN - resp = self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) - self.assertEqual('Keystone uri="https://keystone.example.com:1234"', - resp.headers['WWW-Authenticate']) - def test_request_no_token(self): resp = self.call_middleware(expected_status=401) self.assertEqual('Keystone uri="https://keystone.example.com:1234"', @@ -694,19 +626,8 @@ class CommonAuthTokenMiddlewareTest(object): self.assertEqual('Keystone uri="https://keystone.example.com:1234"', resp.headers['WWW-Authenticate']) - def _get_cached_token(self, token, mode='md5'): - token_id = cms.cms_hash_token(token, mode=mode) - return self.middleware._token_cache.get(token_id) - - def test_memcache(self): - token = self.token_dict['signed_token_scoped'] - self.call_middleware(headers={'X-Auth-Token': token}) - self.assertIsNotNone(self._get_cached_token(token)) - - def test_expired(self): - token = self.token_dict['signed_token_scoped_expired'] - self.call_middleware(headers={'X-Auth-Token': token}, - expected_status=401) + def _get_cached_token(self, token): + return self.middleware._token_cache.get(token) def test_memcache_set_invalid_uuid(self): invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI @@ -743,7 +664,7 @@ class CommonAuthTokenMiddlewareTest(object): conf.update(extra_conf) self.set_middleware(conf=conf) - token = self.token_dict['signed_token_scoped'] + token = self.token_dict['uuid_token_default'] self.call_middleware(headers={'X-Auth-Token': token}) req = webob.Request.blank('/') @@ -989,7 +910,7 @@ class CommonAuthTokenMiddlewareTest(object): orig_cache_set = cache.set cache.set = mock.Mock(side_effect=orig_cache_set) - token = self.token_dict['signed_token_scoped'] + token = self.token_dict['uuid_token_default'] self.call_middleware(headers={'X-Auth-Token': token}) @@ -1126,144 +1047,6 @@ class CommonAuthTokenMiddlewareTest(object): resp.request.headers['X-Service-Identity-Status']) -class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, - testresources.ResourcedTestCase): - - resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] - - def __init__(self, *args, **kwargs): - super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs) - self.auth_version = 'v2.0' - self.fake_app = None - self.ca_path = '/v2.0/certificates/ca' - self.signing_path = '/v2.0/certificates/signing' - - def setUp(self): - super(V2CertDownloadMiddlewareTest, self).setUp( - auth_version=self.auth_version, - fake_app=self.fake_app) - self.logger = self.useFixture(fixtures.FakeLogger()) - self.base_dir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.base_dir) - self.cert_dir = os.path.join(self.base_dir, 'certs') - os.makedirs(self.cert_dir, stat.S_IRWXU) - conf = { - 'signing_dir': self.cert_dir, - 'auth_version': self.auth_version, - } - - self.requests_mock.get(BASE_URI, - json=VERSION_LIST_v3, - status_code=300) - - self.set_middleware(conf=conf) - - # Usually we supply a signed_dir with pre-installed certificates, - # so invocation of /usr/bin/openssl succeeds. This time we give it - # an empty directory, so it fails. - def test_request_no_token_dummy(self): - cms._ensure_subprocess() - - self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path), - status_code=404) - self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path), - status_code=404) - - token = self.middleware._validate_offline( - self.examples.SIGNED_TOKEN_SCOPED, - [self.examples.SIGNED_TOKEN_SCOPED_HASH]) - - self.assertIsNone(token) - - self.assertIn('Fetch certificate config failed', self.logger.output) - self.assertIn('fallback to online validation', self.logger.output) - - def test_fetch_signing_cert(self): - data = 'FAKE CERT' - url = "%s%s" % (BASE_URI, self.signing_path) - self.requests_mock.get(url, text=data) - self.middleware._fetch_signing_cert() - - signing_cert_path = self.middleware._signing_directory.calc_path( - self.middleware._SIGNING_CERT_FILE_NAME) - with open(signing_cert_path, 'r') as f: - self.assertEqual(f.read(), data) - - self.assertEqual(url, self.requests_mock.last_request.url) - - def test_fetch_signing_ca(self): - data = 'FAKE CA' - url = "%s%s" % (BASE_URI, self.ca_path) - self.requests_mock.get(url, text=data) - self.middleware._fetch_ca_cert() - - ca_file_path = self.middleware._signing_directory.calc_path( - self.middleware._SIGNING_CA_FILE_NAME) - with open(ca_file_path, 'r') as f: - self.assertEqual(f.read(), data) - - self.assertEqual(url, self.requests_mock.last_request.url) - - def test_prefix_trailing_slash(self): - del self.conf['identity_uri'] - self.conf['auth_protocol'] = 'https' - self.conf['auth_host'] = 'keystone.example.com' - self.conf['auth_port'] = '1234' - self.conf['auth_admin_prefix'] = '/newadmin/' - - base_url = '%s/newadmin' % BASE_HOST - ca_url = "%s%s" % (base_url, self.ca_path) - signing_url = "%s%s" % (base_url, self.signing_path) - - self.requests_mock.get(base_url, - json=VERSION_LIST_v3, - status_code=300) - self.requests_mock.get(ca_url, text='FAKECA') - self.requests_mock.get(signing_url, text='FAKECERT') - - self.set_middleware(conf=self.conf) - - self.middleware._fetch_ca_cert() - self.assertEqual(ca_url, self.requests_mock.last_request.url) - - self.middleware._fetch_signing_cert() - self.assertEqual(signing_url, self.requests_mock.last_request.url) - - def test_without_prefix(self): - del self.conf['identity_uri'] - self.conf['auth_protocol'] = 'https' - self.conf['auth_host'] = 'keystone.example.com' - self.conf['auth_port'] = '1234' - self.conf['auth_admin_prefix'] = '' - - ca_url = "%s%s" % (BASE_HOST, self.ca_path) - signing_url = "%s%s" % (BASE_HOST, self.signing_path) - - self.requests_mock.get(BASE_HOST, - json=VERSION_LIST_v3, - status_code=300) - self.requests_mock.get(ca_url, text='FAKECA') - self.requests_mock.get(signing_url, text='FAKECERT') - - self.set_middleware(conf=self.conf) - - self.middleware._fetch_ca_cert() - self.assertEqual(ca_url, self.requests_mock.last_request.url) - - self.middleware._fetch_signing_cert() - self.assertEqual(signing_url, self.requests_mock.last_request.url) - - -class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): - - def __init__(self, *args, **kwargs): - super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs) - self.auth_version = 'v3.0' - self.fake_app = v3FakeApp - self.ca_path = '/v3/OS-SIMPLE-CERT/ca' - self.signing_path = '/v3/OS-SIMPLE-CERT/certificates' - - def network_error_response(request, context): raise ksa_exceptions.ConnectFailure("Network connection refused.") @@ -1302,13 +1085,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED, 'uuid_token_bind': self.examples.UUID_TOKEN_BIND, 'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND, - 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED, - 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ, - 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH, - 'signed_token_scoped_hash_sha256': - self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256, - 'signed_token_scoped_expired': - self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, 'uuid_service_token_default': self.examples.UUID_SERVICE_TOKEN_DEFAULT, } @@ -1325,9 +1101,7 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.UUID_TOKEN_BIND, self.examples.UUID_TOKEN_UNKNOWN_BIND, self.examples.UUID_TOKEN_NO_SERVICE_CATALOG, - self.examples.UUID_SERVICE_TOKEN_DEFAULT, - self.examples.SIGNED_TOKEN_SCOPED_KEY, - self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,): + self.examples.UUID_SERVICE_TOKEN_DEFAULT,): url = "%s/v2.0/tokens/%s" % (BASE_URI, token) text = self.examples.JSON_TOKEN_RESPONSES[token] self.requests_mock.get(url, text=text) @@ -1357,10 +1131,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assert_unscoped_default_tenant_auto_scopes( self.examples.UUID_TOKEN_DEFAULT) - def test_default_tenant_signed_token(self): - self.assert_unscoped_default_tenant_auto_scopes( - self.examples.SIGNED_TOKEN_SCOPED) - def assert_unscoped_token_receives_401(self, token): """Unscoped requests with no default tenant ID should be rejected.""" resp = self.call_middleware(headers={'X-Auth-Token': token}, @@ -1368,14 +1138,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertEqual('Keystone uri="https://keystone.example.com:1234"', resp.headers['WWW-Authenticate']) - def test_unscoped_uuid_token_receives_401(self): - self.assert_unscoped_token_receives_401( - self.examples.UUID_TOKEN_UNSCOPED) - - def test_unscoped_pki_token_receives_401(self): - self.assert_unscoped_token_receives_401( - self.examples.SIGNED_TOKEN_UNSCOPED) - def test_request_prevent_service_catalog_injection(self): token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG resp = self.call_middleware(headers={'X-Service-Catalog': '[]', @@ -1497,15 +1259,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, 'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND, 'uuid_token_unknown_bind': self.examples.v3_UUID_TOKEN_UNKNOWN_BIND, - 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED, - 'signed_token_scoped_pkiz': - self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ, - 'signed_token_scoped_hash': - self.examples.SIGNED_v3_TOKEN_SCOPED_HASH, - 'signed_token_scoped_hash_sha256': - self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256, - 'signed_token_scoped_expired': - self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, 'uuid_service_token_default': self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT, } @@ -1600,39 +1353,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED) self.assertLastPath('/v3/auth/tokens') - def test_gives_v2_catalog(self): - self.set_middleware() - req = self.assert_valid_request_200( - self.examples.SIGNED_v3_TOKEN_SCOPED) - - catalog = jsonutils.loads(req.headers['X-Service-Catalog']) - - for service in catalog: - for endpoint in service['endpoints']: - # no point checking everything, just that it's in v2 format - self.assertIn('adminURL', endpoint) - self.assertIn('publicURL', endpoint) - self.assertIn('internalURL', endpoint) - - def test_fallback_to_online_validation_with_signing_error(self): - self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI, - status_code=404) - self.assert_valid_request_200(self.token_dict['signed_token_scoped']) - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - - def test_fallback_to_online_validation_with_ca_error(self): - self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI, - status_code=404) - self.assert_valid_request_200(self.token_dict['signed_token_scoped']) - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - - def test_fallback_to_online_validation_with_revocation_list_error(self): - self.assert_valid_request_200(self.token_dict['signed_token_scoped']) - self.assert_valid_request_200( - self.token_dict['signed_token_scoped_pkiz']) - def test_user_plugin_token_properties(self): token = self.examples.v3_UUID_TOKEN_DEFAULT token_data = self.examples.TOKEN_RESPONSES[token] |