summaryrefslogtreecommitdiff
path: root/tests/test_auth_token_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_auth_token_middleware.py')
-rw-r--r--tests/test_auth_token_middleware.py670
1 files changed, 670 insertions, 0 deletions
diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py
new file mode 100644
index 0000000..79cbc91
--- /dev/null
+++ b/tests/test_auth_token_middleware.py
@@ -0,0 +1,670 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import iso8601
+import os
+import string
+import tempfile
+import unittest2 as unittest
+
+import webob
+
+from keystoneclient.common import cms
+from keystoneclient import utils
+from keystoneclient.middleware import auth_token
+from keystoneclient.openstack.common import jsonutils
+from keystoneclient.openstack.common import timeutils
+from keystoneclient.middleware import test
+
+
+CERTDIR = test.rootdir("python-keystoneclient/examples/pki/certs")
+KEYDIR = test.rootdir("python-keystoneclient/examples/pki/private")
+CMSDIR = test.rootdir("python-keystoneclient/examples/pki/cms")
+SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
+SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
+CA = os.path.join(CERTDIR, 'ca.pem')
+
+REVOCATION_LIST = None
+REVOKED_TOKEN = None
+REVOKED_TOKEN_HASH = None
+SIGNED_REVOCATION_LIST = None
+SIGNED_TOKEN_SCOPED = None
+SIGNED_TOKEN_UNSCOPED = None
+SIGNED_TOKEN_SCOPED_KEY = None
+SIGNED_TOKEN_UNSCOPED_KEY = None
+
+VALID_SIGNED_REVOCATION_LIST = None
+
+UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
+UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
+UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
+VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
+
+INVALID_SIGNED_TOKEN = string.replace(
+ """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
+CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
+DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
+EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
+FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
+0000000000000000000000000000000000000000000000000000000000000000
+1111111111111111111111111111111111111111111111111111111111111111
+2222222222222222222222222222222222222222222222222222222222222222
+3333333333333333333333333333333333333333333333333333333333333333
+4444444444444444444444444444444444444444444444444444444444444444
+5555555555555555555555555555555555555555555555555555555555555555
+6666666666666666666666666666666666666666666666666666666666666666
+7777777777777777777777777777777777777777777777777777777777777777
+8888888888888888888888888888888888888888888888888888888888888888
+9999999999999999999999999999999999999999999999999999999999999999
+0000000000000000000000000000000000000000000000000000000000000000
+xg==""", "\n", "")
+
+# JSON responses keyed by token ID
+TOKEN_RESPONSES = {
+ UUID_TOKEN_DEFAULT: {
+ 'access': {
+ 'token': {
+ 'id': UUID_TOKEN_DEFAULT,
+ 'expires': '2999-01-01T00:00:10Z',
+ 'tenant': {
+ 'id': 'tenant_id1',
+ 'name': 'tenant_name1',
+ },
+ },
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'},
+ ],
+ },
+ 'serviceCatalog': {}
+ },
+ },
+ VALID_DIABLO_TOKEN: {
+ 'access': {
+ 'token': {
+ 'id': VALID_DIABLO_TOKEN,
+ 'expires': '2999-01-01T00:00:10',
+ 'tenantId': 'tenant_id1',
+ },
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'},
+ ],
+ },
+ },
+ },
+ UUID_TOKEN_UNSCOPED: {
+ 'access': {
+ 'token': {
+ 'id': UUID_TOKEN_UNSCOPED,
+ 'expires': '2999-01-01T00:00:10Z',
+ },
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'},
+ ],
+ },
+ },
+ },
+ UUID_TOKEN_NO_SERVICE_CATALOG: {
+ 'access': {
+ 'token': {
+ 'id': 'valid-token',
+ 'expires': '2999-01-01T00:00:10Z',
+ 'tenant': {
+ 'id': 'tenant_id1',
+ 'name': 'tenant_name1',
+ },
+ },
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'},
+ ],
+ }
+ },
+ },
+}
+
+FAKE_RESPONSE_STACK = []
+
+
+# The data for these tests are signed using openssl and are stored in files
+# in the signing subdirectory. In order to keep the values consistent between
+# the tests and the signed documents, we read them in for use in the tests.
+def setUpModule(self):
+ signing_path = CMSDIR
+ with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
+ self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
+ with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
+ self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
+ with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
+ self.REVOKED_TOKEN = cms.cms_to_token(f.read())
+ self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
+ with open(os.path.join(signing_path, 'revocation_list.json')) as f:
+ self.REVOCATION_LIST = jsonutils.loads(f.read())
+ with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
+ self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
+ {'signed': f.read()})
+ self.SIGNED_TOKEN_SCOPED_KEY =\
+ cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
+ self.SIGNED_TOKEN_UNSCOPED_KEY =\
+ cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
+
+ self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
+ 'access': {
+ 'token': {
+ 'id': self.SIGNED_TOKEN_SCOPED_KEY,
+ },
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'tenantId': 'tenant_id1',
+ 'tenantName': 'tenant_name1',
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'},
+ ],
+ },
+ },
+ }
+
+ self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
+ 'access': {
+ 'token': {
+ 'id': SIGNED_TOKEN_UNSCOPED_KEY,
+ },
+ 'user': {
+ 'id': 'user_id1',
+ 'name': 'user_name1',
+ 'roles': [
+ {'name': 'role1'},
+ {'name': 'role2'},
+ ],
+ },
+ },
+ },
+
+
+class FakeMemcache(object):
+ def __init__(self):
+ self.set_key = None
+ self.set_value = None
+ self.token_expiration = None
+
+ def get(self, key):
+ data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
+ if not data or key != "tokens/%s" % (data['access']['token']['id']):
+ return
+ if not self.token_expiration:
+ dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
+ self.token_expiration = dt.strftime("%s")
+ dt = datetime.datetime.now() + datetime.timedelta(hours=24)
+ ks_expires = dt.isoformat()
+ data['access']['token']['expires'] = ks_expires
+ return (data, str(self.token_expiration))
+
+ def set(self, key, value, time=None):
+ self.set_value = value
+ self.set_key = key
+
+
+class FakeHTTPResponse(object):
+ def __init__(self, status, body):
+ self.status = status
+ self.body = body
+
+ def read(self):
+ return self.body
+
+
+class FakeStackHTTPConnection(object):
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def getresponse(self):
+ if len(FAKE_RESPONSE_STACK):
+ return FAKE_RESPONSE_STACK.pop()
+ return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
+
+ def request(self, *_args, **_kwargs):
+ pass
+
+ def close(self):
+ pass
+
+
+class FakeHTTPConnection(object):
+
+ last_requested_url = ''
+
+ def __init__(self, *args):
+ self.send_valid_revocation_list = True
+
+ def request(self, method, path, **kwargs):
+ """Fakes out several http responses.
+
+ If a POST request is made, we assume the calling code is trying
+ to get a new admin token.
+
+ If a GET request is made to validate a token, return success
+ if the token is 'token1'. If a different token is provided, return
+ a 404, indicating an unknown (therefore unauthorized) token.
+
+ """
+ FakeHTTPConnection.last_requested_url = path
+ if method == 'POST':
+ status = 200
+ body = jsonutils.dumps({
+ 'access': {
+ 'token': {'id': 'admin_token2'},
+ },
+ })
+
+ else:
+ token_id = path.rsplit('/', 1)[1]
+ if token_id in TOKEN_RESPONSES.keys():
+ status = 200
+ body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
+ elif token_id == "revoked":
+ status = 200
+ body = SIGNED_REVOCATION_LIST
+ else:
+ status = 404
+ body = str()
+
+ self.resp = FakeHTTPResponse(status, body)
+
+ def getresponse(self):
+ return self.resp
+
+ def close(self):
+ pass
+
+
+class FakeApp(object):
+ """This represents a WSGI app protected by the auth_token middleware."""
+ def __init__(self, expected_env=None):
+ expected_env = expected_env or {}
+ self.expected_env = {
+ 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
+ 'HTTP_X_TENANT_ID': 'tenant_id1',
+ 'HTTP_X_TENANT_NAME': 'tenant_name1',
+ 'HTTP_X_USER_ID': 'user_id1',
+ 'HTTP_X_USER_NAME': 'user_name1',
+ 'HTTP_X_ROLES': 'role1,role2',
+ 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
+ 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
+ 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
+ }
+ self.expected_env.update(expected_env)
+
+ def __call__(self, env, start_response):
+ for k, v in self.expected_env.items():
+ assert env[k] == v, '%s != %s' % (env[k], v)
+
+ resp = webob.Response()
+ resp.body = 'SUCCESS'
+ return resp(env, start_response)
+
+
+class BaseAuthTokenMiddlewareTest(unittest.TestCase):
+
+ def setUp(self, expected_env=None):
+ expected_env = expected_env or {}
+
+ conf = {
+ 'admin_token': 'admin_token1',
+ 'auth_host': 'keystone.example.com',
+ 'auth_port': 1234,
+ 'auth_admin_prefix': '/testadmin',
+ 'signing_dir': CERTDIR,
+ }
+
+ self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
+ self.middleware.http_client_class = FakeHTTPConnection
+ self.middleware._iso8601 = iso8601
+
+ self.response_status = None
+ self.response_headers = None
+ self.middleware.revoked_file_name = tempfile.mkstemp()[1]
+ cache_timeout = datetime.timedelta(days=1)
+ self.middleware.token_revocation_list_cache_timeout = cache_timeout
+ self.middleware.token_revocation_list = jsonutils.dumps(
+ {"revoked": [], "extra": "success"})
+
+ signed_list = 'SIGNED_REVOCATION_LIST'
+ valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
+ globals()[signed_list] = globals()[valid_signed_list]
+
+ super(BaseAuthTokenMiddlewareTest, self).setUp()
+
+ def tearDown(self):
+ super(BaseAuthTokenMiddlewareTest, self).tearDown()
+ try:
+ os.remove(self.middleware.revoked_file_name)
+ except OSError:
+ pass
+
+ def start_fake_response(self, status, headers):
+ self.response_status = int(status.split(' ', 1)[0])
+ self.response_headers = dict(headers)
+
+
+class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
+ """Auth Token middleware test setup that allows the tests to define
+ a stack of responses to HTTP requests in the test and get those
+ responses back in sequence for testing.
+
+ Example::
+
+ resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
+ resp2 = FakeHTTPResponse(200, jsonutils.dumps({
+ 'access': {
+ 'token': {'id': 'admin_token2'},
+ },
+ })
+ FAKE_RESPONSE_STACK.append(resp1)
+ FAKE_RESPONSE_STACK.append(resp2)
+
+ ... do your testing code here ...
+
+ """
+
+ def setUp(self, expected_env=None):
+ super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
+ self.middleware.http_client_class = FakeStackHTTPConnection
+
+ def test_fetch_revocation_list_with_expire(self):
+ # first response to revocation list should return 401 Unauthorized
+ # to pretend to be an expired token
+ resp1 = FakeHTTPResponse(200, jsonutils.dumps({
+ 'access': {
+ 'token': {'id': 'admin_token2'},
+ },
+ }))
+ resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
+ resp3 = FakeHTTPResponse(200, jsonutils.dumps({
+ 'access': {
+ 'token': {'id': 'admin_token2'},
+ },
+ }))
+ resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
+
+ # first get_admin_token() call
+ FAKE_RESPONSE_STACK.append(resp1)
+ # request revocation list, get "unauthorized" due to simulated expired
+ # token
+ FAKE_RESPONSE_STACK.append(resp2)
+ # request a new admin_token
+ FAKE_RESPONSE_STACK.append(resp3)
+ # request revocation list, get the revocation list properly
+ FAKE_RESPONSE_STACK.append(resp4)
+
+ fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
+ self.assertEqual(fetched_list, REVOCATION_LIST)
+
+
+class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
+ """Auth Token middleware should understand Diablo keystone responses."""
+ def setUp(self):
+ # pre-diablo only had Tenant ID, which was also the Name
+ expected_env = {
+ 'HTTP_X_TENANT_ID': 'tenant_id1',
+ 'HTTP_X_TENANT_NAME': 'tenant_id1',
+ # now deprecated (diablo-compat)
+ 'HTTP_X_TENANT': 'tenant_id1',
+ }
+ super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
+
+ def test_valid_diablo_response(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertTrue('keystone.token_info' in req.environ)
+
+
+class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
+ def assert_valid_request_200(self, token):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertTrue(req.headers.get('X-Service-Catalog'))
+ self.assertEqual(body, ['SUCCESS'])
+ self.assertTrue('keystone.token_info' in req.environ)
+
+ def test_valid_uuid_request(self):
+ self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
+ self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
+ FakeHTTPConnection.last_requested_url)
+
+ def test_valid_signed_request(self):
+ FakeHTTPConnection.last_requested_url = ''
+ self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
+ self.assertEqual(self.middleware.conf['auth_admin_prefix'],
+ "/testadmin")
+ #ensure that signed requests do not generate HTTP traffic
+ self.assertEqual('', FakeHTTPConnection.last_requested_url)
+
+ def assert_unscoped_default_tenant_auto_scopes(self, token):
+ """Unscoped requests with a default tenant should "auto-scope."
+
+ The implied scope is the user's tenant ID.
+
+ """
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertEqual(body, ['SUCCESS'])
+ self.assertTrue('keystone.token_info' in req.environ)
+
+ def test_default_tenant_uuid_token(self):
+ self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
+
+ def test_default_tenant_signed_token(self):
+ self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
+
+ def assert_unscoped_token_receives_401(self, token):
+ """Unscoped requests with no default tenant ID should be rejected."""
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = token
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ 'Keystone uri=\'https://keystone.example.com:1234\'')
+
+ def test_unscoped_uuid_token_receives_401(self):
+ self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
+
+ def test_unscoped_pki_token_receives_401(self):
+ self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
+
+ def test_revoked_token_receives_401(self):
+ self.middleware.token_revocation_list = self.get_revocation_list_json()
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = REVOKED_TOKEN
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+
+ def get_revocation_list_json(self, token_ids=None):
+ if token_ids is None:
+ token_ids = [REVOKED_TOKEN_HASH]
+ revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
+ for x in token_ids]}
+ return jsonutils.dumps(revocation_list)
+
+ def test_is_signed_token_revoked_returns_false(self):
+ #explicitly setting an empty revocation list here to document intent
+ self.middleware.token_revocation_list = jsonutils.dumps(
+ {"revoked": [], "extra": "success"})
+ result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
+ self.assertFalse(result)
+
+ def test_is_signed_token_revoked_returns_true(self):
+ self.middleware.token_revocation_list = self.get_revocation_list_json()
+ result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
+ self.assertTrue(result)
+
+ def test_verify_signed_token_raises_exception_for_revoked_token(self):
+ self.middleware.token_revocation_list = self.get_revocation_list_json()
+ with self.assertRaises(auth_token.InvalidUserToken):
+ self.middleware.verify_signed_token(REVOKED_TOKEN)
+
+ def test_verify_signed_token_succeeds_for_unrevoked_token(self):
+ self.middleware.token_revocation_list = self.get_revocation_list_json()
+ self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
+
+ def test_get_token_revocation_list_fetched_time_returns_min(self):
+ self.middleware.token_revocation_list_fetched_time = None
+ self.middleware.revoked_file_name = ''
+ self.assertEqual(self.middleware.token_revocation_list_fetched_time,
+ datetime.datetime.min)
+
+ def test_get_token_revocation_list_fetched_time_returns_mtime(self):
+ self.middleware.token_revocation_list_fetched_time = None
+ mtime = os.path.getmtime(self.middleware.revoked_file_name)
+ fetched_time = datetime.datetime.fromtimestamp(mtime)
+ self.assertEqual(self.middleware.token_revocation_list_fetched_time,
+ fetched_time)
+
+ def test_get_token_revocation_list_fetched_time_returns_value(self):
+ expected = self.middleware._token_revocation_list_fetched_time
+ self.assertEqual(self.middleware.token_revocation_list_fetched_time,
+ expected)
+
+ def test_get_revocation_list_returns_fetched_list(self):
+ self.middleware.token_revocation_list_fetched_time = None
+ os.remove(self.middleware.revoked_file_name)
+ self.assertEqual(self.middleware.token_revocation_list,
+ REVOCATION_LIST)
+
+ def test_get_revocation_list_returns_current_list_from_memory(self):
+ self.assertEqual(self.middleware.token_revocation_list,
+ self.middleware._token_revocation_list)
+
+ def test_get_revocation_list_returns_current_list_from_disk(self):
+ in_memory_list = self.middleware.token_revocation_list
+ self.middleware._token_revocation_list = None
+ self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
+
+ def test_invalid_revocation_list_raises_service_error(self):
+ globals()['SIGNED_REVOCATION_LIST'] = "{}"
+ with self.assertRaises(auth_token.ServiceError):
+ self.middleware.fetch_revocation_list()
+
+ def test_fetch_revocation_list(self):
+ fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
+ self.assertEqual(fetched_list, REVOCATION_LIST)
+
+ def test_request_invalid_uuid_token(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = 'invalid-token'
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ 'Keystone uri=\'https://keystone.example.com:1234\'')
+
+ def test_request_invalid_signed_token(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ 'Keystone uri=\'https://keystone.example.com:1234\'')
+
+ def test_request_no_token(self):
+ req = webob.Request.blank('/')
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ 'Keystone uri=\'https://keystone.example.com:1234\'')
+
+ def test_request_blank_token(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = ''
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 401)
+ self.assertEqual(self.response_headers['WWW-Authenticate'],
+ 'Keystone uri=\'https://keystone.example.com:1234\'')
+
+ def test_memcache(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
+ self.middleware._cache = FakeMemcache()
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.middleware._cache.set_value, None)
+
+ def test_memcache_set_invalid(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = 'invalid-token'
+ self.middleware._cache = FakeMemcache()
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.middleware._cache.set_value, "invalid")
+
+ def test_memcache_set_expired(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
+ self.middleware._cache = FakeMemcache()
+ expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
+ self.middleware._cache.token_expiration = float(expired.strftime("%s"))
+ self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(len(self.middleware._cache.set_value), 2)
+
+ def test_nomemcache(self):
+ self.disable_module('memcache')
+
+ conf = {
+ 'admin_token': 'admin_token1',
+ 'auth_host': 'keystone.example.com',
+ 'auth_port': 1234,
+ 'memcache_servers': 'localhost:11211',
+ }
+
+ auth_token.AuthProtocol(FakeApp(), conf)
+
+ def test_request_prevent_service_catalog_injection(self):
+ req = webob.Request.blank('/')
+ req.headers['X-Service-Catalog'] = '[]'
+ req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
+ body = self.middleware(req.environ, self.start_fake_response)
+ self.assertEqual(self.response_status, 200)
+ self.assertFalse(req.headers.get('X-Service-Catalog'))
+ self.assertEqual(body, ['SUCCESS'])
+
+ def test_will_expire_soon(self):
+ tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
+ seconds=10)
+ self.assertTrue(auth_token.will_expire_soon(tenseconds))
+ fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
+ seconds=40)
+ self.assertFalse(auth_token.will_expire_soon(fortyseconds))