diff options
Diffstat (limited to 'tests/test_auth_token_middleware.py')
-rw-r--r-- | tests/test_auth_token_middleware.py | 670 |
1 files changed, 670 insertions, 0 deletions
diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py new file mode 100644 index 0000000..79cbc91 --- /dev/null +++ b/tests/test_auth_token_middleware.py @@ -0,0 +1,670 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import iso8601 +import os +import string +import tempfile +import unittest2 as unittest + +import webob + +from keystoneclient.common import cms +from keystoneclient import utils +from keystoneclient.middleware import auth_token +from keystoneclient.openstack.common import jsonutils +from keystoneclient.openstack.common import timeutils +from keystoneclient.middleware import test + + +CERTDIR = test.rootdir("python-keystoneclient/examples/pki/certs") +KEYDIR = test.rootdir("python-keystoneclient/examples/pki/private") +CMSDIR = test.rootdir("python-keystoneclient/examples/pki/cms") +SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem') +SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem') +CA = os.path.join(CERTDIR, 'ca.pem') + +REVOCATION_LIST = None +REVOKED_TOKEN = None +REVOKED_TOKEN_HASH = None +SIGNED_REVOCATION_LIST = None +SIGNED_TOKEN_SCOPED = None +SIGNED_TOKEN_UNSCOPED = None +SIGNED_TOKEN_SCOPED_KEY = None +SIGNED_TOKEN_UNSCOPED_KEY = None + +VALID_SIGNED_REVOCATION_LIST = None + +UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d" +UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df' +UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776' +VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726' + +INVALID_SIGNED_TOKEN = string.replace( + """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA +BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB +CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC +DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD +EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE +FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF +0000000000000000000000000000000000000000000000000000000000000000 +1111111111111111111111111111111111111111111111111111111111111111 +2222222222222222222222222222222222222222222222222222222222222222 +3333333333333333333333333333333333333333333333333333333333333333 +4444444444444444444444444444444444444444444444444444444444444444 +5555555555555555555555555555555555555555555555555555555555555555 +6666666666666666666666666666666666666666666666666666666666666666 +7777777777777777777777777777777777777777777777777777777777777777 +8888888888888888888888888888888888888888888888888888888888888888 +9999999999999999999999999999999999999999999999999999999999999999 +0000000000000000000000000000000000000000000000000000000000000000 +xg==""", "\n", "") + +# JSON responses keyed by token ID +TOKEN_RESPONSES = { + UUID_TOKEN_DEFAULT: { + 'access': { + 'token': { + 'id': UUID_TOKEN_DEFAULT, + 'expires': '2999-01-01T00:00:10Z', + 'tenant': { + 'id': 'tenant_id1', + 'name': 'tenant_name1', + }, + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + 'serviceCatalog': {} + }, + }, + VALID_DIABLO_TOKEN: { + 'access': { + 'token': { + 'id': VALID_DIABLO_TOKEN, + 'expires': '2999-01-01T00:00:10', + 'tenantId': 'tenant_id1', + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + }, + }, + UUID_TOKEN_UNSCOPED: { + 'access': { + 'token': { + 'id': UUID_TOKEN_UNSCOPED, + 'expires': '2999-01-01T00:00:10Z', + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + }, + }, + UUID_TOKEN_NO_SERVICE_CATALOG: { + 'access': { + 'token': { + 'id': 'valid-token', + 'expires': '2999-01-01T00:00:10Z', + 'tenant': { + 'id': 'tenant_id1', + 'name': 'tenant_name1', + }, + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + } + }, + }, +} + +FAKE_RESPONSE_STACK = [] + + +# The data for these tests are signed using openssl and are stored in files +# in the signing subdirectory. In order to keep the values consistent between +# the tests and the signed documents, we read them in for use in the tests. +def setUpModule(self): + signing_path = CMSDIR + with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f: + self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read()) + with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f: + self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read()) + with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f: + self.REVOKED_TOKEN = cms.cms_to_token(f.read()) + self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN) + with open(os.path.join(signing_path, 'revocation_list.json')) as f: + self.REVOCATION_LIST = jsonutils.loads(f.read()) + with open(os.path.join(signing_path, 'revocation_list.pem')) as f: + self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps( + {'signed': f.read()}) + self.SIGNED_TOKEN_SCOPED_KEY =\ + cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED) + self.SIGNED_TOKEN_UNSCOPED_KEY =\ + cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED) + + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = { + 'access': { + 'token': { + 'id': self.SIGNED_TOKEN_SCOPED_KEY, + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'tenantId': 'tenant_id1', + 'tenantName': 'tenant_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + }, + } + + self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = { + 'access': { + 'token': { + 'id': SIGNED_TOKEN_UNSCOPED_KEY, + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + }, + }, + + +class FakeMemcache(object): + def __init__(self): + self.set_key = None + self.set_value = None + self.token_expiration = None + + def get(self, key): + data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy() + if not data or key != "tokens/%s" % (data['access']['token']['id']): + return + if not self.token_expiration: + dt = datetime.datetime.now() + datetime.timedelta(minutes=5) + self.token_expiration = dt.strftime("%s") + dt = datetime.datetime.now() + datetime.timedelta(hours=24) + ks_expires = dt.isoformat() + data['access']['token']['expires'] = ks_expires + return (data, str(self.token_expiration)) + + def set(self, key, value, time=None): + self.set_value = value + self.set_key = key + + +class FakeHTTPResponse(object): + def __init__(self, status, body): + self.status = status + self.body = body + + def read(self): + return self.body + + +class FakeStackHTTPConnection(object): + + def __init__(self, *args, **kwargs): + pass + + def getresponse(self): + if len(FAKE_RESPONSE_STACK): + return FAKE_RESPONSE_STACK.pop() + return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE')) + + def request(self, *_args, **_kwargs): + pass + + def close(self): + pass + + +class FakeHTTPConnection(object): + + last_requested_url = '' + + def __init__(self, *args): + self.send_valid_revocation_list = True + + def request(self, method, path, **kwargs): + """Fakes out several http responses. + + If a POST request is made, we assume the calling code is trying + to get a new admin token. + + If a GET request is made to validate a token, return success + if the token is 'token1'. If a different token is provided, return + a 404, indicating an unknown (therefore unauthorized) token. + + """ + FakeHTTPConnection.last_requested_url = path + if method == 'POST': + status = 200 + body = jsonutils.dumps({ + 'access': { + 'token': {'id': 'admin_token2'}, + }, + }) + + else: + token_id = path.rsplit('/', 1)[1] + if token_id in TOKEN_RESPONSES.keys(): + status = 200 + body = jsonutils.dumps(TOKEN_RESPONSES[token_id]) + elif token_id == "revoked": + status = 200 + body = SIGNED_REVOCATION_LIST + else: + status = 404 + body = str() + + self.resp = FakeHTTPResponse(status, body) + + def getresponse(self): + return self.resp + + def close(self): + pass + + +class FakeApp(object): + """This represents a WSGI app protected by the auth_token middleware.""" + def __init__(self, expected_env=None): + expected_env = expected_env or {} + self.expected_env = { + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'HTTP_X_TENANT_ID': 'tenant_id1', + 'HTTP_X_TENANT_NAME': 'tenant_name1', + 'HTTP_X_USER_ID': 'user_id1', + 'HTTP_X_USER_NAME': 'user_name1', + 'HTTP_X_ROLES': 'role1,role2', + 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat) + 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat) + 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat) + } + self.expected_env.update(expected_env) + + def __call__(self, env, start_response): + for k, v in self.expected_env.items(): + assert env[k] == v, '%s != %s' % (env[k], v) + + resp = webob.Response() + resp.body = 'SUCCESS' + return resp(env, start_response) + + +class BaseAuthTokenMiddlewareTest(unittest.TestCase): + + def setUp(self, expected_env=None): + expected_env = expected_env or {} + + conf = { + 'admin_token': 'admin_token1', + 'auth_host': 'keystone.example.com', + 'auth_port': 1234, + 'auth_admin_prefix': '/testadmin', + 'signing_dir': CERTDIR, + } + + self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf) + self.middleware.http_client_class = FakeHTTPConnection + self.middleware._iso8601 = iso8601 + + self.response_status = None + self.response_headers = None + self.middleware.revoked_file_name = tempfile.mkstemp()[1] + cache_timeout = datetime.timedelta(days=1) + self.middleware.token_revocation_list_cache_timeout = cache_timeout + self.middleware.token_revocation_list = jsonutils.dumps( + {"revoked": [], "extra": "success"}) + + signed_list = 'SIGNED_REVOCATION_LIST' + valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST' + globals()[signed_list] = globals()[valid_signed_list] + + super(BaseAuthTokenMiddlewareTest, self).setUp() + + def tearDown(self): + super(BaseAuthTokenMiddlewareTest, self).tearDown() + try: + os.remove(self.middleware.revoked_file_name) + except OSError: + pass + + def start_fake_response(self, status, headers): + self.response_status = int(status.split(' ', 1)[0]) + self.response_headers = dict(headers) + + +class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): + """Auth Token middleware test setup that allows the tests to define + a stack of responses to HTTP requests in the test and get those + responses back in sequence for testing. + + Example:: + + resp1 = FakeHTTPResponse(401, jsonutils.dumps('')) + resp2 = FakeHTTPResponse(200, jsonutils.dumps({ + 'access': { + 'token': {'id': 'admin_token2'}, + }, + }) + FAKE_RESPONSE_STACK.append(resp1) + FAKE_RESPONSE_STACK.append(resp2) + + ... do your testing code here ... + + """ + + def setUp(self, expected_env=None): + super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env) + self.middleware.http_client_class = FakeStackHTTPConnection + + def test_fetch_revocation_list_with_expire(self): + # first response to revocation list should return 401 Unauthorized + # to pretend to be an expired token + resp1 = FakeHTTPResponse(200, jsonutils.dumps({ + 'access': { + 'token': {'id': 'admin_token2'}, + }, + })) + resp2 = FakeHTTPResponse(401, jsonutils.dumps('')) + resp3 = FakeHTTPResponse(200, jsonutils.dumps({ + 'access': { + 'token': {'id': 'admin_token2'}, + }, + })) + resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST) + + # first get_admin_token() call + FAKE_RESPONSE_STACK.append(resp1) + # request revocation list, get "unauthorized" due to simulated expired + # token + FAKE_RESPONSE_STACK.append(resp2) + # request a new admin_token + FAKE_RESPONSE_STACK.append(resp3) + # request revocation list, get the revocation list properly + FAKE_RESPONSE_STACK.append(resp4) + + fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) + self.assertEqual(fetched_list, REVOCATION_LIST) + + +class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): + """Auth Token middleware should understand Diablo keystone responses.""" + def setUp(self): + # pre-diablo only had Tenant ID, which was also the Name + expected_env = { + 'HTTP_X_TENANT_ID': 'tenant_id1', + 'HTTP_X_TENANT_NAME': 'tenant_id1', + # now deprecated (diablo-compat) + 'HTTP_X_TENANT': 'tenant_id1', + } + super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env) + + def test_valid_diablo_response(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertTrue('keystone.token_info' in req.environ) + + +class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest): + def assert_valid_request_200(self, token): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertTrue(req.headers.get('X-Service-Catalog')) + self.assertEqual(body, ['SUCCESS']) + self.assertTrue('keystone.token_info' in req.environ) + + def test_valid_uuid_request(self): + self.assert_valid_request_200(UUID_TOKEN_DEFAULT) + self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT, + FakeHTTPConnection.last_requested_url) + + def test_valid_signed_request(self): + FakeHTTPConnection.last_requested_url = '' + self.assert_valid_request_200(SIGNED_TOKEN_SCOPED) + self.assertEqual(self.middleware.conf['auth_admin_prefix'], + "/testadmin") + #ensure that signed requests do not generate HTTP traffic + self.assertEqual('', FakeHTTPConnection.last_requested_url) + + def assert_unscoped_default_tenant_auto_scopes(self, token): + """Unscoped requests with a default tenant should "auto-scope." + + The implied scope is the user's tenant ID. + + """ + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertEqual(body, ['SUCCESS']) + self.assertTrue('keystone.token_info' in req.environ) + + def test_default_tenant_uuid_token(self): + self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT) + + def test_default_tenant_signed_token(self): + self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED) + + def assert_unscoped_token_receives_401(self, token): + """Unscoped requests with no default tenant ID should be rejected.""" + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + 'Keystone uri=\'https://keystone.example.com:1234\'') + + def test_unscoped_uuid_token_receives_401(self): + self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED) + + def test_unscoped_pki_token_receives_401(self): + self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED) + + def test_revoked_token_receives_401(self): + self.middleware.token_revocation_list = self.get_revocation_list_json() + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = REVOKED_TOKEN + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + + def get_revocation_list_json(self, token_ids=None): + if token_ids is None: + token_ids = [REVOKED_TOKEN_HASH] + revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()} + for x in token_ids]} + return jsonutils.dumps(revocation_list) + + def test_is_signed_token_revoked_returns_false(self): + #explicitly setting an empty revocation list here to document intent + self.middleware.token_revocation_list = jsonutils.dumps( + {"revoked": [], "extra": "success"}) + result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN) + self.assertFalse(result) + + def test_is_signed_token_revoked_returns_true(self): + self.middleware.token_revocation_list = self.get_revocation_list_json() + result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN) + self.assertTrue(result) + + def test_verify_signed_token_raises_exception_for_revoked_token(self): + self.middleware.token_revocation_list = self.get_revocation_list_json() + with self.assertRaises(auth_token.InvalidUserToken): + self.middleware.verify_signed_token(REVOKED_TOKEN) + + def test_verify_signed_token_succeeds_for_unrevoked_token(self): + self.middleware.token_revocation_list = self.get_revocation_list_json() + self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED) + + def test_get_token_revocation_list_fetched_time_returns_min(self): + self.middleware.token_revocation_list_fetched_time = None + self.middleware.revoked_file_name = '' + self.assertEqual(self.middleware.token_revocation_list_fetched_time, + datetime.datetime.min) + + def test_get_token_revocation_list_fetched_time_returns_mtime(self): + self.middleware.token_revocation_list_fetched_time = None + mtime = os.path.getmtime(self.middleware.revoked_file_name) + fetched_time = datetime.datetime.fromtimestamp(mtime) + self.assertEqual(self.middleware.token_revocation_list_fetched_time, + fetched_time) + + def test_get_token_revocation_list_fetched_time_returns_value(self): + expected = self.middleware._token_revocation_list_fetched_time + self.assertEqual(self.middleware.token_revocation_list_fetched_time, + expected) + + def test_get_revocation_list_returns_fetched_list(self): + self.middleware.token_revocation_list_fetched_time = None + os.remove(self.middleware.revoked_file_name) + self.assertEqual(self.middleware.token_revocation_list, + REVOCATION_LIST) + + def test_get_revocation_list_returns_current_list_from_memory(self): + self.assertEqual(self.middleware.token_revocation_list, + self.middleware._token_revocation_list) + + def test_get_revocation_list_returns_current_list_from_disk(self): + in_memory_list = self.middleware.token_revocation_list + self.middleware._token_revocation_list = None + self.assertEqual(self.middleware.token_revocation_list, in_memory_list) + + def test_invalid_revocation_list_raises_service_error(self): + globals()['SIGNED_REVOCATION_LIST'] = "{}" + with self.assertRaises(auth_token.ServiceError): + self.middleware.fetch_revocation_list() + + def test_fetch_revocation_list(self): + fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) + self.assertEqual(fetched_list, REVOCATION_LIST) + + def test_request_invalid_uuid_token(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = 'invalid-token' + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + 'Keystone uri=\'https://keystone.example.com:1234\'') + + def test_request_invalid_signed_token(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + 'Keystone uri=\'https://keystone.example.com:1234\'') + + def test_request_no_token(self): + req = webob.Request.blank('/') + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + 'Keystone uri=\'https://keystone.example.com:1234\'') + + def test_request_blank_token(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = '' + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + 'Keystone uri=\'https://keystone.example.com:1234\'') + + def test_memcache(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED + self.middleware._cache = FakeMemcache() + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.middleware._cache.set_value, None) + + def test_memcache_set_invalid(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = 'invalid-token' + self.middleware._cache = FakeMemcache() + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.middleware._cache.set_value, "invalid") + + def test_memcache_set_expired(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED + self.middleware._cache = FakeMemcache() + expired = datetime.datetime.now() - datetime.timedelta(minutes=1) + self.middleware._cache.token_expiration = float(expired.strftime("%s")) + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(len(self.middleware._cache.set_value), 2) + + def test_nomemcache(self): + self.disable_module('memcache') + + conf = { + 'admin_token': 'admin_token1', + 'auth_host': 'keystone.example.com', + 'auth_port': 1234, + 'memcache_servers': 'localhost:11211', + } + + auth_token.AuthProtocol(FakeApp(), conf) + + def test_request_prevent_service_catalog_injection(self): + req = webob.Request.blank('/') + req.headers['X-Service-Catalog'] = '[]' + req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertFalse(req.headers.get('X-Service-Catalog')) + self.assertEqual(body, ['SUCCESS']) + + def test_will_expire_soon(self): + tenseconds = datetime.datetime.utcnow() + datetime.timedelta( + seconds=10) + self.assertTrue(auth_token.will_expire_soon(tenseconds)) + fortyseconds = datetime.datetime.utcnow() + datetime.timedelta( + seconds=40) + self.assertFalse(auth_token.will_expire_soon(fortyseconds)) |