summaryrefslogtreecommitdiff
path: root/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py')
-rw-r--r--keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py292
1 files changed, 6 insertions, 286 deletions
diff --git a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
index 9ea8077..558a10e 100644
--- a/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
@@ -14,9 +14,6 @@
import datetime
import os
-import shutil
-import stat
-import tempfile
import time
import uuid
@@ -25,7 +22,6 @@ from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1 import fixture
from keystoneauth1 import loading
from keystoneauth1 import session
-from keystoneclient.common import cms
from keystoneclient import exceptions as ksc_exceptions
import mock
import oslo_cache
@@ -285,11 +281,8 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
self.fake_app = fake_app or FakeApp
self.middleware = None
- signing_dir = self._setup_signing_directory()
-
self.conf = {
'identity_uri': 'https://keystone.example.com:1234/testadmin/',
- 'signing_dir': signing_dir,
'auth_version': auth_version,
'www_authenticate_uri': 'https://keystone.example.com:1234',
'admin_user': uuid.uuid4().hex,
@@ -302,16 +295,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
def call_middleware(self, **kwargs):
return self.call(self.middleware, **kwargs)
- def _setup_signing_directory(self):
- directory_name = self.useFixture(fixtures.TempDir()).path
-
- # Copy the sample certificate files into the temporary directory.
- for filename in ['cacert.pem', 'signing_cert.pem', ]:
- shutil.copy2(os.path.join(client_fixtures.CERTDIR, filename),
- os.path.join(directory_name, filename))
-
- return directory_name
-
def set_middleware(self, expected_env=None, conf=None):
"""Configure the class ready to call the auth_token middleware.
@@ -569,10 +552,7 @@ class CommonAuthTokenMiddlewareTest(object):
"""These tests are run once using v2 tokens and again using v3 tokens."""
def test_init_does_not_call_http(self):
- conf = {
- 'revocation_cache_time': '1'
- }
- self.create_simple_middleware(conf=conf)
+ self.create_simple_middleware(conf={})
self.assertLastPath(None)
def test_auth_with_no_token_does_not_call_http(self):
@@ -619,40 +599,6 @@ class CommonAuthTokenMiddlewareTest(object):
self.assert_valid_request_200(self.token_dict['uuid_token_default'])
self.assert_valid_last_url(self.token_dict['uuid_token_default'])
- def test_valid_signed_request(self):
- for _ in range(2): # Do it twice because first result was cached.
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_valid_signed_compressed_request(self):
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_validate_offline_succeeds_for_unrevoked_token(self):
- token = self.middleware._validate_offline(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsInstance(token, dict)
-
- def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
- token = self.middleware._validate_offline(
- self.token_dict['signed_token_scoped_pkiz'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsInstance(token, dict)
-
- def test_validate_offline_token_succeeds_for_unrevoked_token_sha256(self):
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- token = self.middleware._validate_offline(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash_sha256'],
- self.token_dict['signed_token_scoped_hash']])
- self.assertIsInstance(token, dict)
-
def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
@@ -664,20 +610,6 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
resp.headers['WWW-Authenticate'])
- def test_request_invalid_signed_token(self):
- token = self.examples.INVALID_SIGNED_TOKEN
- resp = self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
- self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
- resp.headers['WWW-Authenticate'])
-
- def test_request_invalid_signed_pkiz_token(self):
- token = self.examples.INVALID_SIGNED_PKIZ_TOKEN
- resp = self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
- self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
- resp.headers['WWW-Authenticate'])
-
def test_request_no_token(self):
resp = self.call_middleware(expected_status=401)
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
@@ -694,19 +626,8 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
resp.headers['WWW-Authenticate'])
- def _get_cached_token(self, token, mode='md5'):
- token_id = cms.cms_hash_token(token, mode=mode)
- return self.middleware._token_cache.get(token_id)
-
- def test_memcache(self):
- token = self.token_dict['signed_token_scoped']
- self.call_middleware(headers={'X-Auth-Token': token})
- self.assertIsNotNone(self._get_cached_token(token))
-
- def test_expired(self):
- token = self.token_dict['signed_token_scoped_expired']
- self.call_middleware(headers={'X-Auth-Token': token},
- expected_status=401)
+ def _get_cached_token(self, token):
+ return self.middleware._token_cache.get(token)
def test_memcache_set_invalid_uuid(self):
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
@@ -743,7 +664,7 @@ class CommonAuthTokenMiddlewareTest(object):
conf.update(extra_conf)
self.set_middleware(conf=conf)
- token = self.token_dict['signed_token_scoped']
+ token = self.token_dict['uuid_token_default']
self.call_middleware(headers={'X-Auth-Token': token})
req = webob.Request.blank('/')
@@ -989,7 +910,7 @@ class CommonAuthTokenMiddlewareTest(object):
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
- token = self.token_dict['signed_token_scoped']
+ token = self.token_dict['uuid_token_default']
self.call_middleware(headers={'X-Auth-Token': token})
@@ -1126,144 +1047,6 @@ class CommonAuthTokenMiddlewareTest(object):
resp.request.headers['X-Service-Identity-Status'])
-class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def __init__(self, *args, **kwargs):
- super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v2.0'
- self.fake_app = None
- self.ca_path = '/v2.0/certificates/ca'
- self.signing_path = '/v2.0/certificates/signing'
-
- def setUp(self):
- super(V2CertDownloadMiddlewareTest, self).setUp(
- auth_version=self.auth_version,
- fake_app=self.fake_app)
- self.logger = self.useFixture(fixtures.FakeLogger())
- self.base_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.base_dir)
- self.cert_dir = os.path.join(self.base_dir, 'certs')
- os.makedirs(self.cert_dir, stat.S_IRWXU)
- conf = {
- 'signing_dir': self.cert_dir,
- 'auth_version': self.auth_version,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
-
- self.set_middleware(conf=conf)
-
- # Usually we supply a signed_dir with pre-installed certificates,
- # so invocation of /usr/bin/openssl succeeds. This time we give it
- # an empty directory, so it fails.
- def test_request_no_token_dummy(self):
- cms._ensure_subprocess()
-
- self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path),
- status_code=404)
- self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path),
- status_code=404)
-
- token = self.middleware._validate_offline(
- self.examples.SIGNED_TOKEN_SCOPED,
- [self.examples.SIGNED_TOKEN_SCOPED_HASH])
-
- self.assertIsNone(token)
-
- self.assertIn('Fetch certificate config failed', self.logger.output)
- self.assertIn('fallback to online validation', self.logger.output)
-
- def test_fetch_signing_cert(self):
- data = 'FAKE CERT'
- url = "%s%s" % (BASE_URI, self.signing_path)
- self.requests_mock.get(url, text=data)
- self.middleware._fetch_signing_cert()
-
- signing_cert_path = self.middleware._signing_directory.calc_path(
- self.middleware._SIGNING_CERT_FILE_NAME)
- with open(signing_cert_path, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertEqual(url, self.requests_mock.last_request.url)
-
- def test_fetch_signing_ca(self):
- data = 'FAKE CA'
- url = "%s%s" % (BASE_URI, self.ca_path)
- self.requests_mock.get(url, text=data)
- self.middleware._fetch_ca_cert()
-
- ca_file_path = self.middleware._signing_directory.calc_path(
- self.middleware._SIGNING_CA_FILE_NAME)
- with open(ca_file_path, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertEqual(url, self.requests_mock.last_request.url)
-
- def test_prefix_trailing_slash(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = '/newadmin/'
-
- base_url = '%s/newadmin' % BASE_HOST
- ca_url = "%s%s" % (base_url, self.ca_path)
- signing_url = "%s%s" % (base_url, self.signing_path)
-
- self.requests_mock.get(base_url,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests_mock.get(ca_url, text='FAKECA')
- self.requests_mock.get(signing_url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests_mock.last_request.url)
-
- self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests_mock.last_request.url)
-
- def test_without_prefix(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = ''
-
- ca_url = "%s%s" % (BASE_HOST, self.ca_path)
- signing_url = "%s%s" % (BASE_HOST, self.signing_path)
-
- self.requests_mock.get(BASE_HOST,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests_mock.get(ca_url, text='FAKECA')
- self.requests_mock.get(signing_url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests_mock.last_request.url)
-
- self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests_mock.last_request.url)
-
-
-class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
-
- def __init__(self, *args, **kwargs):
- super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v3.0'
- self.fake_app = v3FakeApp
- self.ca_path = '/v3/OS-SIMPLE-CERT/ca'
- self.signing_path = '/v3/OS-SIMPLE-CERT/certificates'
-
-
def network_error_response(request, context):
raise ksa_exceptions.ConnectFailure("Network connection refused.")
@@ -1302,13 +1085,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED,
'uuid_token_bind': self.examples.UUID_TOKEN_BIND,
'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'uuid_service_token_default':
self.examples.UUID_SERVICE_TOKEN_DEFAULT,
}
@@ -1325,9 +1101,7 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.UUID_TOKEN_BIND,
self.examples.UUID_TOKEN_UNKNOWN_BIND,
self.examples.UUID_TOKEN_NO_SERVICE_CATALOG,
- self.examples.UUID_SERVICE_TOKEN_DEFAULT,
- self.examples.SIGNED_TOKEN_SCOPED_KEY,
- self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
+ self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests_mock.get(url, text=text)
@@ -1357,10 +1131,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assert_unscoped_default_tenant_auto_scopes(
self.examples.UUID_TOKEN_DEFAULT)
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.SIGNED_TOKEN_SCOPED)
-
def assert_unscoped_token_receives_401(self, token):
"""Unscoped requests with no default tenant ID should be rejected."""
resp = self.call_middleware(headers={'X-Auth-Token': token},
@@ -1368,14 +1138,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertEqual('Keystone uri="https://keystone.example.com:1234"',
resp.headers['WWW-Authenticate'])
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.SIGNED_TOKEN_UNSCOPED)
-
def test_request_prevent_service_catalog_injection(self):
token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG
resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
@@ -1497,15 +1259,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND,
'uuid_token_unknown_bind':
self.examples.v3_UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz':
- self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'uuid_service_token_default':
self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT,
}
@@ -1600,39 +1353,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertLastPath('/v3/auth/tokens')
- def test_gives_v2_catalog(self):
- self.set_middleware()
- req = self.assert_valid_request_200(
- self.examples.SIGNED_v3_TOKEN_SCOPED)
-
- catalog = jsonutils.loads(req.headers['X-Service-Catalog'])
-
- for service in catalog:
- for endpoint in service['endpoints']:
- # no point checking everything, just that it's in v2 format
- self.assertIn('adminURL', endpoint)
- self.assertIn('publicURL', endpoint)
- self.assertIn('internalURL', endpoint)
-
- def test_fallback_to_online_validation_with_signing_error(self):
- self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
- status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_fallback_to_online_validation_with_ca_error(self):
- self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
- status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_fallback_to_online_validation_with_revocation_list_error(self):
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
def test_user_plugin_token_properties(self):
token = self.examples.v3_UUID_TOKEN_DEFAULT
token_data = self.examples.TOKEN_RESPONSES[token]