diff options
author | Morgan Fainberg <m@metacloud.com> | 2013-12-08 21:11:59 -0800 |
---|---|---|
committer | Morgan Fainberg <m@metacloud.com> | 2014-01-13 09:53:00 -0800 |
commit | 9837137580688d0c8509c2a5337004d55f2b88e0 (patch) | |
tree | cd23282133c265a7d0bab984fc83412a8b0b3336 /keystone/tests/test_backend.py | |
parent | 83db9722c201eca9fa47d93fce7d09dd6f28e053 (diff) | |
download | keystone-9837137580688d0c8509c2a5337004d55f2b88e0.tar.gz |
Convert Token KVS backend to new KeyValueStore Impl
This patchset converts the current token KVS backend to the new
dogpile.cache based KeyValueStore implementation. The changeset
provides the same public interface as the previous KVS driver but
drastically reworks the internal mechanisms to be more similar to
the memcache based driver so that direct access to the in-mem db
is not required to effectively use the KeyValueStore.
KVS token backend is not deprecated anymore. However, it is still
inadvisable to use the basic-inmemory KVS backend for anything
outside of testing.
DocImpact
bp: dogpile-kvs-backends
Change-Id: Ib278636d4ffa3f7152287a48d02be598c50f698a
Diffstat (limited to 'keystone/tests/test_backend.py')
-rw-r--r-- | keystone/tests/test_backend.py | 107 |
1 files changed, 86 insertions, 21 deletions
diff --git a/keystone/tests/test_backend.py b/keystone/tests/test_backend.py index 20bfaaec5..11c808025 100644 --- a/keystone/tests/test_backend.py +++ b/keystone/tests/test_backend.py @@ -20,14 +20,16 @@ import hashlib import mock import uuid +from six import moves + from keystone.catalog import core from keystone import config from keystone import exception from keystone.openstack.common import timeutils from keystone import tests from keystone.tests import default_fixtures -from six import moves - +from keystone.tests import test_utils +from keystone.token import provider CONF = config.CONF DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id @@ -2902,27 +2904,49 @@ class TokenTests(object): self.assertRaises(exception.TokenNotFound, self.token_api.delete_token, token_id) - def create_token_sample_data(self, tenant_id=None, trust_id=None, - user_id="testuserid"): - token_id = self._create_token_id() + def create_token_sample_data(self, token_id=None, tenant_id=None, + trust_id=None, user_id=None, expires=None): + if token_id is None: + token_id = self._create_token_id() + if user_id is None: + user_id = 'testuserid' + # FIXME(morganfainberg): These tokens look nothing like "Real" tokens. + # This should be updated when token_api is updated to merge in the + # issue_token logic from the providers (token issuance should be a + # pipeline). The fix should be in implementation of blueprint: + # token-issuance-pipeline data = {'id': token_id, 'a': 'b', 'user': {'id': user_id}} if tenant_id is not None: data['tenant'] = {'id': tenant_id, 'name': tenant_id} if tenant_id is NULL_OBJECT: data['tenant'] = None + if expires is not None: + data['expires'] = expires if trust_id is not None: data['trust_id'] = trust_id + data.setdefault('access', {}).setdefault('trust', {}) + # Testuserid2 is used here since a trustee will be different in + # the cases of impersonation and therefore should not match the + # token's user_id. + data['access']['trust']['trustee_user_id'] = 'testuserid2' + data['token_version'] = provider.V2 + # Issue token stores a copy of all token data at token['token_data']. + # This emulates that assumption as part of the test. + data['token_data'] = copy.deepcopy(data) new_token = self.token_api.create_token(token_id, data) - return new_token['id'] + return new_token['id'], data def test_delete_tokens(self): tokens = self.token_api._list_tokens('testuserid') self.assertEqual(len(tokens), 0) - token_id1 = self.create_token_sample_data('testtenantid') - token_id2 = self.create_token_sample_data('testtenantid') - token_id3 = self.create_token_sample_data(tenant_id='testtenantid', - user_id="testuserid1") + token_id1, data = self.create_token_sample_data( + tenant_id='testtenantid') + token_id2, data = self.create_token_sample_data( + tenant_id='testtenantid') + token_id3, data = self.create_token_sample_data( + tenant_id='testtenantid', + user_id='testuserid1') tokens = self.token_api._list_tokens('testuserid') self.assertEqual(len(tokens), 2) self.assertIn(token_id2, tokens) @@ -2941,11 +2965,13 @@ class TokenTests(object): def test_delete_tokens_trust(self): tokens = self.token_api._list_tokens(user_id='testuserid') self.assertEqual(len(tokens), 0) - token_id1 = self.create_token_sample_data(tenant_id='testtenantid', - trust_id='testtrustid') - token_id2 = self.create_token_sample_data(tenant_id='testtenantid', - user_id="testuserid1", - trust_id="testtrustid1") + token_id1, data = self.create_token_sample_data( + tenant_id='testtenantid', + trust_id='testtrustid') + token_id2, data = self.create_token_sample_data( + tenant_id='testtenantid', + user_id='testuserid1', + trust_id='testtrustid1') tokens = self.token_api._list_tokens('testuserid') self.assertEqual(len(tokens), 1) self.assertIn(token_id1, tokens) @@ -2959,11 +2985,11 @@ class TokenTests(object): def _test_token_list(self, token_list_fn): tokens = token_list_fn('testuserid') self.assertEqual(len(tokens), 0) - token_id1 = self.create_token_sample_data() + token_id1, data = self.create_token_sample_data() tokens = token_list_fn('testuserid') self.assertEqual(len(tokens), 1) self.assertIn(token_id1, tokens) - token_id2 = self.create_token_sample_data() + token_id2, data = self.create_token_sample_data() tokens = token_list_fn('testuserid') self.assertEqual(len(tokens), 2) self.assertIn(token_id2, tokens) @@ -2980,10 +3006,10 @@ class TokenTests(object): # tenant-specific tokens tenant1 = uuid.uuid4().hex tenant2 = uuid.uuid4().hex - token_id3 = self.create_token_sample_data(tenant_id=tenant1) - token_id4 = self.create_token_sample_data(tenant_id=tenant2) + token_id3, data = self.create_token_sample_data(tenant_id=tenant1) + token_id4, data = self.create_token_sample_data(tenant_id=tenant2) # test for existing but empty tenant (LP:1078497) - token_id5 = self.create_token_sample_data(tenant_id=NULL_OBJECT) + token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT) tokens = token_list_fn('testuserid') self.assertEqual(len(tokens), 3) self.assertNotIn(token_id1, tokens) @@ -3008,7 +3034,7 @@ class TokenTests(object): def test_token_list_trust(self): trust_id = uuid.uuid4().hex - token_id5 = self.create_token_sample_data(trust_id=trust_id) + token_id5, data = self.create_token_sample_data(trust_id=trust_id) tokens = self.token_api._list_tokens('testuserid', trust_id=trust_id) self.assertEqual(len(tokens), 1) self.assertIn(token_id5, tokens) @@ -3174,6 +3200,45 @@ class TokenTests(object): for t in self.token_api.list_revoked_tokens(): self.assertIn('expires', t) + def test_create_unicode_token_id(self): + token_id = unicode(self._create_token_id()) + self.create_token_sample_data(token_id=token_id) + self.token_api.get_token(token_id) + + def test_create_unicode_user_id(self): + user_id = unicode(uuid.uuid4().hex) + token_id, data = self.create_token_sample_data(user_id=user_id) + self.token_api.get_token(token_id) + + def test_list_tokens_unicode_user_id(self): + user_id = unicode(uuid.uuid4().hex) + self.token_api.list_tokens(user_id) + + def test_token_expire_timezone(self): + + @test_utils.timezone + def _create_token(expire_time): + token_id = uuid.uuid4().hex + user_id = unicode(uuid.uuid4().hex) + return self.create_token_sample_data(token_id=token_id, + user_id=user_id, + expires=expire_time) + + for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: + test_utils.TZ = 'UTC' + d + expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) + token_id, data_in = _create_token(expire_time) + data_get = self.token_api.get_token(token_id) + + self.assertEqual(data_in['id'], data_get['id'], + 'TZ=%s' % test_utils.TZ) + + expire_time_expired = ( + timeutils.utcnow() + datetime.timedelta(minutes=-1)) + token_id, data_in = _create_token(expire_time_expired) + self.assertRaises(exception.TokenNotFound, + self.token_api.get_token, data_in['id']) + class TokenCacheInvalidation(object): def _create_test_data(self): |