diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/client_fixtures.py | 72 | ||||
-rw-r--r-- | tests/fakes.py | 73 | ||||
-rw-r--r-- | tests/test_auth_token_middleware.py | 111 | ||||
-rw-r--r-- | tests/test_client.py | 40 | ||||
-rw-r--r-- | tests/test_https.py | 24 | ||||
-rw-r--r-- | tests/test_keyring.py | 65 | ||||
-rw-r--r-- | tests/test_memcache_crypt.py | 96 | ||||
-rw-r--r-- | tests/test_service_catalog.py | 144 | ||||
-rw-r--r-- | tests/utils.py | 2 | ||||
-rw-r--r-- | tests/v2_0/client_fixtures.py | 163 | ||||
-rw-r--r-- | tests/v2_0/test_access.py (renamed from tests/test_access.py) | 12 | ||||
-rw-r--r-- | tests/v2_0/test_client.py | 84 | ||||
-rw-r--r-- | tests/v2_0/test_service_catalog.py | 50 | ||||
-rw-r--r-- | tests/v3/client_fixtures.py | 234 | ||||
-rw-r--r-- | tests/v3/test_access.py | 106 | ||||
-rw-r--r-- | tests/v3/test_auth.py | 408 | ||||
-rw-r--r-- | tests/v3/test_client.py | 121 | ||||
-rw-r--r-- | tests/v3/test_service_catalog.py | 55 | ||||
-rw-r--r-- | tests/v3/utils.py | 83 |
19 files changed, 1501 insertions, 442 deletions
diff --git a/tests/client_fixtures.py b/tests/client_fixtures.py deleted file mode 100644 index f0b5137..0000000 --- a/tests/client_fixtures.py +++ /dev/null @@ -1,72 +0,0 @@ -UNSCOPED_TOKEN = { - u'access': {u'serviceCatalog': {}, - u'token': {u'expires': u'2012-10-03T16:58:01Z', - u'id': u'3e2813b7ba0b4006840c3825860b86ed'}, - u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a', - u'name': u'exampleuser', - u'roles': [], - u'roles_links': [], - u'username': u'exampleuser'} - } -} - -PROJECT_SCOPED_TOKEN = { - u'access': { - u'serviceCatalog': [{ - u'endpoints': [{ - u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58', - u'internalURL': - u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58', - u'publicURL': - u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58', - u'region': u'RegionOne' - }], - u'endpoints_links': [], - u'name': u'Volume Service', - u'type': u'volume'}, - {u'endpoints': [{ - u'adminURL': u'http://admin:9292/v1', - u'internalURL': u'http://internal:9292/v1', - u'publicURL': u'http://public.com:9292/v1', - u'region': u'RegionOne'}], - u'endpoints_links': [], - u'name': u'Image Service', - u'type': u'image'}, - {u'endpoints': [{ -u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58', -u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58', -u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58', -u'region': u'RegionOne'}], - u'endpoints_links': [], - u'name': u'Compute Service', - u'type': u'compute'}, - {u'endpoints': [{ -u'adminURL': u'http://admin:8773/services/Admin', -u'internalURL': u'http://internal:8773/services/Cloud', -u'publicURL': u'http://public.com:8773/services/Cloud', -u'region': u'RegionOne'}], - u'endpoints_links': [], - u'name': u'EC2 Service', - u'type': u'ec2'}, - {u'endpoints': [{ -u'adminURL': u'http://admin:35357/v2.0', -u'internalURL': u'http://internal:5000/v2.0', -u'publicURL': u'http://public.com:5000/v2.0', -u'region': u'RegionOne'}], - u'endpoints_links': [], - u'name': u'Identity Service', - u'type': u'identity'}], - u'token': {u'expires': u'2012-10-03T16:53:36Z', - u'id': u'04c7d5ffaeef485f9dc69c06db285bdb', - u'tenant': {u'description': u'', - u'enabled': True, - u'id': u'225da22d3ce34b15877ea70b2a575f58', - u'name': u'exampleproject'}}, - u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a', - u'name': u'exampleuser', - u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74', - u'name': u'Member'}], - u'roles_links': [], - u'username': u'exampleuser'} - } -} diff --git a/tests/fakes.py b/tests/fakes.py index feafd52..de7ecff 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -65,37 +65,46 @@ class FakeClient(object): def authenticate(self, cl_obj): cl_obj.user_id = '1' cl_obj.auth_user_id = '1' - cl_obj.tenant_id = '1' + cl_obj.project_id = '1' cl_obj.auth_tenant_id = '1' - cl_obj.auth_ref = access.AccessInfo({ - "token": { - "expires": "2012-02-05T00:00:00", - "id": "887665443383838", - "tenant": { + cl_obj.auth_ref = access.AccessInfo.factory(None, { + "access": { + "token": { + "expires": "2012-02-05T00:00:00", + "id": "887665443383838", + "tenant": { + "id": "1", + "name": "customer-x" + } + }, + "serviceCatalog": [{ + "endpoints": [{ + "adminURL": "http://swift.admin-nets.local:8080/", + "region": "RegionOne", + "internalURL": "http://127.0.0.1:8080/v1/AUTH_1", + "publicURL": + "http://swift.publicinternets.com/v1/AUTH_1" + }], + "type": "object-store", + "name": "swift" + }, { + "endpoints": [{ + "adminURL": "http://cdn.admin-nets.local/v1.1/1", + "region": "RegionOne", + "internalURL": "http://127.0.0.1:7777/v1.1/1", + "publicURL": "http://cdn.publicinternets.com/v1.1/1" + }], + "type": "object-store", + "name": "cdn" + }], + "user": { "id": "1", - "name": "customer-x"}}, - "serviceCatalog": [ - {"endpoints": [ - {"adminURL": "http://swift.admin-nets.local:8080/", - "region": "RegionOne", - "internalURL": "http://127.0.0.1:8080/v1/AUTH_1", - "publicURL": - "http://swift.publicinternets.com/v1/AUTH_1"}], - "type": "object-store", - "name": "swift"}, - {"endpoints": [ - {"adminURL": "http://cdn.admin-nets.local/v1.1/1", - "region": "RegionOne", - "internalURL": "http://127.0.0.1:7777/v1.1/1", - "publicURL": "http://cdn.publicinternets.com/v1.1/1"}], - "type": "object-store", - "name": "cdn"}], - "user": { - "id": "1", - "roles": [ - {"tenantId": "1", - "id": "3", - "name": "Member"}], - "name": "joeuser"} - } - ) + "roles": [{ + "tenantId": "1", + "id": "3", + "name": "Member" + }], + "name": "joeuser" + } + } + }) diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py index 06054d0..f11dfee 100644 --- a/tests/test_auth_token_middleware.py +++ b/tests/test_auth_token_middleware.py @@ -28,7 +28,6 @@ import webob from keystoneclient.common import cms from keystoneclient import utils from keystoneclient.middleware import auth_token -from keystoneclient.middleware import memcache_crypt from keystoneclient.openstack.common import memorycache from keystoneclient.openstack.common import jsonutils from keystoneclient.openstack.common import timeutils @@ -806,7 +805,7 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest): 'admin_token': 'admin_token1', 'auth_host': 'keystone.example.com', 'auth_port': 1234, - 'memcache_servers': 'localhost:11211', + 'memcached_servers': 'localhost:11211', } auth_token.AuthProtocol(FakeApp(), conf) @@ -817,7 +816,7 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': 'localhost:11211' + 'memcached_servers': 'localhost:11211' } self.set_middleware(conf=conf) self.middleware._init_cache(env) @@ -1013,9 +1012,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): def _get_cached_token(self, token): token_id = cms.cms_hash_token(token) # NOTE(vish): example tokens are expired so skip the expiration check. - key = self.middleware._get_cache_key(token_id) - cached = self.middleware._cache.get(key) - return self.middleware._unprotect_cache_value(token, cached) + return self.middleware._cache_get(token_id, ignore_expires=True) def test_memcache(self): req = webob.Request.blank('/') @@ -1036,7 +1033,8 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): token = 'invalid-token' req.headers['X-Auth-Token'] = token self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self._get_cached_token(token), "invalid") + self.assertRaises(auth_token.InvalidUserToken, + self._get_cached_token, token) def test_memcache_set_expired(self): token_cache_time = 10 @@ -1072,7 +1070,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', 'cache': 'swift.cache', - 'memcache_servers': ['localhost:11211'] + 'memcached_servers': ['localhost:11211'] } self.set_middleware(conf=conf) self.middleware._init_cache(env) @@ -1091,98 +1089,47 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'encrypt', 'memcache_secret_key': 'mysecret' } self.set_middleware(conf=conf) - encrypted_data = self.middleware._protect_cache_value( - 'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']]) - self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16]) - self.assertEqual( - TOKEN_RESPONSES[self.token_dict['uuid_token_default']], - self.middleware._unprotect_cache_value('token', encrypted_data)) - # should return None if unable to decrypt - self.assertIsNone( - self.middleware._unprotect_cache_value( - 'token', '{ENCRYPT:AES256}corrupted')) - self.assertIsNone( - self.middleware._unprotect_cache_value('mykey', encrypted_data)) + token = 'my_token' + data = ('this_data', 10e100) + self.middleware._init_cache({}) + self.middleware._cache_store(token, data) + self.assertEqual(self.middleware._cache_get(token), data[0]) def test_sign_cache_data(self): conf = { 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'mac', 'memcache_secret_key': 'mysecret' } self.set_middleware(conf=conf) - signed_data = self.middleware._protect_cache_value( - 'mykey', TOKEN_RESPONSES[self.token_dict['uuid_token_default']]) - expected = '{MAC:SHA1}' - self.assertEqual( - signed_data[:10], - expected) - self.assertEqual( - TOKEN_RESPONSES[self.token_dict['uuid_token_default']], - self.middleware._unprotect_cache_value('mykey', signed_data)) - # should return None on corrupted data - self.assertIsNone( - self.middleware._unprotect_cache_value('mykey', - '{MAC:SHA1}corrupted')) + token = 'my_token' + data = ('this_data', 10e100) + self.middleware._init_cache({}) + self.middleware._cache_store(token, data) + self.assertEqual(self.middleware._cache_get(token), data[0]) def test_no_memcache_protection(self): conf = { 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_secret_key': 'mysecret' } self.set_middleware(conf=conf) - data = self.middleware._protect_cache_value('mykey', - 'This is a test!') - self.assertEqual(data, 'This is a test!') - self.assertEqual( - 'This is a test!', - self.middleware._unprotect_cache_value('mykey', data)) - - def test_get_cache_key(self): - conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], - 'memcache_secret_key': 'mysecret' - } - self.set_middleware(conf=conf) - self.assertEqual( - 'tokens/mytoken', - self.middleware._get_cache_key('mytoken')) - conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], - 'memcache_security_strategy': 'mac', - 'memcache_secret_key': 'mysecret' - } - self.set_middleware(conf=conf) - expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret') - self.assertEqual(self.middleware._get_cache_key('mytoken'), expected) - conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], - 'memcache_security_strategy': 'Encrypt', - 'memcache_secret_key': 'abc!' - } - self.set_middleware(conf=conf) - expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!') - self.assertEqual(self.middleware._get_cache_key('mytoken'), expected) + token = 'my_token' + data = ('this_data', 10e100) + self.middleware._init_cache({}) + self.middleware._cache_store(token, data) + self.assertEqual(self.middleware._cache_get(token), data[0]) def test_assert_valid_memcache_protection_config(self): # test missing memcache_secret_key @@ -1190,7 +1137,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'Encrypt' } self.assertRaises(Exception, self.set_middleware, conf) @@ -1199,7 +1146,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'whatever' } self.assertRaises(Exception, self.set_middleware, conf) @@ -1208,7 +1155,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'mac' } self.assertRaises(Exception, self.set_middleware, conf) @@ -1216,7 +1163,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'Encrypt', 'memcache_secret_key': '' } @@ -1225,7 +1172,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): 'auth_host': 'keystone.example.com', 'auth_port': 1234, 'auth_admin_prefix': '/testadmin', - 'memcache_servers': ['localhost:11211'], + 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'mAc', 'memcache_secret_key': '' } diff --git a/tests/test_client.py b/tests/test_client.py deleted file mode 100644 index d24d72a..0000000 --- a/tests/test_client.py +++ /dev/null @@ -1,40 +0,0 @@ -import json -import mock - -import requests - -from keystoneclient.v2_0 import client -from tests import client_fixtures -from tests import utils - - -fake_response = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN) -}) -mock_request = mock.Mock(return_value=(fake_response)) - - -class KeystoneclientTest(utils.TestCase): - - def test_scoped_init(self): - with mock.patch.object(requests, "request", mock_request): - cl = client.Client(username='exampleuser', - password='password', - auth_url='http://somewhere/') - self.assertIsNotNone(cl.auth_ref) - self.assertTrue(cl.auth_ref.scoped) - - def test_auth_ref_load(self): - with mock.patch.object(requests, "request", mock_request): - cl = client.Client(username='exampleuser', - password='password', - auth_url='http://somewhere/') - cache = json.dumps(cl.auth_ref) - new_client = client.Client(auth_ref=json.loads(cache)) - self.assertIsNotNone(new_client.auth_ref) - self.assertTrue(new_client.auth_ref.scoped) - self.assertEquals(new_client.username, 'exampleuser') - self.assertIsNone(new_client.password) - self.assertEqual(new_client.management_url, - 'http://admin:35357/v2.0') diff --git a/tests/test_https.py b/tests/test_https.py index bef1db4..4070df9 100644 --- a/tests/test_https.py +++ b/tests/test_https.py @@ -68,3 +68,27 @@ class ClientTest(utils.TestCase): headers=headers, data='[1, 2, 3]', **kwargs) + + def test_post_auth(self): + with mock.patch.object(requests, "request", MOCK_REQUEST): + cl = client.HTTPClient( + username="username", password="password", tenant_id="tenant", + auth_url="auth_test", cacert="ca.pem", key="key.pem", + cert="cert.pem") + cl.management_url = "https://127.0.0.1:5000" + cl.auth_token = "token" + cl.post("/hi", body=[1, 2, 3]) + headers = { + "X-Auth-Token": "token", + "Content-Type": "application/json", + "User-Agent": cl.USER_AGENT + } + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['cert'] = ('cert.pem', 'key.pem') + kwargs['verify'] = 'ca.pem' + MOCK_REQUEST.assert_called_with( + "POST", + "https://127.0.0.1:5000/hi", + headers=headers, + data='[1, 2, 3]', + **kwargs) diff --git a/tests/test_keyring.py b/tests/test_keyring.py index 00fb0c2..f3eba82 100644 --- a/tests/test_keyring.py +++ b/tests/test_keyring.py @@ -3,7 +3,7 @@ import datetime from keystoneclient import access from keystoneclient import client from keystoneclient.openstack.common import timeutils -from tests import client_fixtures +from tests.v2_0 import client_fixtures from tests import utils try: @@ -60,11 +60,12 @@ class KeyringTest(utils.TestCase): cl = client.HTTPClient(username=USERNAME, password=PASSWORD, tenant_id=TENANT_ID, auth_url=AUTH_URL) - (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL, - USERNAME, - TENANT, - TENANT_ID, - TOKEN) + (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring( + auth_url=AUTH_URL, + username=USERNAME, + tenant_name=TENANT, + tenant_id=TENANT_ID, + token=TOKEN) self.assertIsNone(keyring_key) self.assertIsNone(auth_ref) @@ -73,49 +74,57 @@ class KeyringTest(utils.TestCase): cl = client.HTTPClient(username=USERNAME, password=PASSWORD, tenant_id=TENANT_ID, auth_url=AUTH_URL) - keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME, - TENANT, TENANT_ID, - TOKEN) + keyring_key = cl._build_keyring_key(auth_url=AUTH_URL, + username=USERNAME, + tenant_name=TENANT, + tenant_id=TENANT_ID, + token=TOKEN) self.assertEqual(keyring_key, '%s/%s/%s/%s/%s' % - (AUTH_URL, USERNAME, TENANT, TENANT_ID, TOKEN)) + (AUTH_URL, TENANT_ID, TENANT, TOKEN, USERNAME)) def test_set_and_get_keyring_expired(self): cl = client.HTTPClient(username=USERNAME, password=PASSWORD, tenant_id=TENANT_ID, auth_url=AUTH_URL, use_keyring=True) - keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME, - TENANT, TENANT_ID, - TOKEN) + keyring_key = cl._build_keyring_key(auth_url=AUTH_URL, + username=USERNAME, + tenant_name=TENANT, + tenant_id=TENANT_ID, + token=TOKEN) - cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access']) + cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN) expired = timeutils.utcnow() - datetime.timedelta(minutes=30) cl.auth_ref['token']['expires'] = timeutils.isotime(expired) cl.store_auth_ref_into_keyring(keyring_key) - (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL, - USERNAME, - TENANT, - TENANT_ID, - TOKEN) + (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring( + auth_url=AUTH_URL, + username=USERNAME, + tenant_name=TENANT, + tenant_id=TENANT_ID, + token=TOKEN) self.assertIsNone(auth_ref) def test_set_and_get_keyring(self): cl = client.HTTPClient(username=USERNAME, password=PASSWORD, tenant_id=TENANT_ID, auth_url=AUTH_URL, use_keyring=True) - keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME, - TENANT, TENANT_ID, - TOKEN) + keyring_key = cl._build_keyring_key(auth_url=AUTH_URL, + username=USERNAME, + tenant_name=TENANT, + tenant_id=TENANT_ID, + token=TOKEN) - cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access']) + cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN) expires = timeutils.utcnow() + datetime.timedelta(minutes=30) cl.auth_ref['token']['expires'] = timeutils.isotime(expires) cl.store_auth_ref_into_keyring(keyring_key) - (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL, - USERNAME, - TENANT, - TENANT_ID, - TOKEN) + (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring( + auth_url=AUTH_URL, + username=USERNAME, + tenant_name=TENANT, + tenant_id=TENANT_ID, + token=TOKEN) self.assertEqual(auth_ref.auth_token, TOKEN) self.assertEqual(auth_ref.username, USERNAME) diff --git a/tests/test_memcache_crypt.py b/tests/test_memcache_crypt.py index b2281d9..524cd21 100644 --- a/tests/test_memcache_crypt.py +++ b/tests/test_memcache_crypt.py @@ -4,48 +4,66 @@ from keystoneclient.middleware import memcache_crypt class MemcacheCryptPositiveTests(testtools.TestCase): - def test_generate_aes_key(self): - self.assertEqual( - len(memcache_crypt.generate_aes_key('Gimme Da Key', 'hush')), 32) + def _setup_keys(self, strategy): + return memcache_crypt.derive_keys('token', 'secret', strategy) - def test_compute_mac(self): - self.assertEqual( - memcache_crypt.compute_mac('mykey', 'This is a test!'), - 'tREu41yR5tEgeBWIuv9ag4AeKA8=') + def test_constant_time_compare(self): + # make sure it works as a compare, the "constant time" aspect + # isn't appropriate to test in unittests + ctc = memcache_crypt.constant_time_compare + self.assertTrue(ctc('abcd', 'abcd')) + self.assertTrue(ctc('', '')) + self.assertFalse(ctc('abcd', 'efgh')) + self.assertFalse(ctc('abc', 'abcd')) + self.assertFalse(ctc('abc', 'abc\x00')) + self.assertFalse(ctc('', 'abc')) + + def test_derive_keys(self): + keys = memcache_crypt.derive_keys('token', 'secret', 'strategy') + self.assertEqual(len(keys['ENCRYPTION']), + len(keys['CACHE_KEY'])) + self.assertEqual(len(keys['CACHE_KEY']), + len(keys['MAC'])) + self.assertNotEqual(keys['ENCRYPTION'], + keys['MAC']) + self.assertIn('strategy', keys.keys()) + + def test_key_strategy_diff(self): + k1 = self._setup_keys('MAC') + k2 = self._setup_keys('ENCRYPT') + self.assertNotEqual(k1, k2) def test_sign_data(self): - expected = '{MAC:SHA1}eyJtYWMiOiAiM0FrQmdPZHRybGo1RFFESHA1eUxqcDVq' +\ - 'Si9BPSIsICJzZXJpYWxpemVkX2RhdGEiOiAiXCJUaGlzIGlzIGEgdG' +\ - 'VzdCFcIiJ9' - self.assertEqual( - memcache_crypt.sign_data('mykey', 'This is a test!'), - expected) - - def test_verify_signed_data(self): - signed = memcache_crypt.sign_data('mykey', 'Testz') - self.assertEqual( - memcache_crypt.verify_signed_data('mykey', signed), - 'Testz') - self.assertEqual( - memcache_crypt.verify_signed_data('aasSFWE13WER', 'not MACed'), - 'not MACed') - - def test_encrypt_data(self): - expected = '{ENCRYPT:AES256}' - self.assertEqual( - memcache_crypt.encrypt_data('mykey', 'mysecret', - 'This is a test!')[:16], - expected) - - def test_decrypt_data(self): - encrypted = memcache_crypt.encrypt_data('mykey', 'mysecret', 'Testz') - self.assertEqual( - memcache_crypt.decrypt_data('mykey', 'mysecret', encrypted), - 'Testz') - self.assertEqual( - memcache_crypt.decrypt_data('mykey', 'mysecret', - 'Not Encrypted!'), - 'Not Encrypted!') + keys = self._setup_keys('MAC') + sig = memcache_crypt.sign_data(keys['MAC'], 'data') + self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64) + + def test_encryption(self): + keys = self._setup_keys('ENCRYPT') + # what you put in is what you get out + for data in ['data', '1234567890123456', '\x00\xFF' * 13 + ] + [chr(x % 256) * x for x in range(768)]: + crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data) + decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt) + self.assertEqual(data, decrypt) + self.assertRaises(memcache_crypt.DecryptError, + memcache_crypt.decrypt_data, + keys['ENCRYPTION'], crypt[:-1]) + + def test_protect_wrappers(self): + data = 'My Pretty Little Data' + for strategy in ['MAC', 'ENCRYPT']: + keys = self._setup_keys(strategy) + protected = memcache_crypt.protect_data(keys, data) + self.assertNotEqual(protected, data) + if strategy == 'ENCRYPT': + self.assertNotIn(data, protected) + unprotected = memcache_crypt.unprotect_data(keys, protected) + self.assertEqual(data, unprotected) + self.assertRaises(memcache_crypt.InvalidMacError, + memcache_crypt.unprotect_data, + keys, protected[:-1]) + self.assertIsNone(memcache_crypt.unprotect_data(keys, None)) def test_no_pycrypt(self): aes = memcache_crypt.AES diff --git a/tests/test_service_catalog.py b/tests/test_service_catalog.py deleted file mode 100644 index 09ac19c..0000000 --- a/tests/test_service_catalog.py +++ /dev/null @@ -1,144 +0,0 @@ -from keystoneclient import exceptions -from keystoneclient import service_catalog -from tests import utils - - -# Taken directly from keystone/content/common/samples/auth.json -# Do not edit this structure. Instead, grab the latest from there. - -SERVICE_CATALOG = { - "access": { - "token": { - "id": "ab48a9efdfedb23ty3494", - "expires": "2010-11-01T03:32:15-05:00", - "tenant": { - "id": "345", - "name": "My Project" - } - }, - "user": { - "id": "123", - "name": "jqsmith", - "roles": [ - {"id": "234", - "name": "compute:admin"}, - {"id": "235", - "name": "object-store:admin", - "tenantId": "1"} - ], - "roles_links": [] - }, - "serviceCatalog": [{ - "name": "Cloud Servers", - "type": "compute", - "endpoints": [ - {"tenantId": "1", - "publicURL": "https://compute.north.host/v1/1234", - "internalURL": "https://compute.north.host/v1/1234", - "region": "North", - "versionId": "1.0", - "versionInfo": "https://compute.north.host/v1.0/", - "versionList": "https://compute.north.host/"}, - {"tenantId": "2", - "publicURL": "https://compute.north.host/v1.1/3456", - "internalURL": "https://compute.north.host/v1.1/3456", - "region": "North", - "versionId": "1.1", - "versionInfo": "https://compute.north.host/v1.1/", - "versionList": "https://compute.north.host/"} - ], - "endpoints_links": [] - }, - { - "name": "Cloud Files", - "type": "object-store", - "endpoints": [ - {"tenantId": "11", - "publicURL": "https://compute.north.host/v1/blah-blah", - "internalURL": "https://compute.north.host/v1/blah-blah", - "region": "South", - "versionId": "1.0", - "versionInfo": "uri", - "versionList": "uri"}, - {"tenantId": "2", - "publicURL": "https://compute.north.host/v1.1/blah-blah", - "internalURL": - "https://compute.north.host/v1.1/blah-blah", - "region": "South", - "versionId": "1.1", - "versionInfo": "https://compute.north.host/v1.1/", - "versionList": "https://compute.north.host/"} - ], - "endpoints_links": [{ - "rel": "next", - "href": "https://identity.north.host/v2.0/" - "endpoints?marker=2" - }] - }, - { - "name": "Image Servers", - "type": "image", - "endpoints": [ - {"publicURL": "https://image.north.host/v1/", - "internalURL": "https://image-internal.north.host/v1/", - "region": "North"}, - {"publicURL": "https://image.south.host/v1/", - "internalURL": "https://image-internal.south.host/v1/", - "region": "South"} - ], - "endpoints_links": [] - }, - ], - "serviceCatalog_links": [{ - "rel": "next", - "href": ("https://identity.host/v2.0/endpoints?" - "session=2hfh8Ar&marker=2") - }] - } -} - - -class ServiceCatalogTest(utils.TestCase): - def test_building_a_service_catalog(self): - sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access']) - - self.assertEquals(sc.url_for(service_type='compute'), - "https://compute.north.host/v1/1234") - self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'), - "https://compute.north.host/v1/1234") - self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'), - "https://compute.north.host/v1.1/3456") - - self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region", - "South", service_type='compute') - - def test_service_catalog_endpoints(self): - sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access']) - public_ep = sc.get_endpoints(service_type='compute', - endpoint_type='publicURL') - self.assertEquals(public_ep['compute'][1]['tenantId'], '2') - self.assertEquals(public_ep['compute'][1]['versionId'], '1.1') - self.assertEquals(public_ep['compute'][1]['internalURL'], - "https://compute.north.host/v1.1/3456") - - def test_service_catalog_regions(self): - sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'], - region_name="North") - url = sc.url_for(service_type='image', endpoint_type='publicURL') - self.assertEquals(url, "https://image.north.host/v1/") - sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'], - region_name="South") - url = sc.url_for(service_type='image', endpoint_type='internalURL') - self.assertEquals(url, "https://image-internal.south.host/v1/") - - def test_token(self): - sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access']) - - self.assertEquals(sc.get_token(), { - 'id': 'ab48a9efdfedb23ty3494', - 'tenant_id': '345', - 'user_id': '123', - 'expires': '2010-11-01T03:32:15-05:00'}) - self.assertEquals(sc.catalog['token']['expires'], - "2010-11-01T03:32:15-05:00") - self.assertEquals(sc.catalog['token']['tenant']['id'], '345') diff --git a/tests/utils.py b/tests/utils.py index 9d9bf8d..264aceb 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,6 +8,8 @@ from keystoneclient.v2_0 import client class TestCase(testtools.TestCase): + TEST_DOMAIN_ID = '1' + TEST_DOMAIN_NAME = 'aDomain' TEST_TENANT_ID = '1' TEST_TENANT_NAME = 'aTenant' TEST_TOKEN = 'aToken' diff --git a/tests/v2_0/client_fixtures.py b/tests/v2_0/client_fixtures.py new file mode 100644 index 0000000..70bb43f --- /dev/null +++ b/tests/v2_0/client_fixtures.py @@ -0,0 +1,163 @@ +UNSCOPED_TOKEN = { + u'access': {u'serviceCatalog': {}, + u'token': {u'expires': u'2012-10-03T16:58:01Z', + u'id': u'3e2813b7ba0b4006840c3825860b86ed'}, + u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a', + u'name': u'exampleuser', + u'roles': [], + u'roles_links': [], + u'username': u'exampleuser'} + } +} + +PROJECT_SCOPED_TOKEN = { + u'access': { + u'serviceCatalog': [{ + u'endpoints': [{ + u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58', + u'internalURL': + u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58', + u'publicURL': + u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne' + }], + u'endpoints_links': [], + u'name': u'Volume Service', + u'type': u'volume'}, + {u'endpoints': [{ + u'adminURL': u'http://admin:9292/v1', + u'internalURL': u'http://internal:9292/v1', + u'publicURL': u'http://public.com:9292/v1', + u'region': u'RegionOne'}], + u'endpoints_links': [], + u'name': u'Image Service', + u'type': u'image'}, + {u'endpoints': [{ +u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58', +u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58', +u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58', +u'region': u'RegionOne'}], + u'endpoints_links': [], + u'name': u'Compute Service', + u'type': u'compute'}, + {u'endpoints': [{ +u'adminURL': u'http://admin:8773/services/Admin', +u'internalURL': u'http://internal:8773/services/Cloud', +u'publicURL': u'http://public.com:8773/services/Cloud', +u'region': u'RegionOne'}], + u'endpoints_links': [], + u'name': u'EC2 Service', + u'type': u'ec2'}, + {u'endpoints': [{ +u'adminURL': u'http://admin:35357/v2.0', +u'internalURL': u'http://internal:5000/v2.0', +u'publicURL': u'http://public.com:5000/v2.0', +u'region': u'RegionOne'}], + u'endpoints_links': [], + u'name': u'Identity Service', + u'type': u'identity'}], + u'token': {u'expires': u'2012-10-03T16:53:36Z', + u'id': u'04c7d5ffaeef485f9dc69c06db285bdb', + u'tenant': {u'description': u'', + u'enabled': True, + u'id': u'225da22d3ce34b15877ea70b2a575f58', + u'name': u'exampleproject'}}, + u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a', + u'name': u'exampleuser', + u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74', + u'name': u'Member'}], + u'roles_links': [], + u'username': u'exampleuser'} + } +} + +AUTH_RESPONSE_BODY = { + u'access': { + u'token': { + u'id': u'ab48a9efdfedb23ty3494', + u'expires': u'2010-11-01T03:32:15-05:00', + u'tenant': { + u'id': u'345', + u'name': u'My Project' + } + }, + u'user': { + u'id': u'123', + u'name': u'jqsmith', + u'roles': [{ + u'id': u'234', + u'name': u'compute:admin' + }, { + u'id': u'235', + u'name': u'object-store:admin', + u'tenantId': u'1' + }], + u'roles_links': [] + }, + u'serviceCatalog': [{ + u'name': u'Cloud Servers', + u'type': u'compute', + u'endpoints': [{ + u'tenantId': u'1', + u'publicURL': u'https://compute.north.host/v1/1234', + u'internalURL': u'https://compute.north.host/v1/1234', + u'region': u'North', + u'versionId': u'1.0', + u'versionInfo': u'https://compute.north.host/v1.0/', + u'versionList': u'https://compute.north.host/u' + }, { + u'tenantId': u'2', + u'publicURL': u'https://compute.north.host/v1.1/3456', + u'internalURL': u'https://compute.north.host/v1.1/3456', + u'region': u'North', + u'versionId': u'1.1', + u'versionInfo': u'https://compute.north.host/v1.1/', + u'versionList': u'https://compute.north.host/u' + }], + u'endpoints_links': [] + }, { + u'name': u'Cloud Files', + u'type': u'object-store', + u'endpoints': [{ + u'tenantId': u'11', + u'publicURL': u'https://swift.north.host/v1/blah', + u'internalURL': u'https://swift.north.host/v1/blah', + u'region': u'South', + u'versionId': u'1.0', + u'versionInfo': u'uri', + u'versionList': u'uri' + }, { + u'tenantId': u'2', + u'publicURL': u'https://swift.north.host/v1.1/blah', + u'internalURL': u'https://compute.north.host/v1.1/blah', + u'region': u'South', + u'versionId': u'1.1', + u'versionInfo': u'https://swift.north.host/v1.1/', + u'versionList': u'https://swift.north.host/' + }], + u'endpoints_links': [{ + u'rel': u'next', + u'href': u'https://identity.north.host/v2.0/' + u'endpoints?marker=2' + }] + }, { + u'name': u'Image Servers', + u'type': u'image', + u'endpoints': [{ + u'publicURL': u'https://image.north.host/v1/', + u'internalURL': u'https://image-internal.north.host/v1/', + u'region': u'North' + }, { + u'publicURL': u'https://image.south.host/v1/', + u'internalURL': u'https://image-internal.south.host/v1/', + u'region': u'South' + }], + u'endpoints_links': [] + }], + u'serviceCatalog_links': [{ + u'rel': u'next', + u'href': (u'https://identity.host/v2.0/endpoints?' + u'session=2hfh8Ar&marker=2') + }] + } +} diff --git a/tests/test_access.py b/tests/v2_0/test_access.py index 31f91d6..c1bb4f7 100644 --- a/tests/test_access.py +++ b/tests/v2_0/test_access.py @@ -3,7 +3,7 @@ import datetime from keystoneclient import access from keystoneclient.openstack.common import timeutils from tests import utils -from tests import client_fixtures +from tests.v2_0 import client_fixtures UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN @@ -11,7 +11,7 @@ PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN class AccessInfoTest(utils.TestCase): def test_building_unscoped_accessinfo(self): - auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access']) + auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN) self.assertTrue(auth_ref) self.assertIn('token', auth_ref) @@ -30,6 +30,8 @@ class AccessInfoTest(utils.TestCase): self.assertEquals(auth_ref.management_url, None) self.assertFalse(auth_ref.scoped) + self.assertFalse(auth_ref.domain_scoped) + self.assertFalse(auth_ref.project_scoped) self.assertEquals(auth_ref.expires, timeutils.parse_isotime( UNSCOPED_TOKEN['access']['token']['expires'])) @@ -37,13 +39,13 @@ class AccessInfoTest(utils.TestCase): def test_will_expire_soon(self): expires = timeutils.utcnow() + datetime.timedelta(minutes=5) UNSCOPED_TOKEN['access']['token']['expires'] = expires.isoformat() - auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access']) + auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN) self.assertFalse(auth_ref.will_expire_soon(stale_duration=120)) self.assertTrue(auth_ref.will_expire_soon(stale_duration=300)) self.assertFalse(auth_ref.will_expire_soon()) def test_building_scoped_accessinfo(self): - auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access']) + auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN) self.assertTrue(auth_ref) self.assertIn('token', auth_ref) @@ -68,3 +70,5 @@ class AccessInfoTest(utils.TestCase): ('http://admin:35357/v2.0',)) self.assertTrue(auth_ref.scoped) + self.assertTrue(auth_ref.project_scoped) + self.assertFalse(auth_ref.domain_scoped) diff --git a/tests/v2_0/test_client.py b/tests/v2_0/test_client.py new file mode 100644 index 0000000..b1e485f --- /dev/null +++ b/tests/v2_0/test_client.py @@ -0,0 +1,84 @@ +import json +import mock + +import requests + +from keystoneclient.v2_0 import client +from tests import utils +from tests.v2_0 import client_fixtures + + +class KeystoneClientTest(utils.TestCase): + def setUp(self): + super(KeystoneClientTest, self).setUp() + + scoped_fake_resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN) + }) + self.scoped_mock_req = mock.Mock(return_value=scoped_fake_resp) + + unscoped_fake_resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(client_fixtures.UNSCOPED_TOKEN) + }) + self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp) + + def test_unscoped_init(self): + with mock.patch.object(requests, "request", self.unscoped_mock_req): + c = client.Client(username='exampleuser', + password='password', + auth_url='http://somewhere/') + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.scoped) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + + def test_scoped_init(self): + with mock.patch.object(requests, "request", self.scoped_mock_req): + c = client.Client(username='exampleuser', + password='password', + tenant_name='exampleproject', + auth_url='http://somewhere/') + self.assertIsNotNone(c.auth_ref) + self.assertTrue(c.auth_ref.scoped) + self.assertTrue(c.auth_ref.project_scoped) + self.assertFalse(c.auth_ref.domain_scoped) + + def test_auth_ref_load(self): + with mock.patch.object(requests, "request", self.scoped_mock_req): + cl = client.Client(username='exampleuser', + password='password', + tenant_name='exampleproject', + auth_url='http://somewhere/') + cache = json.dumps(cl.auth_ref) + new_client = client.Client(auth_ref=json.loads(cache)) + self.assertIsNotNone(new_client.auth_ref) + self.assertTrue(new_client.auth_ref.scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v2.0') + + def test_auth_ref_load_with_overridden_arguments(self): + with mock.patch.object(requests, "request", self.scoped_mock_req): + cl = client.Client(username='exampleuser', + password='password', + tenant_name='exampleproject', + auth_url='http://somewhere/') + cache = json.dumps(cl.auth_ref) + new_auth_url = "http://new-public:5000/v2.0" + new_client = client.Client(auth_ref=json.loads(cache), + auth_url=new_auth_url) + self.assertIsNotNone(new_client.auth_ref) + self.assertTrue(new_client.auth_ref.scoped) + self.assertTrue(new_client.auth_ref.scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertEquals(new_client.auth_url, new_auth_url) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v2.0') diff --git a/tests/v2_0/test_service_catalog.py b/tests/v2_0/test_service_catalog.py new file mode 100644 index 0000000..15838b9 --- /dev/null +++ b/tests/v2_0/test_service_catalog.py @@ -0,0 +1,50 @@ +from keystoneclient import access +from keystoneclient import exceptions + +from tests import utils +from tests.v2_0 import client_fixtures + + +class ServiceCatalogTest(utils.TestCase): + def setUp(self): + super(ServiceCatalogTest, self).setUp() + self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY + + def test_building_a_service_catalog(self): + auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + + self.assertEquals(sc.url_for(service_type='compute'), + "https://compute.north.host/v1/1234") + self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'), + "https://compute.north.host/v1/1234") + self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'), + "https://compute.north.host/v1.1/3456") + + self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region", + "South", service_type='compute') + + def test_service_catalog_endpoints(self): + auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + public_ep = sc.get_endpoints(service_type='compute', + endpoint_type='publicURL') + self.assertEquals(public_ep['compute'][1]['tenantId'], '2') + self.assertEquals(public_ep['compute'][1]['versionId'], '1.1') + self.assertEquals(public_ep['compute'][1]['internalURL'], + "https://compute.north.host/v1.1/3456") + + def test_service_catalog_regions(self): + self.AUTH_RESPONSE_BODY['access']['region_name'] = "North" + auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + + url = sc.url_for(service_type='image', endpoint_type='publicURL') + self.assertEquals(url, "https://image.north.host/v1/") + + self.AUTH_RESPONSE_BODY['access']['region_name'] = "South" + auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + + url = sc.url_for(service_type='image', endpoint_type='internalURL') + self.assertEquals(url, "https://image-internal.south.host/v1/") diff --git a/tests/v3/client_fixtures.py b/tests/v3/client_fixtures.py new file mode 100644 index 0000000..fb72dc9 --- /dev/null +++ b/tests/v3/client_fixtures.py @@ -0,0 +1,234 @@ +UNSCOPED_TOKEN = { + u'token': { + u'methods': [ + u'password' + ], + u'catalog': {}, + u'expires_at': u'2010-11-01T03:32:15-05:00', + u'user': { + u'domain': { + u'id': u'4e6893b7ba0b4006840c3845660b86ed', + u'name': u'exampledomain' + }, + u'id': u'c4da488862bd435c9e6c0275a0d0e49a', + u'name': u'exampleuser', + } + } +} + +DOMAIN_SCOPED_TOKEN = { + u'token': { + u'methods': [ + u'password' + ], + u'catalog': {}, + u'expires_at': u'2010-11-01T03:32:15-05:00', + u'user': { + u'domain': { + u'id': u'4e6893b7ba0b4006840c3845660b86ed', + u'name': u'exampledomain' + }, + u'id': u'c4da488862bd435c9e6c0275a0d0e49a', + u'name': u'exampleuser', + }, + u'domain': { + u'id': u'8e9283b7ba0b1038840c3842058b86ab', + u'name': u'anotherdomain' + }, + } +} + +PROJECT_SCOPED_TOKEN = { + u'token': { + u'methods': [ + u'password' + ], + u'catalog': [{ + u'endpoints': [{ + u'url': + u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne', + u'interface': u'public' + }, { + u'url': + u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne', + u'interface': u'internal' + }, { + u'url': + u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne', + u'interface': u'admin' + }], + u'type': u'volume' + }, { + u'endpoints': [{ + u'url': u'http://public.com:9292/v1', + u'region': u'RegionOne', + u'interface': u'public' + }, { + u'url': u'http://internal:9292/v1', + u'region': u'RegionOne', + u'interface': u'internal' + }, { + u'url': u'http://admin:9292/v1', + u'region': u'RegionOne', + u'interface': u'admin' + }], + u'type': u'image' + }, { + u'endpoints': [{ + u'url': + u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne', + u'interface': u'public' + }, { + u'url': + u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne', + u'interface': u'internal' + }, { + u'url': + u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58', + u'region': u'RegionOne', + u'interface': u'admin' + }], + u'type': u'compute' + }, { + u'endpoints': [{ + u'url': u'http://public.com:8773/services/Cloud', + u'region': u'RegionOne', + u'interface': u'public' + }, { + u'url': u'http://internal:8773/services/Cloud', + u'region': u'RegionOne', + u'interface': u'internal' + }, { + u'url': u'http://admin:8773/services/Admin', + u'region': u'RegionOne', + u'interface': u'admin' + }], + u'type': u'ec2' + }, { + u'endpoints': [{ + u'url': u'http://public.com:5000/v3', + u'region': u'RegionOne', + u'interface': u'public' + }, { + u'url': u'http://internal:5000/v3', + u'region': u'RegionOne', + u'interface': u'internal' + }, { + u'url': u'http://admin:35357/v3', + u'region': u'RegionOne', + u'interface': u'admin' + }], + u'type': u'identity' + }], + u'expires_at': u'2010-11-01T03:32:15-05:00', + u'user': { + u'domain': { + u'id': u'4e6893b7ba0b4006840c3845660b86ed', + u'name': u'exampledomain' + }, + u'id': u'c4da488862bd435c9e6c0275a0d0e49a', + u'name': u'exampleuser', + }, + u'project': { + u'domain': { + u'id': u'4e6893b7ba0b4006840c3845660b86ed', + u'name': u'exampledomain' + }, + u'id': u'225da22d3ce34b15877ea70b2a575f58', + u'name': u'exampleproject', + }, + } +} + +AUTH_RESPONSE_HEADERS = { + u'X-Subject-Token': u'3e2813b7ba0b4006840c3825860b86ed' +} + +AUTH_RESPONSE_BODY = { + u'token': { + u'methods': [ + u'password' + ], + u'expires_at': u'2010-11-01T03:32:15-05:00', + u'project': { + u'domain': { + u'id': u'123', + u'name': u'aDomain' + }, + u'id': u'345', + u'name': u'aTenant' + }, + u'user': { + u'domain': { + u'id': u'1', + u'name': u'aDomain' + }, + u'id': u'567', + u'name': u'test' + }, + u'issued_at': u'2010-10-31T03:32:15-05:00', + u'catalog': [{ + u'endpoints': [{ + u'url': u'https://compute.north.host/novapi/public', + u'region': u'North', + u'interface': u'public' + }, { + u'url': u'https://compute.north.host/novapi/internal', + u'region': u'North', + u'interface': u'internal' + }, { + u'url': u'https://compute.north.host/novapi/admin', + u'region': u'North', + u'interface': u'admin' + }], + u'type': u'compute' + }, { + u'endpoints': [{ + u'url': u'http://swift.north.host/swiftapi/public', + u'region': u'South', + u'interface': u'public' + }, { + u'url': u'http://swift.north.host/swiftapi/internal', + u'region': u'South', + u'interface': u'internal' + }, { + u'url': u'http://swift.north.host/swiftapi/admin', + u'region': u'South', + u'interface': u'admin' + }], + u'type': u'object-store' + }, { + u'endpoints': [{ + u'url': u'http://glance.north.host/glanceapi/public', + u'region': u'North', + u'interface': u'public' + }, { + u'url': u'http://glance.north.host/glanceapi/internal', + u'region': u'North', + u'interface': u'internal' + }, { + u'url': u'http://glance.north.host/glanceapi/admin', + u'region': u'North', + u'interface': u'admin' + }, { + u'url': u'http://glance.south.host/glanceapi/public', + u'region': u'South', + u'interface': u'public' + }, { + u'url': u'http://glance.south.host/glanceapi/internal', + u'region': u'South', + u'interface': u'internal' + }, { + u'url': u'http://glance.south.host/glanceapi/admin', + u'region': u'South', + u'interface': u'admin' + }], + u'type': u'image' + }] + } +} diff --git a/tests/v3/test_access.py b/tests/v3/test_access.py new file mode 100644 index 0000000..050a373 --- /dev/null +++ b/tests/v3/test_access.py @@ -0,0 +1,106 @@ +import datetime + +from keystoneclient import access +from keystoneclient.openstack.common import timeutils +from tests import utils +from tests.v3 import client_fixtures + +TOKEN_RESPONSE = utils.TestResponse({ + "headers": client_fixtures.AUTH_RESPONSE_HEADERS +}) +UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN +DOMAIN_SCOPED_TOKEN = client_fixtures.DOMAIN_SCOPED_TOKEN +PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN + + +class AccessInfoTest(utils.TestCase): + def test_building_unscoped_accessinfo(self): + auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE, + body=UNSCOPED_TOKEN) + + self.assertTrue(auth_ref) + self.assertIn('methods', auth_ref) + self.assertIn('catalog', auth_ref) + self.assertFalse(auth_ref['catalog']) + + self.assertEquals(auth_ref.auth_token, + '3e2813b7ba0b4006840c3825860b86ed') + self.assertEquals(auth_ref.username, 'exampleuser') + self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a') + + self.assertEquals(auth_ref.project_name, None) + self.assertEquals(auth_ref.project_id, None) + + self.assertEquals(auth_ref.auth_url, None) + self.assertEquals(auth_ref.management_url, None) + + self.assertFalse(auth_ref.domain_scoped) + self.assertFalse(auth_ref.project_scoped) + + self.assertEquals(auth_ref.expires, timeutils.parse_isotime( + UNSCOPED_TOKEN['token']['expires_at'])) + + def test_will_expire_soon(self): + expires = timeutils.utcnow() + datetime.timedelta(minutes=5) + UNSCOPED_TOKEN['token']['expires_at'] = expires.isoformat() + auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE, + body=UNSCOPED_TOKEN) + self.assertFalse(auth_ref.will_expire_soon(stale_duration=120)) + self.assertTrue(auth_ref.will_expire_soon(stale_duration=300)) + self.assertFalse(auth_ref.will_expire_soon()) + + def test_building_domain_scoped_accessinfo(self): + auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE, + body=DOMAIN_SCOPED_TOKEN) + + self.assertTrue(auth_ref) + self.assertIn('methods', auth_ref) + self.assertIn('catalog', auth_ref) + self.assertFalse(auth_ref['catalog']) + + self.assertEquals(auth_ref.auth_token, + '3e2813b7ba0b4006840c3825860b86ed') + self.assertEquals(auth_ref.username, 'exampleuser') + self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a') + + self.assertEquals(auth_ref.domain_name, 'anotherdomain') + self.assertEquals(auth_ref.domain_id, + '8e9283b7ba0b1038840c3842058b86ab') + + self.assertEquals(auth_ref.project_name, None) + self.assertEquals(auth_ref.project_id, None) + + self.assertTrue(auth_ref.domain_scoped) + self.assertFalse(auth_ref.project_scoped) + + def test_building_project_scoped_accessinfo(self): + auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE, + body=PROJECT_SCOPED_TOKEN) + + self.assertTrue(auth_ref) + self.assertIn('methods', auth_ref) + self.assertIn('catalog', auth_ref) + self.assertTrue(auth_ref['catalog']) + + self.assertEquals(auth_ref.auth_token, + '3e2813b7ba0b4006840c3825860b86ed') + self.assertEquals(auth_ref.username, 'exampleuser') + self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a') + + self.assertEquals(auth_ref.domain_name, None) + self.assertEquals(auth_ref.domain_id, None) + + self.assertEquals(auth_ref.project_name, 'exampleproject') + self.assertEquals(auth_ref.project_id, + '225da22d3ce34b15877ea70b2a575f58') + + self.assertEquals(auth_ref.tenant_name, auth_ref.project_name) + self.assertEquals(auth_ref.tenant_id, auth_ref.project_id) + + self.assertEquals(auth_ref.auth_url, + ('http://public.com:5000/v3',)) + self.assertEquals(auth_ref.management_url, + ('http://admin:35357/v3',)) + + self.assertFalse(auth_ref.domain_scoped) + self.assertTrue(auth_ref.project_scoped) diff --git a/tests/v3/test_auth.py b/tests/v3/test_auth.py new file mode 100644 index 0000000..41512f8 --- /dev/null +++ b/tests/v3/test_auth.py @@ -0,0 +1,408 @@ +import copy +import json +import requests + +from keystoneclient import exceptions +from keystoneclient.v3 import client + +from tests.v3 import utils + + +class AuthenticateAgainstKeystoneTests(utils.TestCase): + def setUp(self): + super(AuthenticateAgainstKeystoneTests, self).setUp() + self.TEST_RESPONSE_DICT = { + "token": { + "methods": [ + "token", + "password" + ], + + "expires_at": "2020-01-01T00:00:10.000123Z", + "project": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_TENANT_ID, + "name": self.TEST_TENANT_NAME + }, + "user": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_USER, + "name": self.TEST_USER + }, + "issued_at": "2013-05-29T16:55:21.468960Z", + "catalog": self.TEST_SERVICE_CATALOG + }, + } + self.TEST_REQUEST_BODY = { + "auth": { + "identity": { + "methods": ["password"], + "password": { + "user": { + "domain": { + "name": self.TEST_DOMAIN_NAME + }, + "name": self.TEST_USER, + "password": self.TEST_TOKEN + } + } + }, + "scope": { + "project": { + "id": self.TEST_TENANT_ID + }, + } + } + } + self.TEST_REQUEST_HEADERS = { + 'Content-Type': 'application/json', + 'User-Agent': 'python-keystoneclient' + } + self.TEST_RESPONSE_HEADERS = { + 'X-Subject-Token': self.TEST_TOKEN + } + + def test_authenticate_success(self): + TEST_TOKEN = "abcdef" + self.TEST_RESPONSE_HEADERS['X-Subject-Token'] = TEST_TOKEN + ident = self.TEST_REQUEST_BODY['auth']['identity'] + del ident['password']['user']['domain'] + del ident['password']['user']['name'] + ident['password']['user']['id'] = self.TEST_USER + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(user_id=self.TEST_USER, + password=self.TEST_TOKEN, + project_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_token, TEST_TOKEN) + + def test_authenticate_failure(self): + ident = self.TEST_REQUEST_BODY['auth']['identity'] + ident['password']['user']['password'] = 'bad_key' + resp = utils.TestResponse({ + "status_code": 401, + "text": json.dumps({ + "unauthorized": { + "message": "Unauthorized", + "code": "401", + }, + }), + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + # Workaround for issue with assertRaises on python2.6 + # where with assertRaises(exceptions.Unauthorized): doesn't work + # right + def client_create_wrapper(): + client.Client(user_domain_name=self.TEST_DOMAIN_NAME, + username=self.TEST_USER, + password="bad_key", + project_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL) + + self.assertRaises(exceptions.Unauthorized, client_create_wrapper) + + def test_auth_redirect(self): + correct_response = json.dumps(self.TEST_RESPONSE_DICT, sort_keys=True) + dict_responses = [ + { + "headers": { + 'location': self.TEST_ADMIN_URL + "/auth/tokens", + 'X-Subject-Token': self.TEST_TOKEN, + }, + "status_code": 305, + "text": "Use proxy", + }, + { + "headers": {'X-Subject-Token': self.TEST_TOKEN}, + "status_code": 200, + "text": correct_response, + }, + ] + responses = [(utils.TestResponse(resp)) + for resp in dict_responses] + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn(responses[0]) + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_ADMIN_URL + "/auth/tokens", + **kwargs).AndReturn(responses[1]) + self.mox.ReplayAll() + + cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME, + username=self.TEST_USER, + password=self.TEST_TOKEN, + project_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL) + + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["token"]["catalog"][3] + ['endpoints'][2]["url"]) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + + def test_authenticate_success_domain_username_password_scoped(self): + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME, + username=self.TEST_USER, + password=self.TEST_TOKEN, + project_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL) + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["token"]["catalog"][3] + ['endpoints'][2]["url"]) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + + def test_authenticate_success_userid_password_domain_scoped(self): + ident = self.TEST_REQUEST_BODY['auth']['identity'] + del ident['password']['user']['domain'] + del ident['password']['user']['name'] + ident['password']['user']['id'] = self.TEST_USER + + scope = self.TEST_REQUEST_BODY['auth']['scope'] + del scope['project'] + scope['domain'] = {} + scope['domain']['id'] = self.TEST_DOMAIN_ID + + token = self.TEST_RESPONSE_DICT['token'] + del token['project'] + token['domain'] = {} + token['domain']['id'] = self.TEST_DOMAIN_ID + token['domain']['name'] = self.TEST_DOMAIN_NAME + + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(user_id=self.TEST_USER, + password=self.TEST_TOKEN, + domain_id=self.TEST_DOMAIN_ID, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_domain_id, + self.TEST_DOMAIN_ID) + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["token"]["catalog"][3] + ['endpoints'][2]["url"]) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + + def test_authenticate_success_userid_password_project_scoped(self): + ident = self.TEST_REQUEST_BODY['auth']['identity'] + del ident['password']['user']['domain'] + del ident['password']['user']['name'] + ident['password']['user']['id'] = self.TEST_USER + + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(user_id=self.TEST_USER, + password=self.TEST_TOKEN, + project_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_tenant_id, + self.TEST_TENANT_ID) + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["token"]["catalog"][3] + ['endpoints'][2]["url"]) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + + def test_authenticate_success_password_unscoped(self): + del self.TEST_RESPONSE_DICT['token']['catalog'] + del self.TEST_REQUEST_BODY['auth']['scope'] + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME, + username=self.TEST_USER, + password=self.TEST_TOKEN, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + self.assertFalse('catalog' in cs.service_catalog.catalog) + + def test_authenticate_success_token_domain_scoped(self): + ident = self.TEST_REQUEST_BODY['auth']['identity'] + del ident['password'] + ident['methods'] = ['token'] + ident['token'] = {} + ident['token']['id'] = self.TEST_TOKEN + + scope = self.TEST_REQUEST_BODY['auth']['scope'] + del scope['project'] + scope['domain'] = {} + scope['domain']['id'] = self.TEST_DOMAIN_ID + + token = self.TEST_RESPONSE_DICT['token'] + del token['project'] + token['domain'] = {} + token['domain']['id'] = self.TEST_DOMAIN_ID + token['domain']['name'] = self.TEST_DOMAIN_NAME + + self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(token=self.TEST_TOKEN, + domain_id=self.TEST_DOMAIN_ID, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_domain_id, + self.TEST_DOMAIN_ID) + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["token"]["catalog"][3] + ['endpoints'][2]["url"]) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + + def test_authenticate_success_token_project_scoped(self): + ident = self.TEST_REQUEST_BODY['auth']['identity'] + del ident['password'] + ident['methods'] = ['token'] + ident['token'] = {} + ident['token']['id'] = self.TEST_TOKEN + self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(token=self.TEST_TOKEN, + project_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_tenant_id, + self.TEST_TENANT_ID) + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["token"]["catalog"][3] + ['endpoints'][2]["url"]) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + + def test_authenticate_success_token_unscoped(self): + ident = self.TEST_REQUEST_BODY['auth']['identity'] + del ident['password'] + ident['methods'] = ['token'] + ident['token'] = {} + ident['token']['id'] = self.TEST_TOKEN + del self.TEST_REQUEST_BODY['auth']['scope'] + del self.TEST_RESPONSE_DICT['token']['catalog'] + self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN + resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(self.TEST_RESPONSE_DICT), + "headers": self.TEST_RESPONSE_HEADERS, + }) + + kwargs = copy.copy(self.TEST_REQUEST_BASE) + kwargs['headers'] = self.TEST_REQUEST_HEADERS + kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) + requests.request('POST', + self.TEST_URL + "/auth/tokens", + **kwargs).AndReturn((resp)) + self.mox.ReplayAll() + + cs = client.Client(token=self.TEST_TOKEN, + auth_url=self.TEST_URL) + self.assertEqual(cs.auth_token, + self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + self.assertFalse('catalog' in cs.service_catalog.catalog) diff --git a/tests/v3/test_client.py b/tests/v3/test_client.py new file mode 100644 index 0000000..3198722 --- /dev/null +++ b/tests/v3/test_client.py @@ -0,0 +1,121 @@ +import json +import mock + +import requests + +from keystoneclient.v3 import client + +from tests import utils +from tests.v3 import client_fixtures + + +class KeystoneClientTest(utils.TestCase): + def setUp(self): + super(KeystoneClientTest, self).setUp() + + domain_scoped_fake_resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(client_fixtures.DOMAIN_SCOPED_TOKEN), + "headers": client_fixtures.AUTH_RESPONSE_HEADERS + }) + self.domain_scoped_mock_req = mock.Mock( + return_value=domain_scoped_fake_resp) + + project_scoped_fake_resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN), + "headers": client_fixtures.AUTH_RESPONSE_HEADERS + }) + self.project_scoped_mock_req = mock.Mock( + return_value=project_scoped_fake_resp) + + unscoped_fake_resp = utils.TestResponse({ + "status_code": 200, + "text": json.dumps(client_fixtures.UNSCOPED_TOKEN), + "headers": client_fixtures.AUTH_RESPONSE_HEADERS + }) + self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp) + + def test_unscoped_init(self): + with mock.patch.object(requests, "request", self.unscoped_mock_req): + c = client.Client(user_domain_name='exampledomain', + username='exampleuser', + password='password', + auth_url='http://somewhere/') + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + self.assertEquals(c.auth_user_id, + 'c4da488862bd435c9e6c0275a0d0e49a') + + def test_domain_scoped_init(self): + with mock.patch.object(requests, + "request", + self.domain_scoped_mock_req): + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + domain_name='exampledomain', + auth_url='http://somewhere/') + self.assertIsNotNone(c.auth_ref) + self.assertTrue(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + self.assertEquals(c.auth_user_id, + 'c4da488862bd435c9e6c0275a0d0e49a') + self.assertEquals(c.auth_domain_id, + '8e9283b7ba0b1038840c3842058b86ab') + + def test_project_scoped_init(self): + with mock.patch.object(requests, + "request", + self.project_scoped_mock_req): + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + user_domain_name='exampledomain', + project_name='exampleproject', + auth_url='http://somewhere/') + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertTrue(c.auth_ref.project_scoped) + self.assertEquals(c.auth_user_id, + 'c4da488862bd435c9e6c0275a0d0e49a') + self.assertEquals(c.auth_tenant_id, + '225da22d3ce34b15877ea70b2a575f58') + + def test_auth_ref_load(self): + with mock.patch.object(requests, + "request", + self.project_scoped_mock_req): + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + project_id='225da22d3ce34b15877ea70b2a575f58', + auth_url='http://somewhere/') + cache = json.dumps(c.auth_ref) + new_client = client.Client(auth_ref=json.loads(cache)) + self.assertIsNotNone(new_client.auth_ref) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v3') + + def test_auth_ref_load_with_overridden_arguments(self): + with mock.patch.object(requests, + "request", + self.project_scoped_mock_req): + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + project_id='225da22d3ce34b15877ea70b2a575f58', + auth_url='http://somewhere/') + cache = json.dumps(c.auth_ref) + new_auth_url = "http://new-public:5000/v3" + new_client = client.Client(auth_ref=json.loads(cache), + auth_url=new_auth_url) + self.assertIsNotNone(new_client.auth_ref) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertEquals(new_client.auth_url, new_auth_url) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v3') diff --git a/tests/v3/test_service_catalog.py b/tests/v3/test_service_catalog.py new file mode 100644 index 0000000..d0dac3a --- /dev/null +++ b/tests/v3/test_service_catalog.py @@ -0,0 +1,55 @@ +from keystoneclient import access +from keystoneclient import exceptions + +from tests.v3 import client_fixtures +from tests.v3 import utils + + +class ServiceCatalogTest(utils.TestCase): + def setUp(self): + super(ServiceCatalogTest, self).setUp() + self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY + self.RESPONSE = utils.TestResponse({ + "headers": client_fixtures.AUTH_RESPONSE_HEADERS + }) + + def test_building_a_service_catalog(self): + auth_ref = access.AccessInfo.factory(self.RESPONSE, + self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + + self.assertEquals(sc.url_for(service_type='compute'), + "https://compute.north.host/novapi/public") + self.assertEquals(sc.url_for(service_type='compute', + endpoint_type='internal'), + "https://compute.north.host/novapi/internal") + + self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region", + "South", service_type='compute') + + def test_service_catalog_endpoints(self): + auth_ref = access.AccessInfo.factory(self.RESPONSE, + self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + + public_ep = sc.get_endpoints(service_type='compute', + endpoint_type='public') + self.assertEquals(public_ep['compute'][0]['region'], 'North') + self.assertEquals(public_ep['compute'][0]['url'], + "https://compute.north.host/novapi/public") + + def test_service_catalog_regions(self): + self.AUTH_RESPONSE_BODY['token']['region_name'] = "North" + auth_ref = access.AccessInfo.factory(self.RESPONSE, + self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + + url = sc.url_for(service_type='image', endpoint_type='public') + self.assertEquals(url, "http://glance.north.host/glanceapi/public") + + self.AUTH_RESPONSE_BODY['token']['region_name'] = "South" + auth_ref = access.AccessInfo.factory(self.RESPONSE, + self.AUTH_RESPONSE_BODY) + sc = auth_ref.service_catalog + url = sc.url_for(service_type='image', endpoint_type='internal') + self.assertEquals(url, "http://glance.south.host/glanceapi/internal") diff --git a/tests/v3/utils.py b/tests/v3/utils.py index 6ae7c0e..b3b359c 100644 --- a/tests/v3/utils.py +++ b/tests/v3/utils.py @@ -1,8 +1,8 @@ import copy import json -import uuid import time import urlparse +import uuid import mox import requests @@ -32,6 +32,9 @@ class TestClient(client.Client): class TestCase(testtools.TestCase): + TEST_DOMAIN_ID = '1' + TEST_DOMAIN_NAME = 'aDomain' + TEST_TENANT_ID = '1' TEST_TENANT_NAME = 'aTenant' TEST_TOKEN = 'aToken' TEST_USER = 'test' @@ -43,6 +46,84 @@ class TestCase(testtools.TestCase): 'verify': True, } + TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "url": "http://cdn.admin-nets.local:8774/v1.0/", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://127.0.0.1:8774/v1.0", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://cdn.admin-nets.local:8774/v1.0", + "region": "RegionOne", + "interface": "admin" + }], + "type": "nova_compat" + }, { + "endpoints": [{ + "url": "http://nova/novapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://nova/novapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://nova/novapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "compute" + }, { + "endpoints": [{ + "url": "http://glance/glanceapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://glance/glanceapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://glance/glanceapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "image", + "name": "glance" + }, { + "endpoints": [{ + "url": "http://127.0.0.1:5000/v3", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://127.0.0.1:5000/v3", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://127.0.0.1:35357/v3", + "region": "RegionOne", + "interface": "admin" + }], + "type": "identity" + }, { + "endpoints": [{ + "url": "http://swift/swiftapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://swift/swiftapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://swift/swiftapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "object-store" + }] + def setUp(self): super(TestCase, self).setUp() self.mox = mox.Mox() |