summaryrefslogtreecommitdiff
path: root/tests/test_auth_token_middleware.py
diff options
context:
space:
mode:
authorHenry Nash <henryn@linux.vnet.ibm.com>2013-03-04 05:05:15 +0000
committerHenry Nash <henryn@linux.vnet.ibm.com>2013-03-11 11:50:09 +0000
commitd782a998474d92d4299b4404b69442f0288efc3b (patch)
tree4ada0437add2a199329f77b0043cd4094f20b4a1 /tests/test_auth_token_middleware.py
parentae36809fdecdedd09abf23eeadf2374c77b8a8df (diff)
downloadpython-keystoneclient-d782a998474d92d4299b4404b69442f0288efc3b.tar.gz
Fix auth-token middleware to understand v3 tokens
Now that the Identity server supports v3 tokens, the auth_token middleware should permit the in-line validation of such a token. This essentially means just setting any new environment items that correspond to the new attributes that may be in a v3 token (such as domains), as well as allowing for the slight format differences. Most of the work in this change is actually in the unit tests, where it was important to try and enable the existing tests to be run against an auth_token middleware configured for both v2 and v3. This meant restructing the test class so that the token format is separated from the individual tests and is initialized by the class Setup(). Since there are some new signed token formats included in this testing, a new set of the signed tokens was generated. Fixes Bug #1132390 Change-Id: I78b232d30f5310c39089fbbc8e56c23df291f89f
Diffstat (limited to 'tests/test_auth_token_middleware.py')
-rw-r--r--tests/test_auth_token_middleware.py863
1 files changed, 650 insertions, 213 deletions
diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py
index 58be41d..cc730a7 100644
--- a/tests/test_auth_token_middleware.py
+++ b/tests/test_auth_token_middleware.py
@@ -43,11 +43,16 @@ CA = os.path.join(CERTDIR, 'ca.pem')
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
+REVOKED_v3_TOKEN = None
+REVOKED_v3_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None
SIGNED_TOKEN_UNSCOPED = None
+SIGNED_v3_TOKEN_SCOPED = None
+SIGNED_v3_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = None
SIGNED_TOKEN_UNSCOPED_KEY = None
+SIGNED_v3_TOKEN_SCOPED_KEY = None
VALID_SIGNED_REVOCATION_LIST = None
@@ -55,6 +60,9 @@ UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
+v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
+v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
+v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
INVALID_SIGNED_TOKEN = string.replace(
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
@@ -152,6 +160,79 @@ TOKEN_RESPONSES = {
}
},
},
+ v3_UUID_TOKEN_DEFAULT: {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'project': {
+ 'id': 'tenant_id1',
+ 'name': 'tenant_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'roles': [
+ {'name': 'role1', 'id': 'Role1'},
+ {'name': 'role2', 'id': 'Role2'},
+ ],
+ 'catalog': {}
+ }
+ },
+ v3_UUID_TOKEN_UNSCOPED: {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ }
+ }
+ },
+ v3_UUID_TOKEN_DOMAIN_SCOPED: {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1',
+ },
+ 'roles': [
+ {'name': 'role1', 'id': 'Role1'},
+ {'name': 'role2', 'id': 'Role2'},
+ ],
+ 'catalog': {}
+ }
+ }
+}
+
+EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
+ 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
+ 'HTTP_X_TENANT_ID': 'tenant_id1',
+ 'HTTP_X_TENANT_NAME': 'tenant_name1',
+ 'HTTP_X_USER_ID': 'user_id1',
+ 'HTTP_X_USER_NAME': 'user_name1',
+ 'HTTP_X_ROLES': 'role1,role2',
+ 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
+ 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
+ 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
}
FAKE_RESPONSE_STACK = []
@@ -166,18 +247,25 @@ def setUpModule(self):
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
+ with open(os.path.join(signing_path, 'auth_v3_token_scoped.pem')) as f:
+ self.SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
+ with open(os.path.join(signing_path, 'auth_v3_token_revoked.pem')) as f:
+ self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
+ self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_v3_TOKEN)
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
self.REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
{'signed': f.read()})
- self.SIGNED_TOKEN_SCOPED_KEY =\
- cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
- self.SIGNED_TOKEN_UNSCOPED_KEY =\
- cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
+ self.SIGNED_TOKEN_SCOPED_KEY = (
+ cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED))
+ self.SIGNED_TOKEN_UNSCOPED_KEY = (
+ cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED))
+ self.SIGNED_v3_TOKEN_SCOPED_KEY = (
+ cms.cms_hash_token(self.SIGNED_v3_TOKEN_SCOPED))
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
'access': {
@@ -213,23 +301,80 @@ def setUpModule(self):
},
},
+ self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_KEY] = {
+ 'token': {
+ 'expires': '2999-01-01T00:00:10Z',
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'project': {
+ 'id': 'tenant_id1',
+ 'name': 'tenant_name1',
+ 'domain': {
+ 'id': 'domain_id1',
+ 'name': 'domain_name1'
+ }
+ },
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'}
+ ],
+ 'catalog': {}
+ }
+ }
+
+VERSION_LIST_v3 = {
+ "versions": {
+ "values": [
+ {
+ "id": "v3.0",
+ "status": "stable",
+ "updated": "2013-03-06T00:00:00Z",
+ "links": []
+ },
+ {
+ "id": "v2.0",
+ "status": "beta",
+ "updated": "2011-11-19T00:00:00Z",
+ "links": []
+ }
+ ]
+ }
+}
+
+VERSION_LIST_v2 = {
+ "versions": {
+ "values": [
+ {
+ "id": "v2.0",
+ "status": "beta",
+ "updated": "2011-11-19T00:00:00Z",
+ "links": []
+ }
+ ]
+ }
+}
+
class FakeMemcache(object):
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
+ self.token_key = SIGNED_TOKEN_SCOPED_KEY
def get(self, key):
- data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
- if not data or key != "tokens/%s" % (data['access']['token']['id']):
+ data = TOKEN_RESPONSES[self.token_key].copy()
+ if not data or key != "tokens/%s" % self.token_key:
return
if not self.token_expiration:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
self.token_expiration = dt.strftime("%s")
- dt = datetime.datetime.now() + datetime.timedelta(hours=24)
- ks_expires = dt.isoformat()
- data['access']['token']['expires'] = ks_expires
return (data, str(self.token_expiration))
def set(self, key, value, time=None):
@@ -237,22 +382,26 @@ class FakeMemcache(object):
self.set_key = key
+class v3FakeMemcache(FakeMemcache):
+ def __init__(self):
+ super(v3FakeMemcache, self).__init__()
+ self.token_key = SIGNED_v3_TOKEN_SCOPED_KEY
+
+
class FakeSwiftMemcacheRing(object):
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
+ self.token_key = SIGNED_TOKEN_SCOPED_KEY
def get(self, key):
- data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
- if not data or key != "tokens/%s" % (data['access']['token']['id']):
+ data = TOKEN_RESPONSES[self.token_key].copy()
+ if not data or key != "tokens/%s" % self.token_key:
return
if not self.token_expiration:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
self.token_expiration = dt.strftime("%s")
- dt = datetime.datetime.now() + datetime.timedelta(hours=24)
- ks_expires = dt.isoformat()
- data['access']['token']['expires'] = ks_expires
return (data, str(self.token_expiration))
def set(self, key, value, serialize=True, timeout=0):
@@ -260,6 +409,12 @@ class FakeSwiftMemcacheRing(object):
self.set_key = key
+class v3FakeSwiftMemcacheRing(FakeSwiftMemcacheRing):
+ def __init__(self):
+ super(v3FakeSwiftMemcacheRing, self).__init__()
+ self.token_key = SIGNED_v3_TOKEN_SCOPED_KEY
+
+
class FakeHTTPResponse(object):
def __init__(self, status, body):
self.status = status
@@ -269,86 +424,129 @@ class FakeHTTPResponse(object):
return self.body
-class FakeStackHTTPConnection(object):
-
- def __init__(self, *args, **kwargs):
- pass
+class BaseFakeHTTPConnection(object):
- def getresponse(self):
- if len(FAKE_RESPONSE_STACK):
- return FAKE_RESPONSE_STACK.pop()
- return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
+ def _user_token_responses(self, token_id):
+ """ Emulate user token responses.
- def request(self, *_args, **_kwargs):
- pass
-
- def close(self):
- pass
+ Return success if the token is in the list we know
+ about. If the request is for revoked tokens, then return
+ the revoked list, else if a different token is provided,
+ return 404 indicating an unknown (therefore unauthorized) token.
+ """
+ if token_id in TOKEN_RESPONSES.keys():
+ status = 200
+ body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
+ elif token_id == "revoked":
+ status = 200
+ body = SIGNED_REVOCATION_LIST
+ else:
+ status = 404
+ body = str()
+ return status, body
+
+ def fake_v2_responses(self, path):
+ token_id = path.rsplit('/', 1)[1]
+ return self._user_token_responses(token_id)
+
+ def fake_v3_responses(self, path, **kwargs):
+ headers = kwargs.get('headers')
+ token_id = headers['X-Subject-Token']
+ return self._user_token_responses(token_id)
+
+ def fake_v2_admin_token(self, path):
+ status = 200
+ body = jsonutils.dumps({
+ 'access': {
+ 'token': {'id': 'admin_token2',
+ 'expires': '2012-10-03T16:58:01Z'}
+ },
+ })
+ return status, body
-class FakeHTTPConnection(object):
- last_requested_url = ''
+class FakeHTTPConnection(BaseFakeHTTPConnection):
+ """ Emulate a fake Keystone v2 server """
def __init__(self, *args, **kwargs):
self.send_valid_revocation_list = True
+ self.resp = None
def request(self, method, path, **kwargs):
"""Fakes out several http responses.
- If a POST request is made, we assume the calling code is trying
- to get a new admin token.
+ Support the following requests:
- If a GET request is made to validate a token, return success
- if the token is 'token1'. If a different token is provided, return
- a 404, indicating an unknown (therefore unauthorized) token.
+ - Create admin token ('POST /testadmin/v2.0/tokens')
+ - Get versions ('GET /testadmin/')
+ - Get v2 user token responses (see fake_v2_responses)
"""
FakeHTTPConnection.last_requested_url = path
- if method == 'POST':
- status = 200
- body = jsonutils.dumps({
- 'access': {
- 'token': {'id': 'admin_token2'},
- },
- })
-
+ if method == 'POST' and path == '/testadmin/v2.0/tokens':
+ status, body = self.fake_v2_admin_token(path)
else:
- token_id = path.rsplit('/', 1)[1]
- if token_id in TOKEN_RESPONSES.keys():
- status = 200
- body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
- elif token_id == "revoked":
- status = 200
- body = SIGNED_REVOCATION_LIST
+ if path == '/testadmin/':
+ # It's a GET versions call
+ status = 300
+ body = jsonutils.dumps(VERSION_LIST_v2)
else:
- status = 404
- body = str()
+ status, body = self.fake_v2_responses(path)
self.resp = FakeHTTPResponse(status, body)
def getresponse(self):
- return self.resp
+ # If self.resp is set then this is just the response to
+ # the earlier request. If it is not set, then we expect
+ # a stack of responses to have been pre-prepared
+ if self.resp:
+ return self.resp
+ else:
+ if len(FAKE_RESPONSE_STACK):
+ return FAKE_RESPONSE_STACK.pop()
+ return FakeHTTPResponse(
+ 500, jsonutils.dumps('UNEXPECTED RESPONSE'))
def close(self):
pass
+class v3FakeHTTPConnection(FakeHTTPConnection):
+ """ Emulate a fake Keystone v3 server """
+
+ def request(self, method, path, **kwargs):
+ """Fakes out several http responses.
+
+ Support the following requests:
+
+ - Create admin token ('POST /testadmin/v2.0/tokens')
+ - Get versions ('GET /testadmin/')
+ - Get v2 user token responses (see fake_v2_responses)
+ - Get v3 user token responses (see fake_v3_responses)
+
+ """
+ v3FakeHTTPConnection.last_requested_url = path
+ if method == 'POST' and path == '/testadmin/v2.0/tokens':
+ status, body = self.fake_v2_admin_token(path)
+ else:
+ if path == '/testadmin/':
+ # It's a GET versions call
+ status = 300
+ body = jsonutils.dumps(VERSION_LIST_v3)
+ elif path.split('/')[2] == 'v2.0':
+ status, body = self.fake_v2_responses(path)
+ else:
+ status, body = self.fake_v3_responses(path, **kwargs)
+
+ self.resp = FakeHTTPResponse(status, body)
+
+
class FakeApp(object):
"""This represents a WSGI app protected by the auth_token middleware."""
def __init__(self, expected_env=None):
expected_env = expected_env or {}
- self.expected_env = {
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_name1',
- 'HTTP_X_USER_ID': 'user_id1',
- 'HTTP_X_USER_NAME': 'user_name1',
- 'HTTP_X_ROLES': 'role1,role2',
- 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
- 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
- }
+ self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.expected_env.update(expected_env)
def __call__(self, env, start_response):
@@ -360,36 +558,121 @@ class FakeApp(object):
return resp(env, start_response)
+class v3FakeApp(object):
+ """This represents a v3 WSGI app protected by the auth_token middleware."""
+ def __init__(self, expected_env=None):
+ expected_env = expected_env or {}
+ # We should always get back the same v2 items
+ self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
+ # ...and with v3 additions, these are for the DEFAULT TOKEN
+ v3_default_env_additions = {
+ 'HTTP_X_PROJECT_ID': 'tenant_id1',
+ 'HTTP_X_PROJECT_NAME': 'tenant_name1',
+ 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
+ 'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
+ }
+ self.expected_env.update(v3_default_env_additions)
+ # And finally update for anything passed in
+ self.expected_env.update(expected_env)
+
+ def __call__(self, env, start_response):
+ for k, v in self.expected_env.items():
+ assert env[k] == v, '%s != %s' % (env[k], v)
+ resp = webob.Response()
+ resp.body = 'SUCCESS'
+ return resp(env, start_response)
+
+
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
+ """ Base test class for auth_token middleware.
+
+ All the tests allow for running with auth_token
+ configured for receiving v2 or v3 tokens, with the
+ choice being made by passing configuration data into
+ Setup().
- def setUp(self, expected_env=None):
+ The base class will, by default, run all the tests
+ expecting v2 token formats. Child classes can override
+ this to specify, for instance, v3 format.
+
+ """
+ def setUp(self, expected_env=None, auth_version=None,
+ fake_app=None, fake_http=None, token_dict=None,
+ fake_memcache=None, fake_memcache_ring=None):
testtools.TestCase.setUp(self)
expected_env = expected_env or {}
- conf = {
- 'admin_token': 'admin_token1',
+ if token_dict:
+ self.token_dict = token_dict
+ else:
+ self.token_dict = {
+ 'uuid_token_default': UUID_TOKEN_DEFAULT,
+ 'uuid_token_unscoped': UUID_TOKEN_UNSCOPED,
+ 'signed_token_scoped': SIGNED_TOKEN_SCOPED,
+ 'revoked_token': REVOKED_TOKEN,
+ 'revoked_token_hash': REVOKED_TOKEN_HASH
+ }
+
+ self.conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'signing_dir': CERTDIR,
+ 'auth_version': auth_version
}
- self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
- self.middleware.http_client_class = FakeHTTPConnection
- self.middleware._iso8601 = iso8601
+ # Base assumes v2 for fake app and http, can be overridden for
+ # child classes by called set_middleware() directly
+ self.fake_app = fake_app or FakeApp
+ self.fake_http = fake_http or FakeHTTPConnection
+ self.set_middleware(self.fake_app, self.fake_http,
+ expected_env, self.conf)
self.response_status = None
self.response_headers = None
+
+ self.fake_memcache = fake_memcache or FakeMemcache
+ self.fake_memcache_ring = fake_memcache_ring or FakeSwiftMemcacheRing
+
+ signed_list = 'SIGNED_REVOCATION_LIST'
+ valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
+ globals()[signed_list] = globals()[valid_signed_list]
+
+ def set_fake_http(self, http_handler):
+ """ Configure the http handler for the auth_token middleware.
+
+ Allows tests to override the default handler on specific tests,
+ e.g. to use v2 for those parts of auth_token that still use v2
+ tokens while running the v3 test class, i.e. getting an admin
+ token or revocation list.
+
+ """
+ self.middleware.http_client_class = http_handler
+
+ def set_middleware(self, fake_app=None, fake_http=None,
+ expected_env=None, conf=None):
+ """ Configure the class ready to call the auth_token middleware.
+
+ Set up the various fake items needed to run the middleware.
+ Individual tests that need to further refine these can call this
+ function to override the class defaults.
+
+ """
+ conf = conf or self.conf
+ if 'http_handler' not in conf:
+ fake_http = fake_http or self.fake_http
+ conf['http_handler'] = fake_http
+ fake_app = fake_app or self.fake_app
+ self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
+ self.middleware._iso8601 = iso8601
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
cache_timeout = datetime.timedelta(days=1)
self.middleware.token_revocation_list_cache_timeout = cache_timeout
self.middleware.token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
- signed_list = 'SIGNED_REVOCATION_LIST'
- valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
- globals()[signed_list] = globals()[valid_signed_list]
-
def tearDown(self):
testtools.TestCase.tearDown(self)
try:
@@ -449,9 +732,8 @@ class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""
- def setUp(self, expected_env=None):
- super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
- self.middleware.http_client_class = FakeStackHTTPConnection
+ def setUp(self):
+ super(StackResponseAuthTokenMiddlewareTest, self).setUp()
def test_fetch_revocation_list_with_expire(self):
# first response to revocation list should return 401 Unauthorized
@@ -493,7 +775,8 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
# now deprecated (diablo-compat)
'HTTP_X_TENANT': 'tenant_id1',
}
- super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
+ super(DiabloAuthTokenMiddlewareTest, self).setUp(
+ expected_env=expected_env)
def test_valid_diablo_response(self):
req = webob.Request.blank('/')
@@ -504,72 +787,47 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
- def assert_valid_request_200(self, token):
+
+ def assert_valid_last_url(self, token_id):
+ # Default version (v2) has id in the token, override this
+ # method for v3 and other versions
+ self.assertEqual("/testadmin/v2.0/tokens/%s" % token_id,
+ self.middleware.http_client_class.last_requested_url)
+
+ def assert_valid_request_200(self, token, with_catalog=True):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
- self.assertTrue(req.headers.get('X-Service-Catalog'))
+ if with_catalog:
+ self.assertTrue(req.headers.get('X-Service-Catalog'))
self.assertEqual(body, ['SUCCESS'])
self.assertTrue('keystone.token_info' in req.environ)
def test_valid_uuid_request(self):
- self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
- self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
- FakeHTTPConnection.last_requested_url)
+ self.assert_valid_request_200(self.token_dict['uuid_token_default'])
+ self.assert_valid_last_url(self.token_dict['uuid_token_default'])
def test_valid_signed_request(self):
- FakeHTTPConnection.last_requested_url = ''
- self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
+ self.middleware.http_client_class.last_requested_url = ''
+ self.assert_valid_request_200(
+ self.token_dict['signed_token_scoped'])
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
"/testadmin")
#ensure that signed requests do not generate HTTP traffic
- self.assertEqual('', FakeHTTPConnection.last_requested_url)
-
- def assert_unscoped_default_tenant_auto_scopes(self, token):
- """Unscoped requests with a default tenant should "auto-scope."
-
- The implied scope is the user's tenant ID.
-
- """
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, ['SUCCESS'])
- self.assertTrue('keystone.token_info' in req.environ)
-
- def test_default_tenant_uuid_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
-
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
-
- def assert_unscoped_token_receives_401(self, token):
- """Unscoped requests with no default tenant ID should be rejected."""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
+ self.assertEqual(
+ '', self.middleware.http_client_class.last_requested_url)
def test_revoked_token_receives_401(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = REVOKED_TOKEN
+ req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
def get_revocation_list_json(self, token_ids=None):
if token_ids is None:
- token_ids = [REVOKED_TOKEN_HASH]
+ token_ids = [self.token_dict['revoked_token_hash']]
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
for x in token_ids]}
return jsonutils.dumps(revocation_list)
@@ -578,22 +836,26 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
#explicitly setting an empty revocation list here to document intent
self.middleware.token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
- result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
+ result = self.middleware.is_signed_token_revoked(
+ self.token_dict['revoked_token'])
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
- result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
+ result = self.middleware.is_signed_token_revoked(
+ self.token_dict['revoked_token'])
self.assertTrue(result)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_signed_token, REVOKED_TOKEN)
+ self.middleware.verify_signed_token,
+ self.token_dict['revoked_token'])
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
- self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
+ self.middleware.verify_signed_token(
+ self.token_dict['signed_token_scoped'])
def test_cert_file_missing(self):
self.assertFalse(self.middleware.cert_file_missing(
@@ -622,6 +884,9 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
expected)
def test_get_revocation_list_returns_fetched_list(self):
+ # auth_token uses v2 to fetch this, so don't allow the v3
+ # tests to override the fake http connection
+ self.set_fake_http(FakeHTTPConnection)
self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list,
@@ -642,6 +907,9 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware.fetch_revocation_list)
def test_fetch_revocation_list(self):
+ # auth_token uses v2 to fetch this, so don't allow the v3
+ # tests to override the fake http connection
+ self.set_fake_http(FakeHTTPConnection)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, REVOCATION_LIST)
@@ -651,7 +919,7 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_request_invalid_signed_token(self):
req = webob.Request.blank('/')
@@ -659,14 +927,14 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_request_no_token(self):
req = webob.Request.blank('/')
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_request_no_token_log_message(self):
class FakeLog(object):
@@ -693,12 +961,13 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
+ "Keystone uri='https://keystone.example.com:1234'")
def test_memcache(self):
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeMemcache()
+ req.headers['X-Auth-Token'] = (
+ self.token_dict['signed_token_scoped'])
+ self.middleware._cache = self.fake_memcache()
self.middleware._use_keystone_cache = True
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, None)
@@ -706,15 +975,16 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def test_memcache_set_invalid(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
- self.middleware._cache = FakeMemcache()
+ self.middleware._cache = self.fake_memcache()
self.middleware._use_keystone_cache = True
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, "invalid")
def test_memcache_set_expired(self):
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeMemcache()
+ req.headers['X-Auth-Token'] = (
+ self.token_dict['signed_token_scoped'])
+ self.middleware._cache = self.fake_memcache()
self.middleware._use_keystone_cache = True
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
@@ -723,8 +993,9 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def test_swift_memcache_set_expired(self):
req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeSwiftMemcacheRing()
+ req.headers['X-Auth-Token'] = (
+ self.token_dict['signed_token_scoped'])
+ self.middleware._cache = self.fake_memcache_ring()
self.middleware._use_keystone_cache = False
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
@@ -735,48 +1006,38 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
self.disable_module('memcache')
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
- 'memcache_servers': 'localhost:11211',
+ 'auth_admin_prefix': '/testadmin',
+ 'memcache_servers': 'localhost:11211'
}
-
- auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
def test_use_cache_from_env(self):
env = {'swift.cache': 'CACHE_TEST'}
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'cache': 'swift.cache',
- 'memcache_servers': 'localhost:11211',
+ 'memcache_servers': 'localhost:11211'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- auth._init_cache(env)
- self.assertEqual(auth._cache, 'CACHE_TEST')
+ self.set_middleware(conf=conf)
+ self.middleware._init_cache(env)
+ self.assertEqual(self.middleware._cache, 'CACHE_TEST')
def test_not_use_cache_from_env(self):
self.disable_module('memcache')
env = {'swift.cache': 'CACHE_TEST'}
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
- 'memcache_servers': 'localhost:11211',
+ 'auth_admin_prefix': '/testadmin',
+ 'memcache_servers': 'localhost:11211'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- auth._init_cache(env)
- self.assertEqual(auth._cache, None)
-
- def test_request_prevent_service_catalog_injection(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
- req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertFalse(req.headers.get('X-Service-Catalog'))
- self.assertEqual(body, ['SUCCESS'])
+ self.set_middleware(conf=conf)
+ self.middleware._init_cache(env)
+ self.assertEqual(self.middleware._cache, None)
def test_will_expire_soon(self):
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
@@ -788,152 +1049,328 @@ class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def test_encrypt_cache_data(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'encrypt',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- encrypted_data = \
- auth._protect_cache_value('token',
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
+ self.set_middleware(conf=conf)
+ encrypted_data = self.middleware._protect_cache_value(
+ 'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])
self.assertEqual(
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
- auth._unprotect_cache_value('token', encrypted_data))
+ TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
+ self.middleware._unprotect_cache_value('token', encrypted_data))
# should return None if unable to decrypt
self.assertIsNone(
- auth._unprotect_cache_value('token', '{ENCRYPT:AES256}corrupted'))
+ self.middleware._unprotect_cache_value(
+ 'token', '{ENCRYPT:AES256}corrupted'))
self.assertIsNone(
- auth._unprotect_cache_value('mykey', encrypted_data))
+ self.middleware._unprotect_cache_value('mykey', encrypted_data))
def test_sign_cache_data(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- signed_data = \
- auth._protect_cache_value('mykey',
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
+ self.set_middleware(conf=conf)
+ signed_data = self.middleware._protect_cache_value(
+ 'mykey', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
expected = '{MAC:SHA1}'
self.assertEqual(
signed_data[:10],
expected)
self.assertEqual(
- TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
- auth._unprotect_cache_value('mykey', signed_data))
+ TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
+ self.middleware._unprotect_cache_value('mykey', signed_data))
# should return None on corrupted data
self.assertIsNone(
- auth._unprotect_cache_value('mykey', '{MAC:SHA1}corrupted'))
+ self.middleware._unprotect_cache_value('mykey',
+ '{MAC:SHA1}corrupted'))
def test_no_memcache_protection(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
- data = auth._protect_cache_value('mykey', 'This is a test!')
+ self.set_middleware(conf=conf)
+ data = self.middleware._protect_cache_value('mykey',
+ 'This is a test!')
self.assertEqual(data, 'This is a test!')
self.assertEqual(
'This is a test!',
- auth._unprotect_cache_value('mykey', data))
+ self.middleware._unprotect_cache_value('mykey', data))
def test_get_cache_key(self):
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
self.assertEqual(
'tokens/mytoken',
- auth._get_cache_key('mytoken'))
+ self.middleware._get_cache_key('mytoken'))
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret',
+ 'memcache_secret_key': 'mysecret'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
- self.assertEqual(auth._get_cache_key('mytoken'), expected)
+ self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
- 'memcache_secret_key': 'abc!',
+ 'memcache_secret_key': 'abc!'
}
- auth = auth_token.AuthProtocol(FakeApp(), conf)
+ self.set_middleware(conf=conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
- self.assertEqual(auth._get_cache_key('mytoken'), expected)
+ self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
def test_assert_valid_memcache_protection_config(self):
# test missing memcache_secret_key
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_security_strategy': 'Encrypt',
+ 'memcache_security_strategy': 'Encrypt'
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
# test invalue memcache_security_strategy
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_security_strategy': 'whatever',
+ 'memcache_security_strategy': 'whatever'
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
# test missing memcache_secret_key
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
- 'memcache_security_strategy': 'mac',
+ 'memcache_security_strategy': 'mac'
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
'memcache_secret_key': ''
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
conf = {
- 'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mAc',
'memcache_secret_key': ''
}
- self.assertRaises(Exception, auth_token.AuthProtocol,
- FakeApp(), conf)
+ self.assertRaises(Exception, self.set_middleware, conf)
+
+
+class v2AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
+ """ v2 token specific tests.
+
+ There are some differences between how the auth-token middleware handles
+ v2 and v3 tokens over and above the token formats, namely:
+
+ - A v3 keystone server will auto scope a token to a user's default project
+ if no scope is specified. A v2 server assumes that the auth-token
+ middleware will do that.
+ - A v2 keystone server may issue a token without a catalog, even with a
+ tenant
+
+ The tests below were originally part of the generic AuthTokenMiddlewareTest
+ class, but now, since they really are v2 specifc, they are included here.
+
+ """
+ def assert_unscoped_default_tenant_auto_scopes(self, token):
+ """Unscoped v2 requests with a default tenant should "auto-scope."
+
+ The implied scope is the user's tenant ID.
+
+ """
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertEqual(body, ['SUCCESS'])
+ self.assertTrue('keystone.token_info' in req.environ)
+
+ def test_default_tenant_uuid_token(self):
+ self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
+
+ def test_default_tenant_signed_token(self):
+ self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
+
+ def assert_unscoped_token_receives_401(self, token):
+ """Unscoped requests with no default tenant ID should be rejected."""
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ "Keystone uri='https://keystone.example.com:1234'")
+
+ def test_unscoped_uuid_token_receives_401(self):
+ self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
+
+ def test_unscoped_pki_token_receives_401(self):
+ self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
+
+ def test_request_prevent_service_catalog_injection(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Service-Catalog'] = '[]'
+ req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertFalse(req.headers.get('X-Service-Catalog'))
+ self.assertEqual(body, ['SUCCESS'])
+
+ def test_valid_uuid_request_forced_to_2_0(self):
+ """ Test forcing auth_token to use lower api version.
+
+ By installing the v3 http hander, auth_token will be get
+ a version list that looks like a v3 server - from which it
+ would normally chose v3.0 as the auth version. However, here
+ we specify v2.0 in the configuration - which should force
+ auth_token to use that version instead.
+
+ """
+ conf = {
+ 'auth_host': 'keystone.example.com',
+ 'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
+ 'signing_dir': CERTDIR,
+ 'auth_version': 'v2.0'
+ }
+ self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)
+ # This tests will only work is auth_token has chosen to use the
+ # lower, v2, api version
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = UUID_TOKEN_DEFAULT
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
+ v3FakeHTTPConnection.last_requested_url)
+
+ def test_invalid_auth_version_request(self):
+ conf = {
+ 'auth_host': 'keystone.example.com',
+ 'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
+ 'signing_dir': CERTDIR,
+ 'auth_version': 'v1.0' # v1.0 is no longer supported
+ }
+ self.assertRaises(Exception, self.set_middleware, conf)
+
+
+class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
+ """ Test auth_token middleware with v3 tokens.
+
+ Re-execute the AuthTokenMiddlewareTest class tests, but with the
+ the auth_token middleware configured to expect v3 tokens back from
+ a keystone server.
+
+ This is done by configuring the AuthTokenMiddlewareTest class via
+ its Setup(), passing in v3 style data that will then be used by
+ the tests themselves. This approach has been used to ensure we
+ really are running the same tests for both v2 and v3 tokens.
+
+ There a few additional specific test for v3 only:
+
+ - We allow an unscoped token to be validated (as unscoped), where
+ as for v2 tokens, the auth_token middleware is expected to try and
+ auto-scope it (and fail if there is no default tenant)
+ - Domain scoped tokens
+
+ Since we don't specify an auth version for auth_token to use, by
+ definition we are thefore implicitely testing that it will use
+ the highest available auth version, i.e. v3.0
+
+ """
+ def setUp(self):
+ token_dict = {
+ 'uuid_token_default': v3_UUID_TOKEN_DEFAULT,
+ 'uuid_token_unscoped': v3_UUID_TOKEN_UNSCOPED,
+ 'signed_token_scoped': SIGNED_v3_TOKEN_SCOPED,
+ 'revoked_token': REVOKED_v3_TOKEN,
+ 'revoked_token_hash': REVOKED_v3_TOKEN_HASH
+ }
+ super(v3AuthTokenMiddlewareTest, self).setUp(
+ fake_app=v3FakeApp,
+ fake_http=v3FakeHTTPConnection,
+ token_dict=token_dict,
+ fake_memcache=v3FakeMemcache,
+ fake_memcache_ring=v3FakeSwiftMemcacheRing)
+
+ def assert_valid_last_url(self, token_id):
+ # Token ID is not part of the url in v3, so override
+ # this assert test in the base class
+ self.assertEqual('/testadmin/v3/auth/tokens',
+ v3FakeHTTPConnection.last_requested_url)
+
+ def test_valid_unscoped_uuid_request(self):
+ # Remove items that won't be in an unscoped token
+ delta_expected_env = {
+ 'HTTP_X_PROJECT_ID': None,
+ 'HTTP_X_PROJECT_NAME': None,
+ 'HTTP_X_PROJECT_DOMAIN_ID': None,
+ 'HTTP_X_PROJECT_DOMAIN_NAME': None,
+ 'HTTP_X_TENANT_ID': None,
+ 'HTTP_X_TENANT_NAME': None,
+ 'HTTP_X_ROLES': '',
+ 'HTTP_X_TENANT': None,
+ 'HTTP_X_ROLE': '',
+ }
+ self.set_middleware(expected_env=delta_expected_env)
+ self.assert_valid_request_200(v3_UUID_TOKEN_UNSCOPED,
+ with_catalog=False)
+ self.assertEqual('/testadmin/v3/auth/tokens',
+ v3FakeHTTPConnection.last_requested_url)
+
+ def test_domain_scoped_uuid_request(self):
+ # Modify items comapred to default token for a domain scope
+ delta_expected_env = {
+ 'HTTP_X_DOMAIN_ID': 'domain_id1',
+ 'HTTP_X_DOMAIN_NAME': 'domain_name1',
+ 'HTTP_X_PROJECT_ID': None,
+ 'HTTP_X_PROJECT_NAME': None,
+ 'HTTP_X_PROJECT_DOMAIN_ID': None,
+ 'HTTP_X_PROJECT_DOMAIN_NAME': None,
+ 'HTTP_X_TENANT_ID': None,
+ 'HTTP_X_TENANT_NAME': None,
+ 'HTTP_X_TENANT': None
+ }
+ self.set_middleware(expected_env=delta_expected_env)
+ self.assert_valid_request_200(v3_UUID_TOKEN_DOMAIN_SCOPED)
+ self.assertEqual('/testadmin/v3/auth/tokens',
+ v3FakeHTTPConnection.last_requested_url)
class TokenEncodingTest(testtools.TestCase):