summaryrefslogtreecommitdiff
path: root/tests/test_auth_token_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_auth_token_middleware.py')
-rw-r--r--tests/test_auth_token_middleware.py863
1 files changed, 650 insertions, 213 deletions
diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py
index 58be41d..cc730a7 100644
--- a/tests/test_auth_token_middleware.py
+++ b/tests/test_auth_token_middleware.py
@@ -43,11 +43,16 @@ CA = os.path.join(CERTDIR, 'ca.pem')
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
+REVOKED_v3_TOKEN = None
+REVOKED_v3_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None
SIGNED_TOKEN_UNSCOPED = None
+SIGNED_v3_TOKEN_SCOPED = None
+SIGNED_v3_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = None
SIGNED_TOKEN_UNSCOPED_KEY = None
+SIGNED_v3_TOKEN_SCOPED_KEY = None
VALID_SIGNED_REVOCATION_LIST = None
@@ -55,6 +60,9 @@ UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
+v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
+v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
+v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
INVALID_SIGNED_TOKEN = string.replace(
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
@@ -152,6 +160,79 @@ TOKEN_RESPONSES = {
}
},
},
+ v3_UUID_TOKEN_DEFAULT: {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'project': {
+ 'id': 'tenant_id1',
+ 'name': 'tenant_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'roles': [
+ {'name': 'role1', 'id': 'Role1'},
+ {'name': 'role2', 'id': 'Role2'},
+ ],
+ 'catalog': {}
+ }
+ },
+ v3_UUID_TOKEN_UNSCOPED: {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ }
+ }
+ },
+ v3_UUID_TOKEN_DOMAIN_SCOPED: {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1',
+ },
+ 'roles': [
+ {'name': 'role1', 'id': 'Role1'},
+ {'name': 'role2', 'id': 'Role2'},
+ ],
+ 'catalog': {}
+ }
+ }
+}
+
+EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
+ 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
+ 'HTTP_X_TENANT_ID': 'tenant_id1',
+ 'HTTP_X_TENANT_NAME': 'tenant_name1',
+ 'HTTP_X_USER_ID': 'user_id1',
+ 'HTTP_X_USER_NAME': 'user_name1',
+ 'HTTP_X_ROLES': 'role1,role2',
+ 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
+ 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
+ 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
}
FAKE_RESPONSE_STACK = []
@@ -166,18 +247,25 @@ def setUpModule(self):
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
+ with open(os.path.join(signing_path, 'auth_v3_token_scoped.pem')) as f:
+ self.SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
+ with open(os.path.join(signing_path, 'auth_v3_token_revoked.pem')) as f:
+ self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
+ self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_v3_TOKEN)
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
self.REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
{'signed': f.read()})
- self.SIGNED_TOKEN_SCOPED_KEY =\
- cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
- self.SIGNED_TOKEN_UNSCOPED_KEY =\
- cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
+ self.SIGNED_TOKEN_SCOPED_KEY = (
+ cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED))
+ self.SIGNED_TOKEN_UNSCOPED_KEY = (
+ cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED))
+ self.SIGNED_v3_TOKEN_SCOPED_KEY = (
+ cms.cms_hash_token(self.SIGNED_v3_TOKEN_SCOPED))
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
'access': {
@@ -213,23 +301,80 @@ def setUpModule(self):
},
},
+ self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_KEY] = {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'project': {
+ 'id': 'tenant_id1',
+ 'name': 'tenant_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'}
+ ],
+ 'catalog': {}
+ }
+ }
+
+VERSION_LIST_v3 = {
+ "versions": {
+ "values": [
+ {
+ "id": "v3.0",
+ "status": "stable",
+ "updated": "2013-03-06T00:00:00Z",
+ "links": []
+ },
+ {
+ "id": "v2.0",
+ "status": "beta",
+ "updated": "2011-11-19T00:00:00Z",
+ "links": []
+ }
+ ]
+ }
+}
+
+VERSION_LIST_v2 = {
+ "versions": {
+ "values": [
+ {
+ "id": "v2.0",
+ "status": "beta",
+ "updated": "2011-11-19T00:00:00Z",
+ "links": []
+ }
+ ]
+ }
+}
+
class FakeMemcache(object):
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
+ self.token_key = SIGNED_TOKEN_SCOPED_KEY
def get(self, key):
- data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
- if not data or key != "tokens/%s" % (data['access']['token']['id']):
+ data = TOKEN_RESPONSES[self.token_key].copy()
+ if not data or key != "tokens/%s" % self.token_key:
return
if not self.token_expiration:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
self.token_expiration = dt.strftime("%s")
- dt = datetime.datetime.now() + datetime.timedelta(hours=24)
- ks_expires = dt.isoformat()
- data['access']['token']['expires'] = ks_expires
return (data, str(self.token_expiration))
def set(self, key, value, time=None):
@@ -237,22 +382,26 @@ class FakeMemcache(object):
self.set_key = key
+class v3FakeMemcache(FakeMemcache):
+ def __init__(self):
+ super(v3FakeMemcache, self).__init__()
+ self.token_key = SIGNED_v3_TOKEN_SCOPED_KEY
+
+
class FakeSwiftMemcacheRing(object):
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
+ self.token_key = SIGNED_TOKEN_SCOPED_KEY
def get(self, key):
- data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
- if not data or key != "tokens/%s" % (data['access']['token']['id']):
+ data = TOKEN_RESPONSES[self.token_key].copy()
+ if not data or key != "tokens/%s" % self.token_key:
return
if not self.token_expiration:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
self.token_expiration = dt.strftime("%s")
- dt = datetime.datetime.now() + datetime.timedelta(hours=24)
- ks_expires = dt.isoformat()
- data['access']['token']['expires'] = ks_expires
return (data, str(self.token_expiration))
def set(self, key, value, serialize=True, timeout=0):
@@ -260,6 +409,12 @@ class FakeSwiftMemcacheRing(object):
self.set_key = key
+class v3FakeSwiftMemcacheRing(FakeSwiftMemcacheRing):
+ def __init__(self):
+ super(v3FakeSwiftMemcacheRing, self).__init__()
+ self.token_key = SIGNED_v3_TOKEN_SCOPED_KEY
+
+
class FakeHTTPResponse(object):
def __init__(self, status, body):
self.status = status
@@ -269,86 +424,129 @@ class FakeHTTPResponse(object):
return self.body
-class FakeStackHTTPConnection(object):
-
- def __init__(self, *args, **kwargs):
- pass
+class BaseFakeHTTPConnection(object):
- def getresponse(self):
- if len(FAKE_RESPONSE_STACK):
- return FAKE_RESPONSE_STACK.pop()
- return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
+ def _user_token_responses(self, token_id):
+ """ Emulate user token responses.
- def request(self, *_args, **_kwargs):
- pass
-
- def close(self):
- pass
+ Return success if the token is in the list we know
+ about. If the request is for revoked tokens, then return
+ the revoked list, else if a different token is provided,
+ return 404 indicating an unknown (therefore unauthorized) token.
+ """
+ if token_id in TOKEN_RESPONSES.keys():
+ status = 200
+ body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
+ elif token_id == "revoked":
+ status = 200
+ body = SIGNED_REVOCATION_LIST
+ else:
+ status = 404
+ body = str()
+ return status, body
+
+ def fake_v2_responses(self, path):
+ token_id = path.rsplit('/', 1)[1]
+ return self._user_token_responses(token_id)
+
+ def fake_v3_responses(self, path, **kwargs):
+ headers = kwargs.get('headers')
+ token_id = headers['X-Subject-Token']
+ return self._user_token_responses(token_id)
+
+ def fake_v2_admin_token(self, path):
+ status = 200
+ body = jsonutils.dumps({
+ 'access': {
+ 'token': {'id': 'admin_token2',
+ 'expires': '2012-10-03T16:58:01Z'}
+ },
+ })
+ return status, body
-class FakeHTTPConnection(object):
- last_requested_url = ''
+class FakeHTTPConnection(BaseFakeHTTPConnection):
+ """ Emulate a fake Keystone v2 server """
def __init__(self, *args, **kwargs):
self.send_valid_revocation_list = True
+ self.resp = None
def request(self, method, path, **kwargs):
"""Fakes out several http responses.
- If a POST request is made, we assume the calling code is trying
- to get a new admin token.
+ Support the following requests:
- If a GET request is made to validate a token, return success
- if the token is 'token1'. If a different token is provided, return
- a 404, indicating an unknown (therefore unauthorized) token.
+ - Create admin token ('POST /testadmin/v2.0/tokens')
+ - Get versions ('GET /testadmin/')
+ - Get v2 user token responses (see fake_v2_responses)
"""
FakeHTTPConnection.last_requested_url = path
- if method == 'POST':
- status = 200
- body = jsonutils.dumps({
- 'access': {
- 'token': {'id': 'admin_token2'},
- },
- })
-
+ if method == 'POST' and path == '/testadmin/v2.0/tokens':
+ status, body = self.fake_v2_admin_token(path)
else:
- token_id = path.rsplit('/', 1)[1]
- if token_id in TOKEN_RESPONSES.keys():
- status = 200
- body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
- elif token_id == "revoked":
- status = 200
- body = SIGNED_REVOCATION_LIST
+ if path == '/testadmin/':
+ # It's a GET versions call
+ status = 300
+ body = jsonutils.dumps(VERSION_LIST_v2)
else:
- status = 404
- body = str()
+ status, body = self.fake_v2_responses(path)
self.resp = FakeHTTPResponse(status, body)
def getresponse(self):
- return self.resp
+ # If self.resp is set then this is just the response to
+ # the earlier request. If it is not set, then we expect
+ # a stack of responses to have been pre-prepared
+ if self.resp:
+ return self.resp
+ else:
+ if len(FAKE_RESPONSE_STACK):
+ return FAKE_RESPONSE_STACK.pop()
+ return FakeHTTPResponse(
+ 500, jsonutils.dumps('UNEXPECTED RESPONSE'))
def close(self):
pass
+class v3FakeHTTPConnection(FakeHTTPConnection):
+ """ Emulate a fake Keystone v3 server """
+
+ def request(self, method, path, **kwargs):
+ """Fakes out several http responses.
+
+ Support the following requests:
+
+ - Create admin token ('POST /testadmin/v2.0/tokens')
+ - Get versions ('GET /testadmin/')
+ - Get v2 user token responses (see fake_v2_responses)
+ - Get v3 user token responses (see fake_v3_responses)
+
+ """
+ v3FakeHTTPConnection.last_requested_url = path
+ if method == 'POST' and path == '/testadmin/v2.0/tokens':
+ status, body = self.fake_v2_admin_token(path)
+ else:
+ if path == '/testadmin/':
+ # It's a GET versions call
+ status = 300
+ body = jsonutils.dumps(VERSION_LIST_v3)
+ elif path.split('/')[2] == 'v2.0':
+ status, body = self.fake_v2_responses(path)
+ else:
+ status, body = self.fake_v3_responses(path, **kwargs)
+
+ self.resp = FakeHTTPResponse(status, body)
+
+
class FakeApp(object):
"""This represents a WSGI app protected by the auth_token middleware."""
def __init__(self, expected_env=None):
expected_env = expected_env or {}
- self.expected_env = {
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_name1',
- 'HTTP_X_USER_ID': 'user_id1',
- 'HTTP_X_USER_NAME': 'user_name1',
- 'HTTP_X_ROLES': 'role1,role2',
- 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
- 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
- }
+ self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.expected_env.update(expected_env)
def __call__(self, env, start_response):
@@ -360,36 +558,121 @@ class FakeApp(object):
return resp(env, start_response)
+class v3FakeApp(object):
+ """This represents a v3 WSGI app protected by the auth_token middleware."""
+ def __init__(self, expected_env=None):
+ expected_env = expected_env or {}
+ # We should always get back the same v2 items
+ self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
+ # ...and with v3 additions, these are for the DEFAULT TOKEN
+ v3_default_env_additions = {
+ 'HTTP_X_PROJECT_ID': 'tenant_id1',
+ 'HTTP_X_PROJECT_NAME': 'tenant_name1',
+ 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
+ 'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
+ }
+ self.expected_env.update(v3_default_env_additions)
+ # And finally update for anything passed in
+ self.expected_env.update(expected_env)
+
+ def __call__(self, env, start_response):
+ for k, v in self.expected_env.items():
+ assert env[k] == v, '%s != %s' % (env[k], v)
+ resp = webob.Response()
+ resp.body = 'SUCCESS'
+ return resp(env, start_response)
+
+
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
+ """ Base test class for auth_token middleware.
+
+ All the tests allow for running with auth_token
+ configured for receiving v2 or v3 tokens, with the
+ choice being made by passing configuration data into
+ Setup().
- def setUp(self, expected_env=None):
+ The base class will, by default, run all the tests
+ expecting v2 token formats. Child classes can override
+ this to specify, for instance, v3 format.
+
+ """
+ def setUp(self, expected_env=None, auth_version=None,
+ fake_app=None, fake_http=None, token_dict=None,
+ fake_memcache=None, fake_memcache_ring=None):
testtools.TestCase.setUp(self)
expected_env = expected_env or {}
- conf = {
- 'admin_token': 'admin_token1',
+ if token_dict:
+ self.token_dict = token_dict
+ else:
+ self.token_dict = {
+ 'uuid_token_default': UUID_TOKEN_DEFAULT,
+ 'uuid_token_unscoped': UUID_TOKEN_UNSCOPED,
+ 'signed_token_scoped': SIGNED_TOKEN_SCOPED,
+ 'revoked_token': REVOKED_TOKEN,
+ 'revoked_token_hash': REVOKED_TOKEN_HASH
+ }
+
+ self.conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'signing_dir': CERTDIR,
+ 'auth_version': auth_version
}
- self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
- self.middleware.http_client_class = FakeHTTPConnection
- self.middleware._iso8601 = iso8601
+ # Base assumes v2 for fake app and http, can be overridden for
+ # child classes by called set_middleware() directly
+ self.fake_app = fake_app or FakeApp
+ self.fake_http = fake_http or FakeHTTPConnection
+ self.set_middleware(self.fake_app, self.fake_http,
+ expected_env, self.conf)
self.response_status = None
self.response_headers = None
+
+ self.fake_memcache = fake_memcache or FakeMemcache
+ self.fake_memcache_ring = fake_memcache_ring or FakeSwiftMemcacheRing
+
+ signed_list = 'SIGNED_REVOCATION_LIST'
+ valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
+ globals()[signed_list] = globals()[valid_signed_list]
+
+ def set_fake_http(self, http_handler):
+ """ Configure the http handler for the auth_token middleware.
+
+ Allows tests to override the default handler on specific tests,
+ e.g. to use v2 for those parts of auth_token that still use v2
+ tokens while running the v3 test class, i.e. getting an admin
+ token or revocation list.
+
+ """
+ self.middleware.http_client_class = http_handler
+
+ def set_middleware(self, fake_app=None, fake_http=None,
+ expected_env=None, conf=None):
+ """ Configure the class ready to call the auth_token middleware.
+
+ Set up the various fake items needed to run the middleware.
+ Individual tests that need to further refine these can call this
+ function to override the class defaults.
+
+ """
+ conf = conf or self.conf
+ if 'http_handler' not in conf:
+ fake_http = fake_http or self.fake_http
+ conf['http_handler'] = fake_http
+ fake_app = fake_app or self.fake_app
+ self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
+ self.middleware._iso8601 = iso8601
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
cache_timeout = datetime.timedelta(days=1)
self.middleware.token_revocation_list_cache_timeout = cache_timeout
self.middleware.token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
- signed_list = 'SIGNED_REVOCATION_LIST'
- valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
- globals()[signed_list] = globals()[valid_signed_list]
-
def tearDown(self):
testtools.TestCase.tearDown(self)
try:
@@ -449,9 +732,8 @@ class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""
- def setUp(self, expected_env=None):
- super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
- self.middleware.http_client_class = FakeStackHTTPConnection
+ def setUp(self):
+ super(StackResponseAuthTokenMiddlewareTest, self).setUp()
def test_fetch_revocation_list_with_expire(self):
# first response to revocation list should return 401 Unauthorized
@@ -493,7 +775,8 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
# now deprecated (diablo-compat)
'HTTP_X_TENANT': 'tenant_id1',
}
- super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
+ super(DiabloAuthTokenMiddlewareTest, self).setUp(
+ expected_env=expected_env)
def test_valid_diablo_response(self):
req = webob.Request.blank('/')
@@ -504,72 +787,47 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
- def assert_valid_request_200(self, token):
+
+ def assert_valid_last_url(self, token_id):
+ # Default version (v2) has id in the token, override this
+ # method for v3 and other versions
+ self.assertEqual("/testadmin/v2.0/tokens/%s" % token_id,
+ self.middleware.http_client_class.last_requested_url)
+
+ def assert_valid_request_200(self, token, with_catalog=True):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
- self.assertTrue(req.headers.get('X-Service-Catalog'))
+ if with_catalog:
+ self.assertTrue(req.headers.get('X-Service-Catalog'))
self.assertEqual(body, ['SUCCESS'])
self.assertTrue('keystone.token_info' in req.environ)
def test_valid_uuid_request(self):
- self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
- self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
- FakeHTTPConnection.last_requested_url)
+ self.assert_valid_request_200(self.token_dict['uuid_token_default'])
+ self.assert_valid_last_url(self.token_dict['uuid_token_default'])
def test_valid_signed_request(self):
- FakeHTTPConnection.last_requested_url = ''
- self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
+ self.middleware.http_client_class.last_requested_url = ''
+ self.assert_valid_request_200(
+ self.token_dict['signed_token_scoped'])
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
"/testadmin")
#ensure that signed requests do not generate HTTP traffic
- self.assertEqual('', FakeHTTPConnection.last_requested_url)
-
- def assert_unscoped_default_tenant_auto_scopes(self, token):
- """Unscoped requests with a default tenant should "auto-scope."
-
- The implied scope is the user's tenant ID.
-
- """
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, ['SUCCESS'])
- self.assertTrue('keystone.token_info' in req.environ)
-
- def test_default_tenant_uuid_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
-
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
-
- def assert_unscoped_token_receives_401(self, token):
- """Unscoped requests with no default tenant ID should be rejected."""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
+ self.assertEqual(
+ '', self.middleware.http_client_class.last_requested_url)
def test_revoked_token_receives_401(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = REVOKED_TOKEN
+ req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
def get_revocation_list_json(self, token_ids=None):
if token_ids is None:
- token_ids = [REVOKED_TOKEN_HASH]
+ token_ids = [self.token_dict['revoked_token_hash']]
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
for x in token_ids]}
return jsonutils.dumps(revocation_list)
@@ -578,22 +836,26 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
#explicitly setting an empty revocation list here to document intent
self.middleware.token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
- result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
+ result = self.middleware.is_signed_token_revoked(
+ self.token_dict['revoked_token'])
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
- result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
+ result = self.middleware.is_signed_token_revoked(
+ self.token_dict['revoked_token'])
self.assertTrue(result)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_signed_token, REVOKED_TOKEN)
+ self.middleware.verify_signed_token,
+ self.token_dict['revoked_token'])
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
- self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
+ self.middleware.verify_signed_token(
+ self.token_dict['signed_token_scoped'])
def test_cert_file_missing(self):
self.assertFalse(self.middleware.cert_file_missing(
@@ -622,6 +884,9 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
expected)
def test_get_revocation_list_returns_fetched_list(self):
+ # auth_token uses v2 to fetch this, so don't allow the v3
+ # tests to override the fake http connection
+ self.set_fake_http(FakeHTTPConnection)
self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list,
@@ -642,6 +907,9 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware.fetch_revocation_list)
def test_fetch_revocation_list(self):
+ # auth_token uses v2 to fetch this, so don't allow the v3
+ # tests to override the fake http connection
+ self.set_fake_http(FakeHTTPConnection)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, REVOCATION_LIST)
@@ -651,7 +919,7 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_request_invalid_signed_token(self):
req = webob.Request.blank('/')
@@ -659,14 +927,14 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_request_no_token(self):
req = webob.Request.blank('/')
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_request_no_token_log_message(self):
class FakeLog(object):
@@ -693,12 +961,13 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_memcache(self):
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeMemcache()
+ req.headers['X-Auth-Token'] = (
+ self.token_dict['signed_token_scoped'])
+ self.middleware._cache = self.fake_memcache()
self.middleware._use_keystone_cache = True
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, None)
@@ -706,15 +975,16 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def test_memcache_set_invalid(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
- self.middleware._cache = FakeMemcache()
+ self.middleware._cache = self.fake_memcache()
self.middleware._use_keystone_cache = True
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, "invalid")
def test_memcache_set_expired(self):
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeMemcache()
+ req.headers['X-Auth-Token'] = (
+ self.token_dict['signed_token_scoped'])
+ self.middleware._cache = self.fake_memcache()
self.middleware._use_keystone_cache = True
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
@@ -723,8 +993,9 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def test_swift_memcache_set_expired(self):
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeSwiftMemcacheRing()
+ req.headers['X-Auth-Token'] = (
+ self.token_dict['signed_token_scoped'])
+ self.middleware._cache = self.fake_memcache_ring()
self.middleware._use_keystone_cache = False
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
@@ -735,48 +1006,38 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.disable_module('memcache')
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
- 'memcache_servers': 'localhost:11211',
+ 'auth_admin_prefix': '/testadmin',
+ 'memcache_servers': 'localhost:11211'
}
-
- auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
def test_use_cache_from_env(self):
env = {'swift.cache': 'CACHE_TEST'}
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'cache': 'swift.cache',
- 'memcache_servers': 'localhost:11211',
+ 'memcache_servers': 'localhost:11211'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- auth._init_cache(env)
- self.assertEqual(auth._cache, 'CACHE_TEST')
+ self.set_middleware(conf=conf)
+ self.middleware._init_cache(env)
+ self.assertEqual(self.middleware._cache, 'CACHE_TEST')
def test_not_use_cache_from_env(self):
self.disable_module('memcache')
env = {'swift.cache': 'CACHE_TEST'}
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
- 'memcache_servers': 'localhost:11211',
+ 'auth_admin_prefix': '/testadmin',
+ 'memcache_servers': 'localhost:11211'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- auth._init_cache(env)
- self.assertEqual(auth._cache, None)
-
- def test_request_prevent_service_catalog_injection(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
- req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertFalse(req.headers.get('X-Service-Catalog'))
- self.assertEqual(body, ['SUCCESS'])
+ self.set_middleware(conf=conf)
+ self.middleware._init_cache(env)
+ self.assertEqual(self.middleware._cache, None)
def test_will_expire_soon(self):
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
@@ -788,152 +1049,328 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def test_encrypt_cache_data(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'encrypt',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- encrypted_data = \
- auth._protect_cache_value('token',
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
+ self.set_middleware(conf=conf)
+ encrypted_data = self.middleware._protect_cache_value(
+ 'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])
self.assertEqual(
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
- auth._unprotect_cache_value('token', encrypted_data))
+ TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
+ self.middleware._unprotect_cache_value('token', encrypted_data))
# should return None if unable to decrypt
self.assertIsNone(
- auth._unprotect_cache_value('token', '{ENCRYPT:AES256}corrupted'))
+ self.middleware._unprotect_cache_value(
+ 'token', '{ENCRYPT:AES256}corrupted'))
self.assertIsNone(
- auth._unprotect_cache_value('mykey', encrypted_data))
+ self.middleware._unprotect_cache_value('mykey', encrypted_data))
def test_sign_cache_data(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- signed_data = \
- auth._protect_cache_value('mykey',
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
+ self.set_middleware(conf=conf)
+ signed_data = self.middleware._protect_cache_value(
+ 'mykey', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
expected = '{MAC:SHA1}'
self.assertEqual(
signed_data[:10],
expected)
self.assertEqual(
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
- auth._unprotect_cache_value('mykey', signed_data))
+ TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
+ self.middleware._unprotect_cache_value('mykey', signed_data))
# should return None on corrupted data
self.assertIsNone(
- auth._unprotect_cache_value('mykey', '{MAC:SHA1}corrupted'))
+ self.middleware._unprotect_cache_value('mykey',
+ '{MAC:SHA1}corrupted'))
def test_no_memcache_protection(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- data = auth._protect_cache_value('mykey', 'This is a test!')
+ self.set_middleware(conf=conf)
+ data = self.middleware._protect_cache_value('mykey',
+ 'This is a test!')
self.assertEqual(data, 'This is a test!')
self.assertEqual(
'This is a test!',
- auth._unprotect_cache_value('mykey', data))
+ self.middleware._unprotect_cache_value('mykey', data))
def test_get_cache_key(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
self.assertEqual(
'tokens/mytoken',
- auth._get_cache_key('mytoken'))
+ self.middleware._get_cache_key('mytoken'))
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
- self.assertEqual(auth._get_cache_key('mytoken'), expected)
+ self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
- 'memcache_secret_key': 'abc!',
+ 'memcache_secret_key': 'abc!'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
- self.assertEqual(auth._get_cache_key('mytoken'), expected)
+ self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
def test_assert_valid_memcache_protection_config(self):
# test missing memcache_secret_key
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_security_strategy': 'Encrypt',
+ 'memcache_security_strategy': 'Encrypt'
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
# test invalue memcache_security_strategy
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_security_strategy': 'whatever',
+ 'memcache_security_strategy': 'whatever'
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
# test missing memcache_secret_key
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_security_strategy': 'mac',
+ 'memcache_security_strategy': 'mac'
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
'memcache_secret_key': ''
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mAc',
'memcache_secret_key': ''
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
+
+
+class v2AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
+ """ v2 token specific tests.
+
+ There are some differences between how the auth-token middleware handles
+ v2 and v3 tokens over and above the token formats, namely:
+
+ - A v3 keystone server will auto scope a token to a user's default project
+ if no scope is specified. A v2 server assumes that the auth-token
+ middleware will do that.
+ - A v2 keystone server may issue a token without a catalog, even with a
+ tenant
+
+ The tests below were originally part of the generic AuthTokenMiddlewareTest
+ class, but now, since they really are v2 specifc, they are included here.
+
+ """
+ def assert_unscoped_default_tenant_auto_scopes(self, token):
+ """Unscoped v2 requests with a default tenant should "auto-scope."
+
+ The implied scope is the user's tenant ID.
+
+ """
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertEqual(body, ['SUCCESS'])
+ self.assertTrue('keystone.token_info' in req.environ)
+
+ def test_default_tenant_uuid_token(self):
+ self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
+
+ def test_default_tenant_signed_token(self):
+ self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
+
+ def assert_unscoped_token_receives_401(self, token):
+ """Unscoped requests with no default tenant ID should be rejected."""
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ "Keystone uri='https://keystone.example.com:1234'")
+
+ def test_unscoped_uuid_token_receives_401(self):
+ self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
+
+ def test_unscoped_pki_token_receives_401(self):
+ self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
+
+ def test_request_prevent_service_catalog_injection(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Service-Catalog'] = '[]'
+ req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertFalse(req.headers.get('X-Service-Catalog'))
+ self.assertEqual(body, ['SUCCESS'])
+
+ def test_valid_uuid_request_forced_to_2_0(self):
+ """ Test forcing auth_token to use lower api version.
+
+ By installing the v3 http hander, auth_token will be get
+ a version list that looks like a v3 server - from which it
+ would normally chose v3.0 as the auth version. However, here
+ we specify v2.0 in the configuration - which should force
+ auth_token to use that version instead.
+
+ """
+ conf = {
+ 'auth_host': 'keystone.example.com',
+ 'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
+ 'signing_dir': CERTDIR,
+ 'auth_version': 'v2.0'
+ }
+ self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)
+ # This tests will only work is auth_token has chosen to use the
+ # lower, v2, api version
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = UUID_TOKEN_DEFAULT
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
+ v3FakeHTTPConnection.last_requested_url)
+
+ def test_invalid_auth_version_request(self):
+ conf = {
+ 'auth_host': 'keystone.example.com',
+ 'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
+ 'signing_dir': CERTDIR,
+ 'auth_version': 'v1.0' # v1.0 is no longer supported
+ }
+ self.assertRaises(Exception, self.set_middleware, conf)
+
+
+class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
+ """ Test auth_token middleware with v3 tokens.
+
+ Re-execute the AuthTokenMiddlewareTest class tests, but with the
+ the auth_token middleware configured to expect v3 tokens back from
+ a keystone server.
+
+ This is done by configuring the AuthTokenMiddlewareTest class via
+ its Setup(), passing in v3 style data that will then be used by
+ the tests themselves. This approach has been used to ensure we
+ really are running the same tests for both v2 and v3 tokens.
+
+ There a few additional specific test for v3 only:
+
+ - We allow an unscoped token to be validated (as unscoped), where
+ as for v2 tokens, the auth_token middleware is expected to try and
+ auto-scope it (and fail if there is no default tenant)
+ - Domain scoped tokens
+
+ Since we don't specify an auth version for auth_token to use, by
+ definition we are thefore implicitely testing that it will use
+ the highest available auth version, i.e. v3.0
+
+ """
+ def setUp(self):
+ token_dict = {
+ 'uuid_token_default': v3_UUID_TOKEN_DEFAULT,
+ 'uuid_token_unscoped': v3_UUID_TOKEN_UNSCOPED,
+ 'signed_token_scoped': SIGNED_v3_TOKEN_SCOPED,
+ 'revoked_token': REVOKED_v3_TOKEN,
+ 'revoked_token_hash': REVOKED_v3_TOKEN_HASH
+ }
+ super(v3AuthTokenMiddlewareTest, self).setUp(
+ fake_app=v3FakeApp,
+ fake_http=v3FakeHTTPConnection,
+ token_dict=token_dict,
+ fake_memcache=v3FakeMemcache,
+ fake_memcache_ring=v3FakeSwiftMemcacheRing)
+
+ def assert_valid_last_url(self, token_id):
+ # Token ID is not part of the url in v3, so override
+ # this assert test in the base class
+ self.assertEqual('/testadmin/v3/auth/tokens',
+ v3FakeHTTPConnection.last_requested_url)
+
+ def test_valid_unscoped_uuid_request(self):
+ # Remove items that won't be in an unscoped token
+ delta_expected_env = {
+ 'HTTP_X_PROJECT_ID': None,
+ 'HTTP_X_PROJECT_NAME': None,
+ 'HTTP_X_PROJECT_DOMAIN_ID': None,
+ 'HTTP_X_PROJECT_DOMAIN_NAME': None,
+ 'HTTP_X_TENANT_ID': None,
+ 'HTTP_X_TENANT_NAME': None,
+ 'HTTP_X_ROLES': '',
+ 'HTTP_X_TENANT': None,
+ 'HTTP_X_ROLE': '',
+ }
+ self.set_middleware(expected_env=delta_expected_env)
+ self.assert_valid_request_200(v3_UUID_TOKEN_UNSCOPED,
+ with_catalog=False)
+ self.assertEqual('/testadmin/v3/auth/tokens',
+ v3FakeHTTPConnection.last_requested_url)
+
+ def test_domain_scoped_uuid_request(self):
+ # Modify items comapred to default token for a domain scope
+ delta_expected_env = {
+ 'HTTP_X_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_DOMAIN_NAME': 'domain_name1',
+ 'HTTP_X_PROJECT_ID': None,
+ 'HTTP_X_PROJECT_NAME': None,
+ 'HTTP_X_PROJECT_DOMAIN_ID': None,
+ 'HTTP_X_PROJECT_DOMAIN_NAME': None,
+ 'HTTP_X_TENANT_ID': None,
+ 'HTTP_X_TENANT_NAME': None,
+ 'HTTP_X_TENANT': None
+ }
+ self.set_middleware(expected_env=delta_expected_env)
+ self.assert_valid_request_200(v3_UUID_TOKEN_DOMAIN_SCOPED)
+ self.assertEqual('/testadmin/v3/auth/tokens',
+ v3FakeHTTPConnection.last_requested_url)
class TokenEncodingTest(testtools.TestCase):