summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/client_fixtures.py72
-rw-r--r--tests/fakes.py73
-rw-r--r--tests/test_auth_token_middleware.py111
-rw-r--r--tests/test_client.py40
-rw-r--r--tests/test_https.py24
-rw-r--r--tests/test_keyring.py65
-rw-r--r--tests/test_memcache_crypt.py96
-rw-r--r--tests/test_service_catalog.py144
-rw-r--r--tests/utils.py2
-rw-r--r--tests/v2_0/client_fixtures.py163
-rw-r--r--tests/v2_0/test_access.py (renamed from tests/test_access.py)12
-rw-r--r--tests/v2_0/test_client.py84
-rw-r--r--tests/v2_0/test_service_catalog.py50
-rw-r--r--tests/v3/client_fixtures.py234
-rw-r--r--tests/v3/test_access.py106
-rw-r--r--tests/v3/test_auth.py408
-rw-r--r--tests/v3/test_client.py121
-rw-r--r--tests/v3/test_service_catalog.py55
-rw-r--r--tests/v3/utils.py83
19 files changed, 1501 insertions, 442 deletions
diff --git a/tests/client_fixtures.py b/tests/client_fixtures.py
deleted file mode 100644
index f0b5137..0000000
--- a/tests/client_fixtures.py
+++ /dev/null
@@ -1,72 +0,0 @@
-UNSCOPED_TOKEN = {
- u'access': {u'serviceCatalog': {},
- u'token': {u'expires': u'2012-10-03T16:58:01Z',
- u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
- u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
- u'name': u'exampleuser',
- u'roles': [],
- u'roles_links': [],
- u'username': u'exampleuser'}
- }
-}
-
-PROJECT_SCOPED_TOKEN = {
- u'access': {
- u'serviceCatalog': [{
- u'endpoints': [{
- u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
- u'internalURL':
- u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
- u'publicURL':
- u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
- u'region': u'RegionOne'
- }],
- u'endpoints_links': [],
- u'name': u'Volume Service',
- u'type': u'volume'},
- {u'endpoints': [{
- u'adminURL': u'http://admin:9292/v1',
- u'internalURL': u'http://internal:9292/v1',
- u'publicURL': u'http://public.com:9292/v1',
- u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'Image Service',
- u'type': u'image'},
- {u'endpoints': [{
-u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
-u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
-u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
-u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'Compute Service',
- u'type': u'compute'},
- {u'endpoints': [{
-u'adminURL': u'http://admin:8773/services/Admin',
-u'internalURL': u'http://internal:8773/services/Cloud',
-u'publicURL': u'http://public.com:8773/services/Cloud',
-u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'EC2 Service',
- u'type': u'ec2'},
- {u'endpoints': [{
-u'adminURL': u'http://admin:35357/v2.0',
-u'internalURL': u'http://internal:5000/v2.0',
-u'publicURL': u'http://public.com:5000/v2.0',
-u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'Identity Service',
- u'type': u'identity'}],
- u'token': {u'expires': u'2012-10-03T16:53:36Z',
- u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
- u'tenant': {u'description': u'',
- u'enabled': True,
- u'id': u'225da22d3ce34b15877ea70b2a575f58',
- u'name': u'exampleproject'}},
- u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
- u'name': u'exampleuser',
- u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
- u'name': u'Member'}],
- u'roles_links': [],
- u'username': u'exampleuser'}
- }
-}
diff --git a/tests/fakes.py b/tests/fakes.py
index feafd52..de7ecff 100644
--- a/tests/fakes.py
+++ b/tests/fakes.py
@@ -65,37 +65,46 @@ class FakeClient(object):
def authenticate(self, cl_obj):
cl_obj.user_id = '1'
cl_obj.auth_user_id = '1'
- cl_obj.tenant_id = '1'
+ cl_obj.project_id = '1'
cl_obj.auth_tenant_id = '1'
- cl_obj.auth_ref = access.AccessInfo({
- "token": {
- "expires": "2012-02-05T00:00:00",
- "id": "887665443383838",
- "tenant": {
+ cl_obj.auth_ref = access.AccessInfo.factory(None, {
+ "access": {
+ "token": {
+ "expires": "2012-02-05T00:00:00",
+ "id": "887665443383838",
+ "tenant": {
+ "id": "1",
+ "name": "customer-x"
+ }
+ },
+ "serviceCatalog": [{
+ "endpoints": [{
+ "adminURL": "http://swift.admin-nets.local:8080/",
+ "region": "RegionOne",
+ "internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
+ "publicURL":
+ "http://swift.publicinternets.com/v1/AUTH_1"
+ }],
+ "type": "object-store",
+ "name": "swift"
+ }, {
+ "endpoints": [{
+ "adminURL": "http://cdn.admin-nets.local/v1.1/1",
+ "region": "RegionOne",
+ "internalURL": "http://127.0.0.1:7777/v1.1/1",
+ "publicURL": "http://cdn.publicinternets.com/v1.1/1"
+ }],
+ "type": "object-store",
+ "name": "cdn"
+ }],
+ "user": {
"id": "1",
- "name": "customer-x"}},
- "serviceCatalog": [
- {"endpoints": [
- {"adminURL": "http://swift.admin-nets.local:8080/",
- "region": "RegionOne",
- "internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
- "publicURL":
- "http://swift.publicinternets.com/v1/AUTH_1"}],
- "type": "object-store",
- "name": "swift"},
- {"endpoints": [
- {"adminURL": "http://cdn.admin-nets.local/v1.1/1",
- "region": "RegionOne",
- "internalURL": "http://127.0.0.1:7777/v1.1/1",
- "publicURL": "http://cdn.publicinternets.com/v1.1/1"}],
- "type": "object-store",
- "name": "cdn"}],
- "user": {
- "id": "1",
- "roles": [
- {"tenantId": "1",
- "id": "3",
- "name": "Member"}],
- "name": "joeuser"}
- }
- )
+ "roles": [{
+ "tenantId": "1",
+ "id": "3",
+ "name": "Member"
+ }],
+ "name": "joeuser"
+ }
+ }
+ })
diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py
index 06054d0..f11dfee 100644
--- a/tests/test_auth_token_middleware.py
+++ b/tests/test_auth_token_middleware.py
@@ -28,7 +28,6 @@ import webob
from keystoneclient.common import cms
from keystoneclient import utils
from keystoneclient.middleware import auth_token
-from keystoneclient.middleware import memcache_crypt
from keystoneclient.openstack.common import memorycache
from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import timeutils
@@ -806,7 +805,7 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
- 'memcache_servers': 'localhost:11211',
+ 'memcached_servers': 'localhost:11211',
}
auth_token.AuthProtocol(FakeApp(), conf)
@@ -817,7 +816,7 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': 'localhost:11211'
+ 'memcached_servers': 'localhost:11211'
}
self.set_middleware(conf=conf)
self.middleware._init_cache(env)
@@ -1013,9 +1012,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def _get_cached_token(self, token):
token_id = cms.cms_hash_token(token)
# NOTE(vish): example tokens are expired so skip the expiration check.
- key = self.middleware._get_cache_key(token_id)
- cached = self.middleware._cache.get(key)
- return self.middleware._unprotect_cache_value(token, cached)
+ return self.middleware._cache_get(token_id, ignore_expires=True)
def test_memcache(self):
req = webob.Request.blank('/')
@@ -1036,7 +1033,8 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
token = 'invalid-token'
req.headers['X-Auth-Token'] = token
self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self._get_cached_token(token), "invalid")
+ self.assertRaises(auth_token.InvalidUserToken,
+ self._get_cached_token, token)
def test_memcache_set_expired(self):
token_cache_time = 10
@@ -1072,7 +1070,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'cache': 'swift.cache',
- 'memcache_servers': ['localhost:11211']
+ 'memcached_servers': ['localhost:11211']
}
self.set_middleware(conf=conf)
self.middleware._init_cache(env)
@@ -1091,98 +1089,47 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'encrypt',
'memcache_secret_key': 'mysecret'
}
self.set_middleware(conf=conf)
- encrypted_data = self.middleware._protect_cache_value(
- 'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
- self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])
- self.assertEqual(
- TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
- self.middleware._unprotect_cache_value('token', encrypted_data))
- # should return None if unable to decrypt
- self.assertIsNone(
- self.middleware._unprotect_cache_value(
- 'token', '{ENCRYPT:AES256}corrupted'))
- self.assertIsNone(
- self.middleware._unprotect_cache_value('mykey', encrypted_data))
+ token = 'my_token'
+ data = ('this_data', 10e100)
+ self.middleware._init_cache({})
+ self.middleware._cache_store(token, data)
+ self.assertEqual(self.middleware._cache_get(token), data[0])
def test_sign_cache_data(self):
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'mac',
'memcache_secret_key': 'mysecret'
}
self.set_middleware(conf=conf)
- signed_data = self.middleware._protect_cache_value(
- 'mykey', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
- expected = '{MAC:SHA1}'
- self.assertEqual(
- signed_data[:10],
- expected)
- self.assertEqual(
- TOKEN_RESPONSES[self.token_dict['uuid_token_default']],
- self.middleware._unprotect_cache_value('mykey', signed_data))
- # should return None on corrupted data
- self.assertIsNone(
- self.middleware._unprotect_cache_value('mykey',
- '{MAC:SHA1}corrupted'))
+ token = 'my_token'
+ data = ('this_data', 10e100)
+ self.middleware._init_cache({})
+ self.middleware._cache_store(token, data)
+ self.assertEqual(self.middleware._cache_get(token), data[0])
def test_no_memcache_protection(self):
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_secret_key': 'mysecret'
}
self.set_middleware(conf=conf)
- data = self.middleware._protect_cache_value('mykey',
- 'This is a test!')
- self.assertEqual(data, 'This is a test!')
- self.assertEqual(
- 'This is a test!',
- self.middleware._unprotect_cache_value('mykey', data))
-
- def test_get_cache_key(self):
- conf = {
- 'auth_host': 'keystone.example.com',
- 'auth_port': 1234,
- 'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- self.assertEqual(
- 'tokens/mytoken',
- self.middleware._get_cache_key('mytoken'))
- conf = {
- 'auth_host': 'keystone.example.com',
- 'auth_port': 1234,
- 'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
- 'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
- self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
- conf = {
- 'auth_host': 'keystone.example.com',
- 'auth_port': 1234,
- 'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
- 'memcache_security_strategy': 'Encrypt',
- 'memcache_secret_key': 'abc!'
- }
- self.set_middleware(conf=conf)
- expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
- self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
+ token = 'my_token'
+ data = ('this_data', 10e100)
+ self.middleware._init_cache({})
+ self.middleware._cache_store(token, data)
+ self.assertEqual(self.middleware._cache_get(token), data[0])
def test_assert_valid_memcache_protection_config(self):
# test missing memcache_secret_key
@@ -1190,7 +1137,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'Encrypt'
}
self.assertRaises(Exception, self.set_middleware, conf)
@@ -1199,7 +1146,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'whatever'
}
self.assertRaises(Exception, self.set_middleware, conf)
@@ -1208,7 +1155,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'mac'
}
self.assertRaises(Exception, self.set_middleware, conf)
@@ -1216,7 +1163,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'Encrypt',
'memcache_secret_key': ''
}
@@ -1225,7 +1172,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
- 'memcache_servers': ['localhost:11211'],
+ 'memcached_servers': ['localhost:11211'],
'memcache_security_strategy': 'mAc',
'memcache_secret_key': ''
}
diff --git a/tests/test_client.py b/tests/test_client.py
deleted file mode 100644
index d24d72a..0000000
--- a/tests/test_client.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import json
-import mock
-
-import requests
-
-from keystoneclient.v2_0 import client
-from tests import client_fixtures
-from tests import utils
-
-
-fake_response = utils.TestResponse({
- "status_code": 200,
- "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
-})
-mock_request = mock.Mock(return_value=(fake_response))
-
-
-class KeystoneclientTest(utils.TestCase):
-
- def test_scoped_init(self):
- with mock.patch.object(requests, "request", mock_request):
- cl = client.Client(username='exampleuser',
- password='password',
- auth_url='http://somewhere/')
- self.assertIsNotNone(cl.auth_ref)
- self.assertTrue(cl.auth_ref.scoped)
-
- def test_auth_ref_load(self):
- with mock.patch.object(requests, "request", mock_request):
- cl = client.Client(username='exampleuser',
- password='password',
- auth_url='http://somewhere/')
- cache = json.dumps(cl.auth_ref)
- new_client = client.Client(auth_ref=json.loads(cache))
- self.assertIsNotNone(new_client.auth_ref)
- self.assertTrue(new_client.auth_ref.scoped)
- self.assertEquals(new_client.username, 'exampleuser')
- self.assertIsNone(new_client.password)
- self.assertEqual(new_client.management_url,
- 'http://admin:35357/v2.0')
diff --git a/tests/test_https.py b/tests/test_https.py
index bef1db4..4070df9 100644
--- a/tests/test_https.py
+++ b/tests/test_https.py
@@ -68,3 +68,27 @@ class ClientTest(utils.TestCase):
headers=headers,
data='[1, 2, 3]',
**kwargs)
+
+ def test_post_auth(self):
+ with mock.patch.object(requests, "request", MOCK_REQUEST):
+ cl = client.HTTPClient(
+ username="username", password="password", tenant_id="tenant",
+ auth_url="auth_test", cacert="ca.pem", key="key.pem",
+ cert="cert.pem")
+ cl.management_url = "https://127.0.0.1:5000"
+ cl.auth_token = "token"
+ cl.post("/hi", body=[1, 2, 3])
+ headers = {
+ "X-Auth-Token": "token",
+ "Content-Type": "application/json",
+ "User-Agent": cl.USER_AGENT
+ }
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['cert'] = ('cert.pem', 'key.pem')
+ kwargs['verify'] = 'ca.pem'
+ MOCK_REQUEST.assert_called_with(
+ "POST",
+ "https://127.0.0.1:5000/hi",
+ headers=headers,
+ data='[1, 2, 3]',
+ **kwargs)
diff --git a/tests/test_keyring.py b/tests/test_keyring.py
index 00fb0c2..f3eba82 100644
--- a/tests/test_keyring.py
+++ b/tests/test_keyring.py
@@ -3,7 +3,7 @@ import datetime
from keystoneclient import access
from keystoneclient import client
from keystoneclient.openstack.common import timeutils
-from tests import client_fixtures
+from tests.v2_0 import client_fixtures
from tests import utils
try:
@@ -60,11 +60,12 @@ class KeyringTest(utils.TestCase):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL)
- (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
- USERNAME,
- TENANT,
- TENANT_ID,
- TOKEN)
+ (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
+ auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertIsNone(keyring_key)
self.assertIsNone(auth_ref)
@@ -73,49 +74,57 @@ class KeyringTest(utils.TestCase):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL)
- keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
- TENANT, TENANT_ID,
- TOKEN)
+ keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertEqual(keyring_key,
'%s/%s/%s/%s/%s' %
- (AUTH_URL, USERNAME, TENANT, TENANT_ID, TOKEN))
+ (AUTH_URL, TENANT_ID, TENANT, TOKEN, USERNAME))
def test_set_and_get_keyring_expired(self):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL,
use_keyring=True)
- keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
- TENANT, TENANT_ID,
- TOKEN)
+ keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
- cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
+ cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
expired = timeutils.utcnow() - datetime.timedelta(minutes=30)
cl.auth_ref['token']['expires'] = timeutils.isotime(expired)
cl.store_auth_ref_into_keyring(keyring_key)
- (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
- USERNAME,
- TENANT,
- TENANT_ID,
- TOKEN)
+ (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
+ auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertIsNone(auth_ref)
def test_set_and_get_keyring(self):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL,
use_keyring=True)
- keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
- TENANT, TENANT_ID,
- TOKEN)
+ keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
- cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
+ cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
expires = timeutils.utcnow() + datetime.timedelta(minutes=30)
cl.auth_ref['token']['expires'] = timeutils.isotime(expires)
cl.store_auth_ref_into_keyring(keyring_key)
- (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
- USERNAME,
- TENANT,
- TENANT_ID,
- TOKEN)
+ (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
+ auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertEqual(auth_ref.auth_token, TOKEN)
self.assertEqual(auth_ref.username, USERNAME)
diff --git a/tests/test_memcache_crypt.py b/tests/test_memcache_crypt.py
index b2281d9..524cd21 100644
--- a/tests/test_memcache_crypt.py
+++ b/tests/test_memcache_crypt.py
@@ -4,48 +4,66 @@ from keystoneclient.middleware import memcache_crypt
class MemcacheCryptPositiveTests(testtools.TestCase):
- def test_generate_aes_key(self):
- self.assertEqual(
- len(memcache_crypt.generate_aes_key('Gimme Da Key', 'hush')), 32)
+ def _setup_keys(self, strategy):
+ return memcache_crypt.derive_keys('token', 'secret', strategy)
- def test_compute_mac(self):
- self.assertEqual(
- memcache_crypt.compute_mac('mykey', 'This is a test!'),
- 'tREu41yR5tEgeBWIuv9ag4AeKA8=')
+ def test_constant_time_compare(self):
+ # make sure it works as a compare, the "constant time" aspect
+ # isn't appropriate to test in unittests
+ ctc = memcache_crypt.constant_time_compare
+ self.assertTrue(ctc('abcd', 'abcd'))
+ self.assertTrue(ctc('', ''))
+ self.assertFalse(ctc('abcd', 'efgh'))
+ self.assertFalse(ctc('abc', 'abcd'))
+ self.assertFalse(ctc('abc', 'abc\x00'))
+ self.assertFalse(ctc('', 'abc'))
+
+ def test_derive_keys(self):
+ keys = memcache_crypt.derive_keys('token', 'secret', 'strategy')
+ self.assertEqual(len(keys['ENCRYPTION']),
+ len(keys['CACHE_KEY']))
+ self.assertEqual(len(keys['CACHE_KEY']),
+ len(keys['MAC']))
+ self.assertNotEqual(keys['ENCRYPTION'],
+ keys['MAC'])
+ self.assertIn('strategy', keys.keys())
+
+ def test_key_strategy_diff(self):
+ k1 = self._setup_keys('MAC')
+ k2 = self._setup_keys('ENCRYPT')
+ self.assertNotEqual(k1, k2)
def test_sign_data(self):
- expected = '{MAC:SHA1}eyJtYWMiOiAiM0FrQmdPZHRybGo1RFFESHA1eUxqcDVq' +\
- 'Si9BPSIsICJzZXJpYWxpemVkX2RhdGEiOiAiXCJUaGlzIGlzIGEgdG' +\
- 'VzdCFcIiJ9'
- self.assertEqual(
- memcache_crypt.sign_data('mykey', 'This is a test!'),
- expected)
-
- def test_verify_signed_data(self):
- signed = memcache_crypt.sign_data('mykey', 'Testz')
- self.assertEqual(
- memcache_crypt.verify_signed_data('mykey', signed),
- 'Testz')
- self.assertEqual(
- memcache_crypt.verify_signed_data('aasSFWE13WER', 'not MACed'),
- 'not MACed')
-
- def test_encrypt_data(self):
- expected = '{ENCRYPT:AES256}'
- self.assertEqual(
- memcache_crypt.encrypt_data('mykey', 'mysecret',
- 'This is a test!')[:16],
- expected)
-
- def test_decrypt_data(self):
- encrypted = memcache_crypt.encrypt_data('mykey', 'mysecret', 'Testz')
- self.assertEqual(
- memcache_crypt.decrypt_data('mykey', 'mysecret', encrypted),
- 'Testz')
- self.assertEqual(
- memcache_crypt.decrypt_data('mykey', 'mysecret',
- 'Not Encrypted!'),
- 'Not Encrypted!')
+ keys = self._setup_keys('MAC')
+ sig = memcache_crypt.sign_data(keys['MAC'], 'data')
+ self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64)
+
+ def test_encryption(self):
+ keys = self._setup_keys('ENCRYPT')
+ # what you put in is what you get out
+ for data in ['data', '1234567890123456', '\x00\xFF' * 13
+ ] + [chr(x % 256) * x for x in range(768)]:
+ crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data)
+ decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt)
+ self.assertEqual(data, decrypt)
+ self.assertRaises(memcache_crypt.DecryptError,
+ memcache_crypt.decrypt_data,
+ keys['ENCRYPTION'], crypt[:-1])
+
+ def test_protect_wrappers(self):
+ data = 'My Pretty Little Data'
+ for strategy in ['MAC', 'ENCRYPT']:
+ keys = self._setup_keys(strategy)
+ protected = memcache_crypt.protect_data(keys, data)
+ self.assertNotEqual(protected, data)
+ if strategy == 'ENCRYPT':
+ self.assertNotIn(data, protected)
+ unprotected = memcache_crypt.unprotect_data(keys, protected)
+ self.assertEqual(data, unprotected)
+ self.assertRaises(memcache_crypt.InvalidMacError,
+ memcache_crypt.unprotect_data,
+ keys, protected[:-1])
+ self.assertIsNone(memcache_crypt.unprotect_data(keys, None))
def test_no_pycrypt(self):
aes = memcache_crypt.AES
diff --git a/tests/test_service_catalog.py b/tests/test_service_catalog.py
deleted file mode 100644
index 09ac19c..0000000
--- a/tests/test_service_catalog.py
+++ /dev/null
@@ -1,144 +0,0 @@
-from keystoneclient import exceptions
-from keystoneclient import service_catalog
-from tests import utils
-
-
-# Taken directly from keystone/content/common/samples/auth.json
-# Do not edit this structure. Instead, grab the latest from there.
-
-SERVICE_CATALOG = {
- "access": {
- "token": {
- "id": "ab48a9efdfedb23ty3494",
- "expires": "2010-11-01T03:32:15-05:00",
- "tenant": {
- "id": "345",
- "name": "My Project"
- }
- },
- "user": {
- "id": "123",
- "name": "jqsmith",
- "roles": [
- {"id": "234",
- "name": "compute:admin"},
- {"id": "235",
- "name": "object-store:admin",
- "tenantId": "1"}
- ],
- "roles_links": []
- },
- "serviceCatalog": [{
- "name": "Cloud Servers",
- "type": "compute",
- "endpoints": [
- {"tenantId": "1",
- "publicURL": "https://compute.north.host/v1/1234",
- "internalURL": "https://compute.north.host/v1/1234",
- "region": "North",
- "versionId": "1.0",
- "versionInfo": "https://compute.north.host/v1.0/",
- "versionList": "https://compute.north.host/"},
- {"tenantId": "2",
- "publicURL": "https://compute.north.host/v1.1/3456",
- "internalURL": "https://compute.north.host/v1.1/3456",
- "region": "North",
- "versionId": "1.1",
- "versionInfo": "https://compute.north.host/v1.1/",
- "versionList": "https://compute.north.host/"}
- ],
- "endpoints_links": []
- },
- {
- "name": "Cloud Files",
- "type": "object-store",
- "endpoints": [
- {"tenantId": "11",
- "publicURL": "https://compute.north.host/v1/blah-blah",
- "internalURL": "https://compute.north.host/v1/blah-blah",
- "region": "South",
- "versionId": "1.0",
- "versionInfo": "uri",
- "versionList": "uri"},
- {"tenantId": "2",
- "publicURL": "https://compute.north.host/v1.1/blah-blah",
- "internalURL":
- "https://compute.north.host/v1.1/blah-blah",
- "region": "South",
- "versionId": "1.1",
- "versionInfo": "https://compute.north.host/v1.1/",
- "versionList": "https://compute.north.host/"}
- ],
- "endpoints_links": [{
- "rel": "next",
- "href": "https://identity.north.host/v2.0/"
- "endpoints?marker=2"
- }]
- },
- {
- "name": "Image Servers",
- "type": "image",
- "endpoints": [
- {"publicURL": "https://image.north.host/v1/",
- "internalURL": "https://image-internal.north.host/v1/",
- "region": "North"},
- {"publicURL": "https://image.south.host/v1/",
- "internalURL": "https://image-internal.south.host/v1/",
- "region": "South"}
- ],
- "endpoints_links": []
- },
- ],
- "serviceCatalog_links": [{
- "rel": "next",
- "href": ("https://identity.host/v2.0/endpoints?"
- "session=2hfh8Ar&marker=2")
- }]
- }
-}
-
-
-class ServiceCatalogTest(utils.TestCase):
- def test_building_a_service_catalog(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
-
- self.assertEquals(sc.url_for(service_type='compute'),
- "https://compute.north.host/v1/1234")
- self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
- "https://compute.north.host/v1/1234")
- self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
- "https://compute.north.host/v1.1/3456")
-
- self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
- "South", service_type='compute')
-
- def test_service_catalog_endpoints(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
- public_ep = sc.get_endpoints(service_type='compute',
- endpoint_type='publicURL')
- self.assertEquals(public_ep['compute'][1]['tenantId'], '2')
- self.assertEquals(public_ep['compute'][1]['versionId'], '1.1')
- self.assertEquals(public_ep['compute'][1]['internalURL'],
- "https://compute.north.host/v1.1/3456")
-
- def test_service_catalog_regions(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'],
- region_name="North")
- url = sc.url_for(service_type='image', endpoint_type='publicURL')
- self.assertEquals(url, "https://image.north.host/v1/")
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'],
- region_name="South")
- url = sc.url_for(service_type='image', endpoint_type='internalURL')
- self.assertEquals(url, "https://image-internal.south.host/v1/")
-
- def test_token(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
-
- self.assertEquals(sc.get_token(), {
- 'id': 'ab48a9efdfedb23ty3494',
- 'tenant_id': '345',
- 'user_id': '123',
- 'expires': '2010-11-01T03:32:15-05:00'})
- self.assertEquals(sc.catalog['token']['expires'],
- "2010-11-01T03:32:15-05:00")
- self.assertEquals(sc.catalog['token']['tenant']['id'], '345')
diff --git a/tests/utils.py b/tests/utils.py
index 9d9bf8d..264aceb 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -8,6 +8,8 @@ from keystoneclient.v2_0 import client
class TestCase(testtools.TestCase):
+ TEST_DOMAIN_ID = '1'
+ TEST_DOMAIN_NAME = 'aDomain'
TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
diff --git a/tests/v2_0/client_fixtures.py b/tests/v2_0/client_fixtures.py
new file mode 100644
index 0000000..70bb43f
--- /dev/null
+++ b/tests/v2_0/client_fixtures.py
@@ -0,0 +1,163 @@
+UNSCOPED_TOKEN = {
+ u'access': {u'serviceCatalog': {},
+ u'token': {u'expires': u'2012-10-03T16:58:01Z',
+ u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
+ u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ u'roles': [],
+ u'roles_links': [],
+ u'username': u'exampleuser'}
+ }
+}
+
+PROJECT_SCOPED_TOKEN = {
+ u'access': {
+ u'serviceCatalog': [{
+ u'endpoints': [{
+ u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'internalURL':
+ u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'publicURL':
+ u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne'
+ }],
+ u'endpoints_links': [],
+ u'name': u'Volume Service',
+ u'type': u'volume'},
+ {u'endpoints': [{
+ u'adminURL': u'http://admin:9292/v1',
+ u'internalURL': u'http://internal:9292/v1',
+ u'publicURL': u'http://public.com:9292/v1',
+ u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'Image Service',
+ u'type': u'image'},
+ {u'endpoints': [{
+u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'Compute Service',
+ u'type': u'compute'},
+ {u'endpoints': [{
+u'adminURL': u'http://admin:8773/services/Admin',
+u'internalURL': u'http://internal:8773/services/Cloud',
+u'publicURL': u'http://public.com:8773/services/Cloud',
+u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'EC2 Service',
+ u'type': u'ec2'},
+ {u'endpoints': [{
+u'adminURL': u'http://admin:35357/v2.0',
+u'internalURL': u'http://internal:5000/v2.0',
+u'publicURL': u'http://public.com:5000/v2.0',
+u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'Identity Service',
+ u'type': u'identity'}],
+ u'token': {u'expires': u'2012-10-03T16:53:36Z',
+ u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
+ u'tenant': {u'description': u'',
+ u'enabled': True,
+ u'id': u'225da22d3ce34b15877ea70b2a575f58',
+ u'name': u'exampleproject'}},
+ u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
+ u'name': u'Member'}],
+ u'roles_links': [],
+ u'username': u'exampleuser'}
+ }
+}
+
+AUTH_RESPONSE_BODY = {
+ u'access': {
+ u'token': {
+ u'id': u'ab48a9efdfedb23ty3494',
+ u'expires': u'2010-11-01T03:32:15-05:00',
+ u'tenant': {
+ u'id': u'345',
+ u'name': u'My Project'
+ }
+ },
+ u'user': {
+ u'id': u'123',
+ u'name': u'jqsmith',
+ u'roles': [{
+ u'id': u'234',
+ u'name': u'compute:admin'
+ }, {
+ u'id': u'235',
+ u'name': u'object-store:admin',
+ u'tenantId': u'1'
+ }],
+ u'roles_links': []
+ },
+ u'serviceCatalog': [{
+ u'name': u'Cloud Servers',
+ u'type': u'compute',
+ u'endpoints': [{
+ u'tenantId': u'1',
+ u'publicURL': u'https://compute.north.host/v1/1234',
+ u'internalURL': u'https://compute.north.host/v1/1234',
+ u'region': u'North',
+ u'versionId': u'1.0',
+ u'versionInfo': u'https://compute.north.host/v1.0/',
+ u'versionList': u'https://compute.north.host/u'
+ }, {
+ u'tenantId': u'2',
+ u'publicURL': u'https://compute.north.host/v1.1/3456',
+ u'internalURL': u'https://compute.north.host/v1.1/3456',
+ u'region': u'North',
+ u'versionId': u'1.1',
+ u'versionInfo': u'https://compute.north.host/v1.1/',
+ u'versionList': u'https://compute.north.host/u'
+ }],
+ u'endpoints_links': []
+ }, {
+ u'name': u'Cloud Files',
+ u'type': u'object-store',
+ u'endpoints': [{
+ u'tenantId': u'11',
+ u'publicURL': u'https://swift.north.host/v1/blah',
+ u'internalURL': u'https://swift.north.host/v1/blah',
+ u'region': u'South',
+ u'versionId': u'1.0',
+ u'versionInfo': u'uri',
+ u'versionList': u'uri'
+ }, {
+ u'tenantId': u'2',
+ u'publicURL': u'https://swift.north.host/v1.1/blah',
+ u'internalURL': u'https://compute.north.host/v1.1/blah',
+ u'region': u'South',
+ u'versionId': u'1.1',
+ u'versionInfo': u'https://swift.north.host/v1.1/',
+ u'versionList': u'https://swift.north.host/'
+ }],
+ u'endpoints_links': [{
+ u'rel': u'next',
+ u'href': u'https://identity.north.host/v2.0/'
+ u'endpoints?marker=2'
+ }]
+ }, {
+ u'name': u'Image Servers',
+ u'type': u'image',
+ u'endpoints': [{
+ u'publicURL': u'https://image.north.host/v1/',
+ u'internalURL': u'https://image-internal.north.host/v1/',
+ u'region': u'North'
+ }, {
+ u'publicURL': u'https://image.south.host/v1/',
+ u'internalURL': u'https://image-internal.south.host/v1/',
+ u'region': u'South'
+ }],
+ u'endpoints_links': []
+ }],
+ u'serviceCatalog_links': [{
+ u'rel': u'next',
+ u'href': (u'https://identity.host/v2.0/endpoints?'
+ u'session=2hfh8Ar&marker=2')
+ }]
+ }
+}
diff --git a/tests/test_access.py b/tests/v2_0/test_access.py
index 31f91d6..c1bb4f7 100644
--- a/tests/test_access.py
+++ b/tests/v2_0/test_access.py
@@ -3,7 +3,7 @@ import datetime
from keystoneclient import access
from keystoneclient.openstack.common import timeutils
from tests import utils
-from tests import client_fixtures
+from tests.v2_0 import client_fixtures
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
@@ -11,7 +11,7 @@ PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
class AccessInfoTest(utils.TestCase):
def test_building_unscoped_accessinfo(self):
- auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
+ auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
@@ -30,6 +30,8 @@ class AccessInfoTest(utils.TestCase):
self.assertEquals(auth_ref.management_url, None)
self.assertFalse(auth_ref.scoped)
+ self.assertFalse(auth_ref.domain_scoped)
+ self.assertFalse(auth_ref.project_scoped)
self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
UNSCOPED_TOKEN['access']['token']['expires']))
@@ -37,13 +39,13 @@ class AccessInfoTest(utils.TestCase):
def test_will_expire_soon(self):
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
UNSCOPED_TOKEN['access']['token']['expires'] = expires.isoformat()
- auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
+ auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
self.assertFalse(auth_ref.will_expire_soon())
def test_building_scoped_accessinfo(self):
- auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
+ auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
@@ -68,3 +70,5 @@ class AccessInfoTest(utils.TestCase):
('http://admin:35357/v2.0',))
self.assertTrue(auth_ref.scoped)
+ self.assertTrue(auth_ref.project_scoped)
+ self.assertFalse(auth_ref.domain_scoped)
diff --git a/tests/v2_0/test_client.py b/tests/v2_0/test_client.py
new file mode 100644
index 0000000..b1e485f
--- /dev/null
+++ b/tests/v2_0/test_client.py
@@ -0,0 +1,84 @@
+import json
+import mock
+
+import requests
+
+from keystoneclient.v2_0 import client
+from tests import utils
+from tests.v2_0 import client_fixtures
+
+
+class KeystoneClientTest(utils.TestCase):
+ def setUp(self):
+ super(KeystoneClientTest, self).setUp()
+
+ scoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
+ })
+ self.scoped_mock_req = mock.Mock(return_value=scoped_fake_resp)
+
+ unscoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.UNSCOPED_TOKEN)
+ })
+ self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
+
+ def test_unscoped_init(self):
+ with mock.patch.object(requests, "request", self.unscoped_mock_req):
+ c = client.Client(username='exampleuser',
+ password='password',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertFalse(c.auth_ref.scoped)
+ self.assertFalse(c.auth_ref.domain_scoped)
+ self.assertFalse(c.auth_ref.project_scoped)
+
+ def test_scoped_init(self):
+ with mock.patch.object(requests, "request", self.scoped_mock_req):
+ c = client.Client(username='exampleuser',
+ password='password',
+ tenant_name='exampleproject',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertTrue(c.auth_ref.scoped)
+ self.assertTrue(c.auth_ref.project_scoped)
+ self.assertFalse(c.auth_ref.domain_scoped)
+
+ def test_auth_ref_load(self):
+ with mock.patch.object(requests, "request", self.scoped_mock_req):
+ cl = client.Client(username='exampleuser',
+ password='password',
+ tenant_name='exampleproject',
+ auth_url='http://somewhere/')
+ cache = json.dumps(cl.auth_ref)
+ new_client = client.Client(auth_ref=json.loads(cache))
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertTrue(new_client.auth_ref.scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v2.0')
+
+ def test_auth_ref_load_with_overridden_arguments(self):
+ with mock.patch.object(requests, "request", self.scoped_mock_req):
+ cl = client.Client(username='exampleuser',
+ password='password',
+ tenant_name='exampleproject',
+ auth_url='http://somewhere/')
+ cache = json.dumps(cl.auth_ref)
+ new_auth_url = "http://new-public:5000/v2.0"
+ new_client = client.Client(auth_ref=json.loads(cache),
+ auth_url=new_auth_url)
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertTrue(new_client.auth_ref.scoped)
+ self.assertTrue(new_client.auth_ref.scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertEquals(new_client.auth_url, new_auth_url)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v2.0')
diff --git a/tests/v2_0/test_service_catalog.py b/tests/v2_0/test_service_catalog.py
new file mode 100644
index 0000000..15838b9
--- /dev/null
+++ b/tests/v2_0/test_service_catalog.py
@@ -0,0 +1,50 @@
+from keystoneclient import access
+from keystoneclient import exceptions
+
+from tests import utils
+from tests.v2_0 import client_fixtures
+
+
+class ServiceCatalogTest(utils.TestCase):
+ def setUp(self):
+ super(ServiceCatalogTest, self).setUp()
+ self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY
+
+ def test_building_a_service_catalog(self):
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ self.assertEquals(sc.url_for(service_type='compute'),
+ "https://compute.north.host/v1/1234")
+ self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
+ "https://compute.north.host/v1/1234")
+ self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
+ "https://compute.north.host/v1.1/3456")
+
+ self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
+ "South", service_type='compute')
+
+ def test_service_catalog_endpoints(self):
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+ public_ep = sc.get_endpoints(service_type='compute',
+ endpoint_type='publicURL')
+ self.assertEquals(public_ep['compute'][1]['tenantId'], '2')
+ self.assertEquals(public_ep['compute'][1]['versionId'], '1.1')
+ self.assertEquals(public_ep['compute'][1]['internalURL'],
+ "https://compute.north.host/v1.1/3456")
+
+ def test_service_catalog_regions(self):
+ self.AUTH_RESPONSE_BODY['access']['region_name'] = "North"
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ url = sc.url_for(service_type='image', endpoint_type='publicURL')
+ self.assertEquals(url, "https://image.north.host/v1/")
+
+ self.AUTH_RESPONSE_BODY['access']['region_name'] = "South"
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ url = sc.url_for(service_type='image', endpoint_type='internalURL')
+ self.assertEquals(url, "https://image-internal.south.host/v1/")
diff --git a/tests/v3/client_fixtures.py b/tests/v3/client_fixtures.py
new file mode 100644
index 0000000..fb72dc9
--- /dev/null
+++ b/tests/v3/client_fixtures.py
@@ -0,0 +1,234 @@
+UNSCOPED_TOKEN = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'catalog': {},
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'user': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ }
+ }
+}
+
+DOMAIN_SCOPED_TOKEN = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'catalog': {},
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'user': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ },
+ u'domain': {
+ u'id': u'8e9283b7ba0b1038840c3842058b86ab',
+ u'name': u'anotherdomain'
+ },
+ }
+}
+
+PROJECT_SCOPED_TOKEN = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'catalog': [{
+ u'endpoints': [{
+ u'url':
+ u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url':
+ u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url':
+ u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'volume'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://public.com:9292/v1',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://internal:9292/v1',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://admin:9292/v1',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'image'
+ }, {
+ u'endpoints': [{
+ u'url':
+ u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url':
+ u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url':
+ u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'compute'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://public.com:8773/services/Cloud',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://internal:8773/services/Cloud',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://admin:8773/services/Admin',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'ec2'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://public.com:5000/v3',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://internal:5000/v3',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://admin:35357/v3',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'identity'
+ }],
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'user': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ },
+ u'project': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'225da22d3ce34b15877ea70b2a575f58',
+ u'name': u'exampleproject',
+ },
+ }
+}
+
+AUTH_RESPONSE_HEADERS = {
+ u'X-Subject-Token': u'3e2813b7ba0b4006840c3825860b86ed'
+}
+
+AUTH_RESPONSE_BODY = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'project': {
+ u'domain': {
+ u'id': u'123',
+ u'name': u'aDomain'
+ },
+ u'id': u'345',
+ u'name': u'aTenant'
+ },
+ u'user': {
+ u'domain': {
+ u'id': u'1',
+ u'name': u'aDomain'
+ },
+ u'id': u'567',
+ u'name': u'test'
+ },
+ u'issued_at': u'2010-10-31T03:32:15-05:00',
+ u'catalog': [{
+ u'endpoints': [{
+ u'url': u'https://compute.north.host/novapi/public',
+ u'region': u'North',
+ u'interface': u'public'
+ }, {
+ u'url': u'https://compute.north.host/novapi/internal',
+ u'region': u'North',
+ u'interface': u'internal'
+ }, {
+ u'url': u'https://compute.north.host/novapi/admin',
+ u'region': u'North',
+ u'interface': u'admin'
+ }],
+ u'type': u'compute'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://swift.north.host/swiftapi/public',
+ u'region': u'South',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://swift.north.host/swiftapi/internal',
+ u'region': u'South',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://swift.north.host/swiftapi/admin',
+ u'region': u'South',
+ u'interface': u'admin'
+ }],
+ u'type': u'object-store'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://glance.north.host/glanceapi/public',
+ u'region': u'North',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://glance.north.host/glanceapi/internal',
+ u'region': u'North',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://glance.north.host/glanceapi/admin',
+ u'region': u'North',
+ u'interface': u'admin'
+ }, {
+ u'url': u'http://glance.south.host/glanceapi/public',
+ u'region': u'South',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://glance.south.host/glanceapi/internal',
+ u'region': u'South',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://glance.south.host/glanceapi/admin',
+ u'region': u'South',
+ u'interface': u'admin'
+ }],
+ u'type': u'image'
+ }]
+ }
+}
diff --git a/tests/v3/test_access.py b/tests/v3/test_access.py
new file mode 100644
index 0000000..050a373
--- /dev/null
+++ b/tests/v3/test_access.py
@@ -0,0 +1,106 @@
+import datetime
+
+from keystoneclient import access
+from keystoneclient.openstack.common import timeutils
+from tests import utils
+from tests.v3 import client_fixtures
+
+TOKEN_RESPONSE = utils.TestResponse({
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+})
+UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
+DOMAIN_SCOPED_TOKEN = client_fixtures.DOMAIN_SCOPED_TOKEN
+PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
+
+
+class AccessInfoTest(utils.TestCase):
+ def test_building_unscoped_accessinfo(self):
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=UNSCOPED_TOKEN)
+
+ self.assertTrue(auth_ref)
+ self.assertIn('methods', auth_ref)
+ self.assertIn('catalog', auth_ref)
+ self.assertFalse(auth_ref['catalog'])
+
+ self.assertEquals(auth_ref.auth_token,
+ '3e2813b7ba0b4006840c3825860b86ed')
+ self.assertEquals(auth_ref.username, 'exampleuser')
+ self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ self.assertEquals(auth_ref.project_name, None)
+ self.assertEquals(auth_ref.project_id, None)
+
+ self.assertEquals(auth_ref.auth_url, None)
+ self.assertEquals(auth_ref.management_url, None)
+
+ self.assertFalse(auth_ref.domain_scoped)
+ self.assertFalse(auth_ref.project_scoped)
+
+ self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
+ UNSCOPED_TOKEN['token']['expires_at']))
+
+ def test_will_expire_soon(self):
+ expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
+ UNSCOPED_TOKEN['token']['expires_at'] = expires.isoformat()
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=UNSCOPED_TOKEN)
+ self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
+ self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
+ self.assertFalse(auth_ref.will_expire_soon())
+
+ def test_building_domain_scoped_accessinfo(self):
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=DOMAIN_SCOPED_TOKEN)
+
+ self.assertTrue(auth_ref)
+ self.assertIn('methods', auth_ref)
+ self.assertIn('catalog', auth_ref)
+ self.assertFalse(auth_ref['catalog'])
+
+ self.assertEquals(auth_ref.auth_token,
+ '3e2813b7ba0b4006840c3825860b86ed')
+ self.assertEquals(auth_ref.username, 'exampleuser')
+ self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ self.assertEquals(auth_ref.domain_name, 'anotherdomain')
+ self.assertEquals(auth_ref.domain_id,
+ '8e9283b7ba0b1038840c3842058b86ab')
+
+ self.assertEquals(auth_ref.project_name, None)
+ self.assertEquals(auth_ref.project_id, None)
+
+ self.assertTrue(auth_ref.domain_scoped)
+ self.assertFalse(auth_ref.project_scoped)
+
+ def test_building_project_scoped_accessinfo(self):
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=PROJECT_SCOPED_TOKEN)
+
+ self.assertTrue(auth_ref)
+ self.assertIn('methods', auth_ref)
+ self.assertIn('catalog', auth_ref)
+ self.assertTrue(auth_ref['catalog'])
+
+ self.assertEquals(auth_ref.auth_token,
+ '3e2813b7ba0b4006840c3825860b86ed')
+ self.assertEquals(auth_ref.username, 'exampleuser')
+ self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ self.assertEquals(auth_ref.domain_name, None)
+ self.assertEquals(auth_ref.domain_id, None)
+
+ self.assertEquals(auth_ref.project_name, 'exampleproject')
+ self.assertEquals(auth_ref.project_id,
+ '225da22d3ce34b15877ea70b2a575f58')
+
+ self.assertEquals(auth_ref.tenant_name, auth_ref.project_name)
+ self.assertEquals(auth_ref.tenant_id, auth_ref.project_id)
+
+ self.assertEquals(auth_ref.auth_url,
+ ('http://public.com:5000/v3',))
+ self.assertEquals(auth_ref.management_url,
+ ('http://admin:35357/v3',))
+
+ self.assertFalse(auth_ref.domain_scoped)
+ self.assertTrue(auth_ref.project_scoped)
diff --git a/tests/v3/test_auth.py b/tests/v3/test_auth.py
new file mode 100644
index 0000000..41512f8
--- /dev/null
+++ b/tests/v3/test_auth.py
@@ -0,0 +1,408 @@
+import copy
+import json
+import requests
+
+from keystoneclient import exceptions
+from keystoneclient.v3 import client
+
+from tests.v3 import utils
+
+
+class AuthenticateAgainstKeystoneTests(utils.TestCase):
+ def setUp(self):
+ super(AuthenticateAgainstKeystoneTests, self).setUp()
+ self.TEST_RESPONSE_DICT = {
+ "token": {
+ "methods": [
+ "token",
+ "password"
+ ],
+
+ "expires_at": "2020-01-01T00:00:10.000123Z",
+ "project": {
+ "domain": {
+ "id": self.TEST_DOMAIN_ID,
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "id": self.TEST_TENANT_ID,
+ "name": self.TEST_TENANT_NAME
+ },
+ "user": {
+ "domain": {
+ "id": self.TEST_DOMAIN_ID,
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "id": self.TEST_USER,
+ "name": self.TEST_USER
+ },
+ "issued_at": "2013-05-29T16:55:21.468960Z",
+ "catalog": self.TEST_SERVICE_CATALOG
+ },
+ }
+ self.TEST_REQUEST_BODY = {
+ "auth": {
+ "identity": {
+ "methods": ["password"],
+ "password": {
+ "user": {
+ "domain": {
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "name": self.TEST_USER,
+ "password": self.TEST_TOKEN
+ }
+ }
+ },
+ "scope": {
+ "project": {
+ "id": self.TEST_TENANT_ID
+ },
+ }
+ }
+ }
+ self.TEST_REQUEST_HEADERS = {
+ 'Content-Type': 'application/json',
+ 'User-Agent': 'python-keystoneclient'
+ }
+ self.TEST_RESPONSE_HEADERS = {
+ 'X-Subject-Token': self.TEST_TOKEN
+ }
+
+ def test_authenticate_success(self):
+ TEST_TOKEN = "abcdef"
+ self.TEST_RESPONSE_HEADERS['X-Subject-Token'] = TEST_TOKEN
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']['user']['domain']
+ del ident['password']['user']['name']
+ ident['password']['user']['id'] = self.TEST_USER
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_id=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_token, TEST_TOKEN)
+
+ def test_authenticate_failure(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ ident['password']['user']['password'] = 'bad_key'
+ resp = utils.TestResponse({
+ "status_code": 401,
+ "text": json.dumps({
+ "unauthorized": {
+ "message": "Unauthorized",
+ "code": "401",
+ },
+ }),
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ # Workaround for issue with assertRaises on python2.6
+ # where with assertRaises(exceptions.Unauthorized): doesn't work
+ # right
+ def client_create_wrapper():
+ client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password="bad_key",
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+
+ self.assertRaises(exceptions.Unauthorized, client_create_wrapper)
+
+ def test_auth_redirect(self):
+ correct_response = json.dumps(self.TEST_RESPONSE_DICT, sort_keys=True)
+ dict_responses = [
+ {
+ "headers": {
+ 'location': self.TEST_ADMIN_URL + "/auth/tokens",
+ 'X-Subject-Token': self.TEST_TOKEN,
+ },
+ "status_code": 305,
+ "text": "Use proxy",
+ },
+ {
+ "headers": {'X-Subject-Token': self.TEST_TOKEN},
+ "status_code": 200,
+ "text": correct_response,
+ },
+ ]
+ responses = [(utils.TestResponse(resp))
+ for resp in dict_responses]
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn(responses[0])
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_ADMIN_URL + "/auth/tokens",
+ **kwargs).AndReturn(responses[1])
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_domain_username_password_scoped(self):
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_userid_password_domain_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']['user']['domain']
+ del ident['password']['user']['name']
+ ident['password']['user']['id'] = self.TEST_USER
+
+ scope = self.TEST_REQUEST_BODY['auth']['scope']
+ del scope['project']
+ scope['domain'] = {}
+ scope['domain']['id'] = self.TEST_DOMAIN_ID
+
+ token = self.TEST_RESPONSE_DICT['token']
+ del token['project']
+ token['domain'] = {}
+ token['domain']['id'] = self.TEST_DOMAIN_ID
+ token['domain']['name'] = self.TEST_DOMAIN_NAME
+
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_id=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ domain_id=self.TEST_DOMAIN_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_domain_id,
+ self.TEST_DOMAIN_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_userid_password_project_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']['user']['domain']
+ del ident['password']['user']['name']
+ ident['password']['user']['id'] = self.TEST_USER
+
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_id=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_tenant_id,
+ self.TEST_TENANT_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_password_unscoped(self):
+ del self.TEST_RESPONSE_DICT['token']['catalog']
+ del self.TEST_REQUEST_BODY['auth']['scope']
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+ self.assertFalse('catalog' in cs.service_catalog.catalog)
+
+ def test_authenticate_success_token_domain_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = self.TEST_TOKEN
+
+ scope = self.TEST_REQUEST_BODY['auth']['scope']
+ del scope['project']
+ scope['domain'] = {}
+ scope['domain']['id'] = self.TEST_DOMAIN_ID
+
+ token = self.TEST_RESPONSE_DICT['token']
+ del token['project']
+ token['domain'] = {}
+ token['domain']['id'] = self.TEST_DOMAIN_ID
+ token['domain']['name'] = self.TEST_DOMAIN_NAME
+
+ self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(token=self.TEST_TOKEN,
+ domain_id=self.TEST_DOMAIN_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_domain_id,
+ self.TEST_DOMAIN_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_token_project_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = self.TEST_TOKEN
+ self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(token=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_tenant_id,
+ self.TEST_TENANT_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_token_unscoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = self.TEST_TOKEN
+ del self.TEST_REQUEST_BODY['auth']['scope']
+ del self.TEST_RESPONSE_DICT['token']['catalog']
+ self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(token=self.TEST_TOKEN,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+ self.assertFalse('catalog' in cs.service_catalog.catalog)
diff --git a/tests/v3/test_client.py b/tests/v3/test_client.py
new file mode 100644
index 0000000..3198722
--- /dev/null
+++ b/tests/v3/test_client.py
@@ -0,0 +1,121 @@
+import json
+import mock
+
+import requests
+
+from keystoneclient.v3 import client
+
+from tests import utils
+from tests.v3 import client_fixtures
+
+
+class KeystoneClientTest(utils.TestCase):
+ def setUp(self):
+ super(KeystoneClientTest, self).setUp()
+
+ domain_scoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.DOMAIN_SCOPED_TOKEN),
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+ self.domain_scoped_mock_req = mock.Mock(
+ return_value=domain_scoped_fake_resp)
+
+ project_scoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN),
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+ self.project_scoped_mock_req = mock.Mock(
+ return_value=project_scoped_fake_resp)
+
+ unscoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.UNSCOPED_TOKEN),
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+ self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
+
+ def test_unscoped_init(self):
+ with mock.patch.object(requests, "request", self.unscoped_mock_req):
+ c = client.Client(user_domain_name='exampledomain',
+ username='exampleuser',
+ password='password',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertFalse(c.auth_ref.domain_scoped)
+ self.assertFalse(c.auth_ref.project_scoped)
+ self.assertEquals(c.auth_user_id,
+ 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ def test_domain_scoped_init(self):
+ with mock.patch.object(requests,
+ "request",
+ self.domain_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ domain_name='exampledomain',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertTrue(c.auth_ref.domain_scoped)
+ self.assertFalse(c.auth_ref.project_scoped)
+ self.assertEquals(c.auth_user_id,
+ 'c4da488862bd435c9e6c0275a0d0e49a')
+ self.assertEquals(c.auth_domain_id,
+ '8e9283b7ba0b1038840c3842058b86ab')
+
+ def test_project_scoped_init(self):
+ with mock.patch.object(requests,
+ "request",
+ self.project_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ user_domain_name='exampledomain',
+ project_name='exampleproject',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertFalse(c.auth_ref.domain_scoped)
+ self.assertTrue(c.auth_ref.project_scoped)
+ self.assertEquals(c.auth_user_id,
+ 'c4da488862bd435c9e6c0275a0d0e49a')
+ self.assertEquals(c.auth_tenant_id,
+ '225da22d3ce34b15877ea70b2a575f58')
+
+ def test_auth_ref_load(self):
+ with mock.patch.object(requests,
+ "request",
+ self.project_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ project_id='225da22d3ce34b15877ea70b2a575f58',
+ auth_url='http://somewhere/')
+ cache = json.dumps(c.auth_ref)
+ new_client = client.Client(auth_ref=json.loads(cache))
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v3')
+
+ def test_auth_ref_load_with_overridden_arguments(self):
+ with mock.patch.object(requests,
+ "request",
+ self.project_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ project_id='225da22d3ce34b15877ea70b2a575f58',
+ auth_url='http://somewhere/')
+ cache = json.dumps(c.auth_ref)
+ new_auth_url = "http://new-public:5000/v3"
+ new_client = client.Client(auth_ref=json.loads(cache),
+ auth_url=new_auth_url)
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertEquals(new_client.auth_url, new_auth_url)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v3')
diff --git a/tests/v3/test_service_catalog.py b/tests/v3/test_service_catalog.py
new file mode 100644
index 0000000..d0dac3a
--- /dev/null
+++ b/tests/v3/test_service_catalog.py
@@ -0,0 +1,55 @@
+from keystoneclient import access
+from keystoneclient import exceptions
+
+from tests.v3 import client_fixtures
+from tests.v3 import utils
+
+
+class ServiceCatalogTest(utils.TestCase):
+ def setUp(self):
+ super(ServiceCatalogTest, self).setUp()
+ self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY
+ self.RESPONSE = utils.TestResponse({
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+
+ def test_building_a_service_catalog(self):
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ self.assertEquals(sc.url_for(service_type='compute'),
+ "https://compute.north.host/novapi/public")
+ self.assertEquals(sc.url_for(service_type='compute',
+ endpoint_type='internal'),
+ "https://compute.north.host/novapi/internal")
+
+ self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
+ "South", service_type='compute')
+
+ def test_service_catalog_endpoints(self):
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ public_ep = sc.get_endpoints(service_type='compute',
+ endpoint_type='public')
+ self.assertEquals(public_ep['compute'][0]['region'], 'North')
+ self.assertEquals(public_ep['compute'][0]['url'],
+ "https://compute.north.host/novapi/public")
+
+ def test_service_catalog_regions(self):
+ self.AUTH_RESPONSE_BODY['token']['region_name'] = "North"
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ url = sc.url_for(service_type='image', endpoint_type='public')
+ self.assertEquals(url, "http://glance.north.host/glanceapi/public")
+
+ self.AUTH_RESPONSE_BODY['token']['region_name'] = "South"
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+ url = sc.url_for(service_type='image', endpoint_type='internal')
+ self.assertEquals(url, "http://glance.south.host/glanceapi/internal")
diff --git a/tests/v3/utils.py b/tests/v3/utils.py
index 6ae7c0e..b3b359c 100644
--- a/tests/v3/utils.py
+++ b/tests/v3/utils.py
@@ -1,8 +1,8 @@
import copy
import json
-import uuid
import time
import urlparse
+import uuid
import mox
import requests
@@ -32,6 +32,9 @@ class TestClient(client.Client):
class TestCase(testtools.TestCase):
+ TEST_DOMAIN_ID = '1'
+ TEST_DOMAIN_NAME = 'aDomain'
+ TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
TEST_USER = 'test'
@@ -43,6 +46,84 @@ class TestCase(testtools.TestCase):
'verify': True,
}
+ TEST_SERVICE_CATALOG = [{
+ "endpoints": [{
+ "url": "http://cdn.admin-nets.local:8774/v1.0/",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://127.0.0.1:8774/v1.0",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://cdn.admin-nets.local:8774/v1.0",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "nova_compat"
+ }, {
+ "endpoints": [{
+ "url": "http://nova/novapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://nova/novapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://nova/novapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "compute"
+ }, {
+ "endpoints": [{
+ "url": "http://glance/glanceapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://glance/glanceapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://glance/glanceapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "image",
+ "name": "glance"
+ }, {
+ "endpoints": [{
+ "url": "http://127.0.0.1:5000/v3",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://127.0.0.1:5000/v3",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://127.0.0.1:35357/v3",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "identity"
+ }, {
+ "endpoints": [{
+ "url": "http://swift/swiftapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://swift/swiftapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://swift/swiftapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "object-store"
+ }]
+
def setUp(self):
super(TestCase, self).setUp()
self.mox = mox.Mox()