summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keystoneclient/access.py406
-rw-r--r--keystoneclient/client.py293
-rw-r--r--keystoneclient/service_catalog.py211
-rw-r--r--keystoneclient/v2_0/client.py59
-rw-r--r--keystoneclient/v3/client.py180
-rw-r--r--tests/client_fixtures.py72
-rw-r--r--tests/fakes.py73
-rw-r--r--tests/test_client.py40
-rw-r--r--tests/test_https.py24
-rw-r--r--tests/test_keyring.py65
-rw-r--r--tests/test_service_catalog.py144
-rw-r--r--tests/utils.py2
-rw-r--r--tests/v2_0/client_fixtures.py163
-rw-r--r--tests/v2_0/test_access.py (renamed from tests/test_access.py)12
-rw-r--r--tests/v2_0/test_client.py84
-rw-r--r--tests/v2_0/test_service_catalog.py50
-rw-r--r--tests/v3/client_fixtures.py234
-rw-r--r--tests/v3/test_access.py106
-rw-r--r--tests/v3/test_auth.py408
-rw-r--r--tests/v3/test_client.py121
-rw-r--r--tests/v3/test_service_catalog.py55
-rw-r--r--tests/v3/utils.py83
22 files changed, 2383 insertions, 502 deletions
diff --git a/keystoneclient/access.py b/keystoneclient/access.py
index 346315f..c8b4ffc 100644
--- a/keystoneclient/access.py
+++ b/keystoneclient/access.py
@@ -20,13 +20,41 @@ import datetime
from keystoneclient.openstack.common import timeutils
from keystoneclient import service_catalog
+
# gap, in seconds, to determine whether the given token is about to expire
STALE_TOKEN_DURATION = 30
class AccessInfo(dict):
- """An object for encapsulating a raw authentication token from keystone
- and helper methods for extracting useful values from that token."""
+ """Encapsulates a raw authentication token from keystone.
+
+ Provides helper methods for extracting useful values from that token.
+
+ """
+
+ @classmethod
+ def factory(cls, resp=None, body=None, **kwargs):
+ """Create AccessInfo object given a successful auth response & body
+ or a user-provided dict.
+ """
+ if body is not None or len(kwargs):
+ if AccessInfoV3.is_valid(body, **kwargs):
+ token = None
+ if resp:
+ token = resp.headers['X-Subject-Token']
+ if body:
+ return AccessInfoV3(token, **body['token'])
+ else:
+ return AccessInfoV3(token, **kwargs)
+ elif AccessInfoV2.is_valid(body, **kwargs):
+ if body:
+ return AccessInfoV2(**body['access'])
+ else:
+ return AccessInfoV2(**kwargs)
+ else:
+ raise NotImplementedError('Unrecognized auth response')
+ else:
+ return AccessInfoV2(**kwargs)
def __init__(self, *args, **kwargs):
super(AccessInfo, self).__init__(*args, **kwargs)
@@ -37,7 +65,7 @@ class AccessInfo(dict):
return 'serviceCatalog' in self
def will_expire_soon(self, stale_duration=None):
- """ Determines if expiration is about to occur.
+ """Determines if expiration is about to occur.
:return: boolean : true if expiration is within the given duration
@@ -51,123 +79,387 @@ class AccessInfo(dict):
seconds=stale_duration))
return norm_expires < soon
- @property
- def expires(self):
- """ Returns the token expiration (as datetime object)
+ @classmethod
+ def is_valid(cls, body, **kwargs):
+ """Determines if processing v2 or v3 token given a successful
+ auth body or a user-provided dict.
- :returns: datetime
+ :return: boolean : true if auth body matches implementing class
+ """
+ raise NotImplementedError()
+
+ def has_service_catalog(self):
+ """Returns true if the authorization token has a service catalog.
+ :returns: boolean
"""
- return timeutils.parse_isotime(self['token']['expires'])
+ raise NotImplementedError()
@property
def auth_token(self):
- """ Returns the token_id associated with the auth request, to be used
+ """Returns the token_id associated with the auth request, to be used
in headers for authenticating OpenStack API requests.
:returns: str
"""
- return self['token'].get('id', None)
+ raise NotImplementedError()
+
+ @property
+ def expires(self):
+ """Returns the token expiration (as datetime object)
+
+ :returns: datetime
+ """
+ raise NotImplementedError()
@property
def username(self):
- """ Returns the username associated with the authentication request.
+ """Returns the username associated with the authentication request.
Follows the pattern defined in the V2 API of first looking for 'name',
returning that if available, and falling back to 'username' if name
is unavailable.
:returns: str
"""
- name = self['user'].get('name', None)
- if name:
- return name
- else:
- return self['user'].get('username', None)
+ raise NotImplementedError()
@property
def user_id(self):
- """ Returns the user id associated with the authentication request.
+ """Returns the user id associated with the authentication request.
:returns: str
"""
- return self['user'].get('id', None)
+ raise NotImplementedError()
@property
- def tenant_name(self):
- """ Returns the tenant (project) name associated with the
- authentication request.
+ def user_domain_id(self):
+ """Returns the domain id of the user associated with the authentication
+ request.
+ For v2, it always returns 'default' which maybe different from the
+ Keystone configuration.
:returns: str
"""
- tenant_dict = self['token'].get('tenant', None)
- if tenant_dict:
- return tenant_dict.get('name', None)
- return None
+ raise NotImplementedError()
+
+ @property
+ def domain_name(self):
+ """Returns the domain name associated with the authentication token.
+
+ :returns: str or None (if no domain associated with the token)
+ """
+ raise NotImplementedError()
+
+ @property
+ def domain_id(self):
+ """Returns the domain id associated with the authentication token.
+
+ :returns: str or None (if no domain associated with the token)
+ """
+ raise NotImplementedError()
@property
def project_name(self):
- """ Synonym for tenant_name """
- return self.tenant_name
+ """Returns the project name associated with the authentication request.
+
+ :returns: str or None (if no project associated with the token)
+ """
+ raise NotImplementedError()
+
+ @property
+ def tenant_name(self):
+ """Synonym for project_name"""
+ return self.project_name
@property
def scoped(self):
""" Returns true if the authorization token was scoped to a tenant
- (project), and contains a populated service catalog.
+ (project), and contains a populated service catalog.
+
+ This is deprecated, use project_scoped instead.
:returns: bool
"""
- if ('serviceCatalog' in self
- and self['serviceCatalog']
- and 'tenant' in self['token']):
- return True
- return False
+ raise NotImplementedError()
@property
- def tenant_id(self):
- """ Returns the tenant (project) id associated with the authentication
- request, or None if the authentication request wasn't scoped to a
- tenant (project).
+ def project_scoped(self):
+ """ Returns true if the authorization token was scoped to a tenant
+ (project).
- :returns: str
+ :returns: bool
"""
- tenant_dict = self['token'].get('tenant', None)
- if tenant_dict:
- return tenant_dict.get('id', None)
- return None
+ raise NotImplementedError()
+
+ @property
+ def domain_scoped(self):
+ """ Returns true if the authorization token was scoped to a domain.
+
+ :returns: bool
+ """
+ raise NotImplementedError()
@property
def project_id(self):
- """ Synonym for project_id """
- return self.tenant_id
+ """Returns the project ID associated with the authentication
+ request, or None if the authentication request wasn't scoped to a
+ project.
- def _get_identity_endpoint(self, endpoint_type):
- if not self.get('serviceCatalog'):
- return
+ :returns: str or None ((if no project associated with the token)
+ """
+ raise NotImplementedError()
- identity_services = [x for x in self['serviceCatalog']
- if x['type'] == 'identity']
- return tuple(endpoint[endpoint_type]
- for svc in identity_services
- for endpoint in svc['endpoints']
- if endpoint_type in endpoint)
+ @property
+ def tenant_id(self):
+ """Synonym for project_id """
+ return self.project_id
+
+ @property
+ def project_domain_id(self):
+ """Returns the domain id of the project associated with the
+ authentication request.
+
+ For v2, it always returns 'default' which maybe different from the
+ keystone configuration.
+ :returns: str
+ """
+ raise NotImplementedError()
@property
def auth_url(self):
- """ Returns a tuple of URLs from publicURL and adminURL for the service
+ """Returns a tuple of URLs from publicURL and adminURL for the service
'identity' from the service catalog associated with the authorization
request. If the authentication request wasn't scoped to a tenant
(project), this property will return None.
:returns: tuple of urls
"""
- return self._get_identity_endpoint('publicURL')
+ raise NotImplementedError()
@property
def management_url(self):
- """ Returns the first adminURL for 'identity' from the service catalog
+ """Returns the first adminURL for 'identity' from the service catalog
associated with the authorization request, or None if the
authentication request wasn't scoped to a tenant (project).
:returns: tuple of urls
"""
- return self._get_identity_endpoint('adminURL')
+ raise NotImplementedError()
+
+ @property
+ def version(self):
+ """Returns the version of the auth token from identity service.
+
+ :returns: str
+ """
+ return self.get('version')
+
+
+class AccessInfoV2(AccessInfo):
+ """An object for encapsulating a raw v2 auth token from identity
+ service.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(AccessInfo, self).__init__(*args, **kwargs)
+ self.update(version='v2.0')
+ self.service_catalog = service_catalog.ServiceCatalog.factory(
+ resource_dict=self,
+ token=self['token']['id'],
+ region_name=self.get('region_name'))
+
+ @classmethod
+ def is_valid(cls, body, **kwargs):
+ if body:
+ return 'access' in body
+ elif kwargs:
+ return kwargs.get('version') == 'v2.0'
+ else:
+ return False
+
+ def has_service_catalog(self):
+ return 'serviceCatalog' in self
+
+ @property
+ def auth_token(self):
+ return self['token']['id']
+
+ @property
+ def expires(self):
+ return timeutils.parse_isotime(self['token']['expires'])
+
+ @property
+ def username(self):
+ return self['user'].get('name', self['user'].get('username'))
+
+ @property
+ def user_id(self):
+ return self['user']['id']
+
+ @property
+ def user_domain_id(self):
+ return 'default'
+
+ @property
+ def domain_name(self):
+ return None
+
+ @property
+ def domain_id(self):
+ return None
+
+ @property
+ def project_name(self):
+ tenant_dict = self['token'].get('tenant', None)
+ if tenant_dict:
+ return tenant_dict.get('name', None)
+
+ @property
+ def scoped(self):
+ if ('serviceCatalog' in self
+ and self['serviceCatalog']
+ and 'tenant' in self['token']):
+ return True
+ return False
+
+ @property
+ def project_scoped(self):
+ return 'tenant' in self['token']
+
+ @property
+ def domain_scoped(self):
+ return False
+
+ @property
+ def project_id(self):
+ tenant_dict = self['token'].get('tenant', None)
+ if tenant_dict:
+ return tenant_dict.get('id', None)
+ return None
+
+ @property
+ def project_domain_id(self):
+ if self.project_id:
+ return 'default'
+
+ @property
+ def auth_url(self):
+ if self.service_catalog:
+ return self.service_catalog.get_urls(service_type='identity',
+ endpoint_type='publicURL')
+ else:
+ return None
+
+ @property
+ def management_url(self):
+ if self.service_catalog:
+ return self.service_catalog.get_urls(service_type='identity',
+ endpoint_type='adminURL')
+ else:
+ return None
+
+
+class AccessInfoV3(AccessInfo):
+ """An object for encapsulating a raw v3 auth token from identity
+ service.
+ """
+
+ def __init__(self, token, *args, **kwargs):
+ super(AccessInfo, self).__init__(*args, **kwargs)
+ self.update(version='v3')
+ self.service_catalog = service_catalog.ServiceCatalog.factory(
+ resource_dict=self,
+ token=token,
+ region_name=self.get('region_name'))
+ if token:
+ self.update(auth_token=token)
+
+ @classmethod
+ def is_valid(cls, body, **kwargs):
+ if body:
+ return 'token' in body
+ elif kwargs:
+ return kwargs.get('version') == 'v3'
+ else:
+ return False
+
+ def has_service_catalog(self):
+ return 'catalog' in self
+
+ @property
+ def auth_token(self):
+ return self['auth_token']
+
+ @property
+ def expires(self):
+ return timeutils.parse_isotime(self['expires_at'])
+
+ @property
+ def user_id(self):
+ return self['user']['id']
+
+ @property
+ def user_domain_id(self):
+ return self['user']['domain']['id']
+
+ @property
+ def username(self):
+ return self['user']['name']
+
+ @property
+ def domain_name(self):
+ domain = self.get('domain')
+ if domain:
+ return domain['name']
+
+ @property
+ def domain_id(self):
+ domain = self.get('domain')
+ if domain:
+ return domain['id']
+
+ @property
+ def project_id(self):
+ project = self.get('project')
+ if project:
+ return project['id']
+
+ @property
+ def project_domain_id(self):
+ project = self.get('project')
+ if project:
+ return project['domain']['id']
+
+ @property
+ def project_name(self):
+ project = self.get('project')
+ if project:
+ return project['name']
+
+ @property
+ def scoped(self):
+ return ('catalog' in self and self['catalog'] and 'project' in self)
+
+ @property
+ def project_scoped(self):
+ return 'project' in self
+
+ @property
+ def domain_scoped(self):
+ return 'domain' in self
+
+ @property
+ def auth_url(self):
+ if self.service_catalog:
+ return self.service_catalog.get_urls(service_type='identity',
+ endpoint_type='public')
+ else:
+ return None
+
+ @property
+ def management_url(self):
+ if self.service_catalog:
+ return self.service_catalog.get_urls(service_type='identity',
+ endpoint_type='admin')
+ else:
+ return None
diff --git a/keystoneclient/client.py b/keystoneclient/client.py
index bc2cae6..f017bc8 100644
--- a/keystoneclient/client.py
+++ b/keystoneclient/client.py
@@ -48,40 +48,160 @@ class HTTPClient(object):
endpoint=None, token=None, cacert=None, key=None,
cert=None, insecure=False, original_ip=None, debug=False,
auth_ref=None, use_keyring=False, force_new_token=False,
- stale_duration=None):
+ stale_duration=None, user_id=None, user_domain_id=None,
+ user_domain_name=None, domain_id=None, domain_name=None,
+ project_id=None, project_name=None, project_domain_id=None,
+ project_domain_name=None):
"""Construct a new http client
- @param: timeout the request libary timeout in seconds (default None)
+ :param string user_id: User ID for authentication. (optional)
+ :param string username: Username for authentication. (optional)
+ :param string user_domain_id: User's domain ID for authentication.
+ (optional)
+ :param string user_domain_name: User's domain name for authentication.
+ (optional)
+ :param string password: Password for authentication. (optional)
+ :param string domain_id: Domain ID for domain scoping. (optional)
+ :param string domain_name: Domain name for domain scoping. (optional)
+ :param string project_id: Project ID for project scoping. (optional)
+ :param string project_name: Project name for project scoping.
+ (optional)
+ :param string project_domain_id: Project's domain ID for project
+ scoping. (optional)
+ :param string project_domain_name: Project's domain name for project
+ scoping. (optional)
+ :param string auth_url: Identity service endpoint for authorization.
+ :param string region_name: Name of a region to select when choosing an
+ endpoint from the service catalog.
+ :param integer timeout: Allows customization of the timeout for client
+ http requests. (optional)
+ :param string endpoint: A user-supplied endpoint URL for the identity
+ service. Lazy-authentication is possible for
+ API service calls if endpoint is set at
+ instantiation. (optional)
+ :param string token: Token for authentication. (optional)
+ :param string cacert: Path to the Privacy Enhanced Mail (PEM) file
+ which contains the trusted authority X.509
+ certificates needed to established SSL connection
+ with the identity service. (optional)
+ :param string key: Path to the Privacy Enhanced Mail (PEM) file which
+ contains the unencrypted client private key needed
+ to established two-way SSL connection with the
+ identity service. (optional)
+ :param string cert: Path to the Privacy Enhanced Mail (PEM) file which
+ contains the corresponding X.509 client certificate
+ needed to established two-way SSL connection with
+ the identity service. (optional)
+ :param boolean insecure: Does not perform X.509 certificate validation
+ when establishing SSL connection with identity
+ service. default: False (optional)
+ :param string original_ip: The original IP of the requesting user
+ which will be sent to identity service in a
+ 'Forwarded' header. (optional)
+ :param boolean debug: Enables debug logging of all request and
+ responses to identity service.
+ default False (optional)
+ :param dict auth_ref: To allow for consumers of the client to manage
+ their own caching strategy, you may initialize a
+ client with a previously captured auth_reference
+ (token). If there are keyword arguments passed
+ that also exist in auth_ref, the value from the
+ argument will take precedence.
+ :param boolean use_keyring: Enables caching auth_ref into keyring.
+ default: False (optional)
+ :param boolean force_new_token: Keyring related parameter, forces
+ request for new token.
+ default: False (optional)
+ :param integer stale_duration: Gap in seconds to determine if token
+ from keyring is about to expire.
+ default: 30 (optional)
+ :param string tenant_name: Tenant name. (optional)
+ The tenant_name keyword argument is
+ deprecated, use project_name instead.
+ :param string tenant_id: Tenant id. (optional)
+ The tenant_id keyword argument is
+ deprecated, use project_id instead.
"""
- self.version = 'v2.0'
# set baseline defaults
+
+ self.user_id = None
self.username = None
- self.tenant_id = None
- self.tenant_name = None
+ self.user_domain_id = None
+ self.user_domain_name = None
+
+ self.domain_id = None
+ self.domain_name = None
+
+ self.project_id = None
+ self.project_name = None
+ self.project_domain_id = None
+ self.project_domain_name = None
+
self.auth_url = None
self.management_url = None
- if timeout is not None:
- self.timeout = float(timeout)
- else:
- self.timeout = None
+ self.timeout = float(timeout) if timeout is not None else None
+
# if loading from a dictionary passed in via auth_ref,
# load values from AccessInfo parsing that dictionary
- self.auth_ref = access.AccessInfo(**auth_ref) if auth_ref else None
- if self.auth_ref:
+ if auth_ref:
+ self.auth_ref = access.AccessInfo.factory(**auth_ref)
+ self.version = self.auth_ref.version
+ self.user_id = self.auth_ref.user_id
self.username = self.auth_ref.username
- self.tenant_id = self.auth_ref.tenant_id
- self.tenant_name = self.auth_ref.tenant_name
+ self.user_domain_id = self.auth_ref.user_domain_id
+ self.domain_id = self.auth_ref.domain_id
+ self.domain_name = self.auth_ref.domain_name
+ self.project_id = self.auth_ref.project_id
+ self.project_name = self.auth_ref.project_name
+ self.project_domain_id = self.auth_ref.project_domain_id
self.auth_url = self.auth_ref.auth_url[0]
self.management_url = self.auth_ref.management_url[0]
+ self.auth_token = self.auth_ref.auth_token
+ else:
+ self.auth_ref = None
+
# allow override of the auth_ref defaults from explicit
# values provided to the client
- if username:
- self.username = username
+
+ # apply deprecated variables first, so modern variables override them
if tenant_id:
- self.tenant_id = tenant_id
+ self.project_id = tenant_id
if tenant_name:
- self.tenant_name = tenant_name
+ self.project_name = tenant_name
+
+ # user-related attributes
+ self.password = password
+ if user_id:
+ self.user_id = user_id
+ if username:
+ self.username = username
+ if user_domain_id:
+ self.user_domain_id = user_domain_id
+ elif not (user_id or user_domain_name):
+ self.user_domain_id = 'default'
+ if user_domain_name:
+ self.user_domain_name = user_domain_name
+
+ # domain-related attributes
+ if domain_id:
+ self.domain_id = domain_id
+ if domain_name:
+ self.domain_name = domain_name
+
+ # project-related attributes
+ if project_id:
+ self.project_id = project_id
+ if project_name:
+ self.project_name = project_name
+ if project_domain_id:
+ self.project_domain_id = project_domain_id
+ elif not (project_id or project_domain_name):
+ self.project_domain_id = 'default'
+ if project_domain_name:
+ self.project_domain_name = project_domain_name
+
+ # endpoint selection
if auth_url:
self.auth_url = auth_url.rstrip('/')
if token:
@@ -90,9 +210,9 @@ class HTTPClient(object):
self.auth_token_from_user = None
if endpoint:
self.management_url = endpoint.rstrip('/')
- self.password = password
- self.original_ip = original_ip
self.region_name = region_name
+
+ self.original_ip = original_ip
if cacert:
self.verify_cert = cacert
else:
@@ -138,20 +258,55 @@ class HTTPClient(object):
def auth_token(self):
del self.auth_token_from_user
+ @property
+ def service_catalog(self):
+ """Returns this client's service catalog."""
+ return self.auth_ref.service_catalog
+
+ def has_service_catalog(self):
+ """Returns True if this client provides a service catalog."""
+ return self.auth_ref.has_service_catalog()
+
+ @property
+ def tenant_id(self):
+ """Provide read-only backwards compatibility for tenant_id.
+ This is deprecated, use project_id instead.
+ """
+ return self.project_id
+
+ @property
+ def tenant_name(self):
+ """Provide read-only backwards compatibility for tenant_name.
+ This is deprecated, use project_name instead.
+ """
+ return self.project_name
+
def authenticate(self, username=None, password=None, tenant_name=None,
- tenant_id=None, auth_url=None, token=None):
- """ Authenticate user.
+ tenant_id=None, auth_url=None, token=None,
+ user_id=None, domain_name=None, domain_id=None,
+ project_name=None, project_id=None, user_domain_id=None,
+ user_domain_name=None, project_domain_id=None,
+ project_domain_name=None):
+ """Authenticate user.
Uses the data provided at instantiation to authenticate against
- the Keystone server. This may use either a username and password
+ the Identity server. This may use either a username and password
or token for authentication. If a tenant name or id was provided
then the resulting authenticated client will be scoped to that
tenant and contain a service catalog of available endpoints.
With the v2.0 API, if a tenant name or ID is not provided, the
- authenication token returned will be 'unscoped' and limited in
+ authentication token returned will be 'unscoped' and limited in
capabilities until a fully-scoped token is acquired.
+ With the v3 API, if a domain name or id was provided then the resulting
+ authenticated client will be scoped to that domain. If a project name
+ or ID is not provided, and the authenticating user has a default
+ project configured, the authentication token returned will be 'scoped'
+ to the default project. Otherwise, the authentication token returned
+ will be 'unscoped' and limited in capabilities until a fully-scoped
+ token is acquired.
+
If successful, sets the self.auth_ref and self.auth_token with
the returned token. If not already set, will also set
self.management_url from the details provided in the token.
@@ -173,10 +328,18 @@ class HTTPClient(object):
"""
auth_url = auth_url or self.auth_url
+ user_id = user_id or self.user_id
username = username or self.username
password = password or self.password
- tenant_name = tenant_name or self.tenant_name
- tenant_id = tenant_id or self.tenant_id
+
+ user_domain_id = user_domain_id or self.user_domain_id
+ user_domain_name = user_domain_name or self.user_domain_name
+ domain_id = domain_id or self.domain_id
+ domain_name = domain_name or self.domain_name
+ project_id = project_id or tenant_id or self.project_id
+ project_name = project_name or tenant_name or self.project_name
+ project_domain_id = project_domain_id or self.project_domain_id
+ project_domain_name = project_domain_name or self.project_domain_name
if not token:
token = self.auth_token_from_user
@@ -184,21 +347,27 @@ class HTTPClient(object):
self.auth_ref.will_expire_soon(self.stale_duration)):
token = self.auth_ref.auth_token
- (keyring_key, auth_ref) = self.get_auth_ref_from_keyring(auth_url,
- username,
- tenant_name,
- tenant_id,
- token)
+ kwargs = {
+ 'auth_url': auth_url,
+ 'user_id': user_id,
+ 'username': username,
+ 'user_domain_id': user_domain_id,
+ 'user_domain_name': user_domain_name,
+ 'domain_id': domain_id,
+ 'domain_name': domain_name,
+ 'project_id': project_id,
+ 'project_name': project_name,
+ 'project_domain_id': project_domain_id,
+ 'project_domain_name': project_domain_name,
+ 'token': token
+ }
+ (keyring_key, auth_ref) = self.get_auth_ref_from_keyring(**kwargs)
new_token_needed = False
if auth_ref is None or self.force_new_token:
new_token_needed = True
- raw_token = self.get_raw_token_from_identity_service(auth_url,
- username,
- password,
- tenant_name,
- tenant_id,
- token)
- self.auth_ref = access.AccessInfo(**raw_token)
+ kwargs['password'] = password
+ resp, body = self.get_raw_token_from_identity_service(**kwargs)
+ self.auth_ref = access.AccessInfo.factory(resp, body)
else:
self.auth_ref = auth_ref
self.process_token()
@@ -206,23 +375,18 @@ class HTTPClient(object):
self.store_auth_ref_into_keyring(keyring_key)
return True
- def _build_keyring_key(self, auth_url, username, tenant_name,
- tenant_id, token):
- """ Create a unique key for keyring.
+ def _build_keyring_key(self, **kwargs):
+ """Create a unique key for keyring.
Used to store and retrieve auth_ref from keyring.
+ Returns a slash-separated string of values ordered by key name.
+
"""
- keys = [auth_url, username, tenant_name, tenant_id, token]
- for index, key in enumerate(keys):
- if key is None:
- keys[index] = '?'
- keyring_key = '/'.join(keys)
- return keyring_key
+ return '/'.join([kwargs[k] or '?' for k in sorted(kwargs.keys())])
- def get_auth_ref_from_keyring(self, auth_url, username, tenant_name,
- tenant_id, token):
- """ Retrieve auth_ref from keyring.
+ def get_auth_ref_from_keyring(self, **kwargs):
+ """Retrieve auth_ref from keyring.
If auth_ref is found in keyring, (keyring_key, auth_ref) is returned.
Otherwise, (keyring_key, None) is returned.
@@ -234,9 +398,7 @@ class HTTPClient(object):
keyring_key = None
auth_ref = None
if self.use_keyring:
- keyring_key = self._build_keyring_key(auth_url, username,
- tenant_name, tenant_id,
- token)
+ keyring_key = self._build_keyring_key(**kwargs)
try:
auth_ref = keyring.get_password("keystoneclient_auth",
keyring_key)
@@ -252,7 +414,7 @@ class HTTPClient(object):
return (keyring_key, auth_ref)
def store_auth_ref_into_keyring(self, keyring_key):
- """ Store auth_ref into keyring.
+ """Store auth_ref into keyring.
"""
if self.use_keyring:
@@ -264,30 +426,36 @@ class HTTPClient(object):
_logger.warning("Failed to store token into keyring %s" % (e))
def process_token(self):
- """ Extract and process information from the new auth_ref.
+ """Extract and process information from the new auth_ref.
"""
raise NotImplementedError
def get_raw_token_from_identity_service(self, auth_url, username=None,
password=None, tenant_name=None,
- tenant_id=None, token=None):
- """ Authenticate against the Identity API and get a token.
+ tenant_id=None, token=None,
+ user_id=None, user_domain_id=None,
+ user_domain_name=None,
+ domain_id=None, domain_name=None,
+ project_id=None, project_name=None,
+ project_domain_id=None,
+ project_domain_name=None):
+ """Authenticate against the Identity API and get a token.
Not implemented here because auth protocols should be API
version-specific.
Expected to authenticate or validate an existing authentication
reference already associated with the client. Invoking this call
- *always* makes a call to the Keystone.
+ *always* makes a call to the Identity service.
- :returns: ``raw token``
+ :returns: (``resp``, ``body``)
"""
raise NotImplementedError
def _extract_service_catalog(self, url, body):
- """ Set the client's service catalog from the response data.
+ """Set the client's service catalog from the response data.
Not implemented here because data returned may be API
version-specific.
@@ -334,7 +502,7 @@ class HTTPClient(object):
return self.auth_ref.has_service_catalog()
def request(self, url, method, **kwargs):
- """ Send an http request with the specified characteristics.
+ """Send an http request with the specified characteristics.
Wrapper around requests.request to handle tasks such as
setting headers, JSON encoding/decoding, and error handling.
@@ -392,9 +560,10 @@ class HTTPClient(object):
return resp, body
def _cs_request(self, url, method, **kwargs):
- """ Makes an authenticated request to keystone endpoint by
+ """Makes an authenticated request to keystone endpoint by
concatenating self.management_url and url and passing in method and
- any associated kwargs. """
+ any associated kwargs.
+ """
is_management = kwargs.pop('management', True)
diff --git a/keystoneclient/service_catalog.py b/keystoneclient/service_catalog.py
index 2938af3..b73f67a 100644
--- a/keystoneclient/service_catalog.py
+++ b/keystoneclient/service_catalog.py
@@ -23,9 +23,15 @@ from keystoneclient import exceptions
class ServiceCatalog(object):
"""Helper methods for dealing with a Keystone Service Catalog."""
- def __init__(self, resource_dict, region_name=None):
- self.catalog = resource_dict
- self.region_name = region_name
+ @classmethod
+ def factory(cls, resource_dict, token=None, region_name=None):
+ """Create ServiceCatalog object given a auth token."""
+ if ServiceCatalogV3.is_valid(resource_dict):
+ return ServiceCatalogV3(token, resource_dict, region_name)
+ elif ServiceCatalogV2.is_valid(resource_dict):
+ return ServiceCatalogV2(resource_dict, region_name)
+ else:
+ raise NotImplementedError('Unrecognized auth response')
def get_token(self):
"""Fetch token details from service catalog.
@@ -36,8 +42,71 @@ class ServiceCatalog(object):
- `expires`: Token's expiration
- `user_id`: Authenticated user's ID
- `tenant_id`: Authorized project's ID
+ - `domain_id`: Authorized domain's ID
+
+ """
+ raise NotImplementedError()
+
+ def get_endpoints(self, service_type=None, endpoint_type=None):
+ """Fetch and filter endpoints for the specified service(s).
+
+ Returns endpoints for the specified service (or all) and
+ that contain the specified type (or all).
+ """
+ raise NotImplementedError()
+
+ def get_urls(self, attr=None, filter_value=None,
+ service_type='identity', endpoint_type='publicURL'):
+ """Fetch endpoint urls from the service catalog.
+
+ Fetch the endpoints from the service catalog for a particular
+ endpoint attribute. If no attribute is given, return the first
+ endpoint of the specified type.
+
+ :param string attr: Endpoint attribute name.
+ :param string filter_value: Endpoint attribute value.
+ :param string service_type: Service type of the endpoint.
+ :param string endpoint_type: Type of endpoint.
+ Possible values: public or publicURL,
+ internal or internalURL,
+ admin or adminURL
+ :param string region_name: Region of the endpoint.
+
+ :returns: tuple of urls or None (if no match found)
+ """
+ raise NotImplementedError()
+
+ def url_for(self, attr=None, filter_value=None,
+ service_type='identity', endpoint_type='publicURL'):
+ """Fetch an endpoint from the service catalog.
+
+ Fetch the specified endpoint from the service catalog for
+ a particular endpoint attribute. If no attribute is given, return
+ the first endpoint of the specified type.
+ Valid endpoint types: `public` or `publicURL`,
+ `internal` or `internalURL`,
+ `admin` or 'adminURL`
"""
+ raise NotImplementedError()
+
+
+class ServiceCatalogV2(ServiceCatalog):
+ """An object for encapsulating the service catalog using raw v2 auth token
+ from Keystone."""
+
+ def __init__(self, resource_dict, region_name=None):
+ self.catalog = resource_dict
+ self.region_name = region_name
+
+ @classmethod
+ def is_valid(cls, resource_dict):
+ # This class is also used for reading token info of an unscoped token.
+ # Unscoped token does not have 'serviceCatalog' in V2, checking this
+ # will not work. Use 'token' attribute instead.
+ return 'token' in resource_dict
+
+ def get_token(self):
token = {'id': self.catalog['token']['id'],
'expires': self.catalog['token']['expires']}
try:
@@ -48,23 +117,50 @@ class ServiceCatalog(object):
pass
return token
- def url_for(self, attr=None, filter_value=None,
- service_type='identity', endpoint_type='publicURL'):
- """Fetch an endpoint from the service catalog.
+ def get_endpoints(self, service_type=None, endpoint_type=None):
+ if endpoint_type and 'URL' not in endpoint_type:
+ endpoint_type = endpoint_type + 'URL'
- Fetch the specified endpoint from the service catalog for
- a particular endpoint attribute. If no attribute is given, return
- the first endpoint of the specified type.
+ sc = {}
+ for service in self.catalog.get('serviceCatalog', []):
+ if service_type and service_type != service['type']:
+ continue
+ sc[service['type']] = []
+ for endpoint in service['endpoints']:
+ if endpoint_type and endpoint_type not in endpoint.keys():
+ continue
+ sc[service['type']].append(endpoint)
+ return sc
- Valid endpoint types: `publicURL`, `internalURL`, `adminURL`
+ def get_urls(self, attr=None, filter_value=None,
+ service_type='identity', endpoint_type='publicURL'):
+ sc_endpoints = self.get_endpoints(service_type=service_type,
+ endpoint_type=endpoint_type)
+ endpoints = sc_endpoints.get(service_type)
+ if not endpoints:
+ return
- See tests for a sample service catalog.
- """
+ if endpoint_type and 'URL' not in endpoint_type:
+ endpoint_type = endpoint_type + 'URL'
+
+ return tuple(endpoint[endpoint_type]
+ for endpoint in endpoints
+ if (endpoint_type in endpoint
+ and (not self.region_name
+ or endpoint.get('region') == self.region_name)
+ and (not filter_value
+ or endpoint.get(attr) == filter_value)))
+
+ def url_for(self, attr=None, filter_value=None,
+ service_type='identity', endpoint_type='publicURL'):
catalog = self.catalog.get('serviceCatalog', [])
if not catalog:
raise exceptions.EmptyCatalog('The service catalog is empty.')
+ if 'URL' not in endpoint_type:
+ endpoint_type = endpoint_type + 'URL'
+
for service in catalog:
if service['type'] != service_type:
continue
@@ -80,19 +176,96 @@ class ServiceCatalog(object):
raise exceptions.EndpointNotFound('%s endpoint for %s not found.' %
(endpoint_type, service_type))
- def get_endpoints(self, service_type=None, endpoint_type=None):
- """Fetch and filter endpoints for the specified service(s).
- Returns endpoints for the specified service (or all) and
- that contain the specified type (or all).
- """
+class ServiceCatalogV3(ServiceCatalog):
+ """An object for encapsulating the service catalog using raw v3 auth token
+ from Keystone."""
+
+ def __init__(self, token, resource_dict, region_name=None):
+ self._auth_token = token
+ self.catalog = resource_dict
+ self.region_name = region_name
+
+ @classmethod
+ def is_valid(cls, resource_dict):
+ # This class is also used for reading token info of an unscoped token.
+ # Unscoped token does not have 'catalog', checking this
+ # will not work. Use 'methods' attribute instead.
+ return 'methods' in resource_dict
+
+ def get_token(self):
+ token = {'id': self._auth_token,
+ 'expires': self.catalog['expires_at']}
+ try:
+ token['user_id'] = self.catalog['user']['id']
+ domain = self.catalog.get('domain')
+ if domain:
+ token['domain_id'] = domain['id']
+ project = self.catalog.get('project')
+ if project:
+ token['tenant_id'] = project['id']
+ except Exception:
+ # just leave the domain, project and user out if it doesn't exist
+ pass
+ return token
+
+ def get_endpoints(self, service_type=None, endpoint_type=None):
+ if endpoint_type:
+ endpoint_type = endpoint_type.rstrip('URL')
sc = {}
- for service in self.catalog.get('serviceCatalog', []):
+ for service in self.catalog.get('catalog', []):
if service_type and service_type != service['type']:
continue
sc[service['type']] = []
for endpoint in service['endpoints']:
- if endpoint_type and endpoint_type not in endpoint.keys():
+ if endpoint_type and endpoint_type != endpoint['interface']:
continue
sc[service['type']].append(endpoint)
return sc
+
+ def get_urls(self, attr=None, filter_value=None,
+ service_type='identity', endpoint_type='public'):
+ if endpoint_type:
+ endpoint_type = endpoint_type.rstrip('URL')
+ sc_endpoints = self.get_endpoints(service_type=service_type,
+ endpoint_type=endpoint_type)
+ endpoints = sc_endpoints.get(service_type)
+ if not endpoints:
+ return None
+
+ urls = list()
+ for endpoint in endpoints:
+ if (endpoint['interface'] == endpoint_type
+ and (not self.region_name
+ or endpoint.get('region') == self.region_name)
+ and (not filter_value
+ or endpoint.get(attr) == filter_value)):
+ urls.append(endpoint['url'])
+ return tuple(urls)
+
+ def url_for(self, attr=None, filter_value=None,
+ service_type='identity', endpoint_type='public'):
+ catalog = self.catalog.get('catalog', [])
+
+ if not catalog:
+ raise exceptions.EmptyCatalog('The service catalog is empty.')
+
+ if endpoint_type:
+ endpoint_type = endpoint_type.rstrip('URL')
+
+ for service in catalog:
+ if service['type'] != service_type:
+ continue
+
+ endpoints = service['endpoints']
+ for endpoint in endpoints:
+ if endpoint.get('interface') != endpoint_type:
+ continue
+ if (self.region_name and
+ endpoint.get('region') != self.region_name):
+ continue
+ if not filter_value or endpoint.get(attr) == filter_value:
+ return endpoint['url']
+
+ raise exceptions.EndpointNotFound('%s endpoint for %s not found.' %
+ (endpoint_type, service_type))
diff --git a/keystoneclient/v2_0/client.py b/keystoneclient/v2_0/client.py
index 9f7553f..504945a 100644
--- a/keystoneclient/v2_0/client.py
+++ b/keystoneclient/v2_0/client.py
@@ -48,16 +48,21 @@ class Client(client.HTTPClient):
:param string original_ip: The original IP of the requesting user
which will be sent to Keystone in a
'Forwarded' header. (optional)
- :param string cert: If provided, used as a local certificate to communicate
- with the keystone endpoint. If provided, requires the
- additional parameter key. (optional)
- :param string key: The key associated with the certificate for secure
- keystone communication. (optional)
- :param string cacert: the ca-certs to verify the secure communications
- with keystone. (optional)
- :param boolean insecure: If using an SSL endpoint, allows for the certicate
- to be unsigned - does not verify the certificate
- chain. default: False (optional)
+ :param string cert: Path to the Privacy Enhanced Mail (PEM) file which
+ contains the corresponding X.509 client certificate
+ needed to established two-way SSL connection with
+ the identity service. (optional)
+ :param string key: Path to the Privacy Enhanced Mail (PEM) file which
+ contains the unencrypted client private key needed
+ to established two-way SSL connection with the
+ identity service. (optional)
+ :param string cacert: Path to the Privacy Enhanced Mail (PEM) file which
+ contains the trusted authority X.509 certificates
+ needed to established SSL connection with the
+ identity service. (optional)
+ :param boolean insecure: Does not perform X.509 certificate validation
+ when establishing SSL connection with identity
+ service. default: False (optional)
:param dict auth_ref: To allow for consumers of the client to manage their
own caching strategy, you may initialize a client
with a previously captured auth_reference (token)
@@ -119,6 +124,7 @@ class Client(client.HTTPClient):
def __init__(self, **kwargs):
""" Initialize a new client for the Keystone v2.0 API. """
super(Client, self).__init__(**kwargs)
+ self.version = 'v2.0'
self.endpoints = endpoints.EndpointManager(self)
self.roles = roles.RoleManager(self)
self.services = services.ServiceManager(self)
@@ -140,28 +146,33 @@ class Client(client.HTTPClient):
# if we got a response without a service catalog, set the local
# list of tenants for introspection, and leave to client user
# to determine what to do. Otherwise, load up the service catalog
- if self.auth_ref.scoped:
+ if self.auth_ref.project_scoped:
if not self.auth_ref.tenant_id:
raise exceptions.AuthorizationFailure(
"Token didn't provide tenant_id")
- if not self.auth_ref.user_id:
- raise exceptions.AuthorizationFailure(
- "Token didn't provide user_id")
if self.management_url is None and self.auth_ref.management_url:
self.management_url = self.auth_ref.management_url[0]
- self.tenant_name = self.auth_ref.tenant_name
- self.tenant_id = self.auth_ref.tenant_id
- self.user_id = self.auth_ref.user_id
+ self.project_name = self.auth_ref.tenant_name
+ self.project_id = self.auth_ref.tenant_id
- self.auth_user_id = self.auth_ref.user_id
+ if not self.auth_ref.user_id:
+ raise exceptions.AuthorizationFailure(
+ "Token didn't provide user_id")
+
+ self.user_id = self.auth_ref.user_id
+
+ self.auth_domain_id = self.auth_ref.domain_id
self.auth_tenant_id = self.auth_ref.tenant_id
+ self.auth_user_id = self.auth_ref.user_id
def get_raw_token_from_identity_service(self, auth_url, username=None,
password=None, tenant_name=None,
- tenant_id=None, token=None):
- """ Authenticate against the Keystone API.
+ tenant_id=None, token=None,
+ project_name=None, project_id=None,
+ **kwargs):
+ """ Authenticate against the v2 Identity API.
- :returns: ``raw token`` if authentication was successful.
+ :returns: (``resp``, ``body``) if authentication was successful.
:raises: AuthorizationFailure if unable to authenticate or validate
the existing authorization token
:raises: ValueError if insufficient parameters are used.
@@ -170,8 +181,8 @@ class Client(client.HTTPClient):
try:
return self._base_authN(auth_url,
username=username,
- tenant_id=tenant_id,
- tenant_name=tenant_name,
+ tenant_id=project_id or tenant_id,
+ tenant_name=project_name or tenant_name,
password=password,
token=token)
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
@@ -202,4 +213,4 @@ class Client(client.HTTPClient):
elif tenant_name:
params['auth']['tenantName'] = tenant_name
resp, body = self.request(url, 'POST', body=params, headers=headers)
- return body['access']
+ return resp, body
diff --git a/keystoneclient/v3/client.py b/keystoneclient/v3/client.py
index c0b613e..6c11ab1 100644
--- a/keystoneclient/v3/client.py
+++ b/keystoneclient/v3/client.py
@@ -15,10 +15,11 @@
import json
import logging
+from keystoneclient import exceptions
from keystoneclient.v2_0 import client
from keystoneclient.v3 import credentials
-from keystoneclient.v3 import endpoints
from keystoneclient.v3 import domains
+from keystoneclient.v3 import endpoints
from keystoneclient.v3 import groups
from keystoneclient.v3 import policies
from keystoneclient.v3 import projects
@@ -33,40 +34,60 @@ _logger = logging.getLogger(__name__)
class Client(client.Client):
"""Client for the OpenStack Identity API v3.
+ :param string user_id: User ID for authentication. (optional)
:param string username: Username for authentication. (optional)
+ :param string user_domain_id: User's domain ID for authentication.
+ (optional)
+ :param string user_domain_name: User's domain name for authentication.
+ (optional)
:param string password: Password for authentication. (optional)
:param string token: Token for authentication. (optional)
- :param string tenant_name: Tenant id. (optional)
- :param string tenant_id: Tenant name. (optional)
- :param string auth_url: Keystone service endpoint for authorization.
+ :param string domain_id: Domain ID for domain scoping. (optional)
+ :param string domain_name: Domain name for domain scoping. (optional)
+ :param string project_id: Project ID for project scoping. (optional)
+ :param string project_name: Project name for project scoping. (optional)
+ :param string project_domain_id: Project's domain ID for project
+ scoping. (optional)
+ :param string project_domain_name: Project's domain name for project
+ scoping. (optional)
+ :param string tenant_name: Tenant name. (optional)
+ The tenant_name keyword argument is deprecated,
+ use project_name instead.
+ :param string tenant_id: Tenant id. (optional)
+ The tenant_id keyword argument is deprecated,
+ use project_id instead.
+ :param string auth_url: Identity service endpoint for authorization.
:param string region_name: Name of a region to select when choosing an
endpoint from the service catalog.
- :param string endpoint: A user-supplied endpoint URL for the keystone
+ :param string endpoint: A user-supplied endpoint URL for the identity
service. Lazy-authentication is possible for API
service calls if endpoint is set at
- instantiation.(optional)
+ instantiation. (optional)
:param integer timeout: Allows customization of the timeout for client
http requests. (optional)
Example::
>>> from keystoneclient.v3 import client
- >>> keystone = client.Client(username=USER,
+ >>> keystone = client.Client(user_domain_name=DOMAIN_NAME,
+ ... username=USER,
... password=PASS,
- ... tenant_name=TENANT_NAME,
+ ... project_domain_name=PROJECT_DOMAIN_NAME,
+ ... project_name=PROJECT_NAME,
... auth_url=KEYSTONE_URL)
...
- >>> keystone.tenants.list()
+ >>> keystone.projects.list()
...
>>> user = keystone.users.get(USER_ID)
>>> user.delete()
"""
- def __init__(self, endpoint=None, **kwargs):
- """ Initialize a new client for the Keystone v3.0 API. """
- super(Client, self).__init__(endpoint=endpoint, **kwargs)
+ def __init__(self, **kwargs):
+ """ Initialize a new client for the Keystone v3 API. """
+ super(Client, self).__init__(**kwargs)
+ self.version = 'v3'
self.credentials = credentials.CredentialManager(self)
self.endpoints = endpoints.EndpointManager(self)
self.domains = domains.DomainManager(self)
@@ -77,12 +98,133 @@ class Client(client.Client):
self.services = services.ServiceManager(self)
self.users = users.UserManager(self)
- # NOTE(gabriel): If we have a pre-defined endpoint then we can
- # get away with lazy auth. Otherwise auth immediately.
- if endpoint:
- self.management_url = endpoint
- else:
- self.authenticate()
-
def serialize(self, entity):
return json.dumps(entity, sort_keys=True)
+
+ def process_token(self):
+ """ Extract and process information from the new auth_ref.
+
+ And set the relevant authentication information.
+ """
+ super(Client, self).process_token()
+ if self.auth_ref.domain_scoped:
+ if not self.auth_ref.domain_id:
+ raise exceptions.AuthorizationFailure(
+ "Token didn't provide domain_id")
+ if self.management_url is None and self.auth_ref.management_url:
+ self.management_url = self.auth_ref.management_url[0]
+ self.domain_name = self.auth_ref.domain_name
+ self.domain_id = self.auth_ref.domain_id
+
+ def get_raw_token_from_identity_service(self, auth_url, user_id=None,
+ username=None,
+ user_domain_id=None,
+ user_domain_name=None,
+ password=None,
+ domain_id=None, domain_name=None,
+ project_id=None, project_name=None,
+ project_domain_id=None,
+ project_domain_name=None,
+ token=None, **kwargs):
+ """ Authenticate against the v3 Identity API.
+
+ :returns: (``resp``, ``body``) if authentication was successful.
+ :raises: AuthorizationFailure if unable to authenticate or validate
+ the existing authorization token
+ :raises: Unauthorized if authentication fails due to invalid token
+
+ """
+ try:
+ return self._do_auth(
+ auth_url,
+ user_id=user_id,
+ username=username,
+ user_domain_id=user_domain_id,
+ user_domain_name=user_domain_name,
+ password=password,
+ domain_id=domain_id,
+ domain_name=domain_name,
+ project_id=project_id,
+ project_name=project_name,
+ project_domain_id=project_domain_id,
+ project_domain_name=project_domain_name,
+ token=token)
+ except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
+ _logger.debug('Authorization failed.')
+ raise
+ except Exception as e:
+ raise exceptions.AuthorizationFailure('Authorization failed: '
+ '%s' % e)
+
+ def _do_auth(self, auth_url, user_id=None, username=None,
+ user_domain_id=None, user_domain_name=None, password=None,
+ domain_id=None, domain_name=None,
+ project_id=None, project_name=None, project_domain_id=None,
+ project_domain_name=None, token=None):
+ headers = {}
+ url = auth_url + "/auth/tokens"
+ body = {'auth': {'identity': {}}}
+ ident = body['auth']['identity']
+
+ if token:
+ headers['X-Auth-Token'] = token
+
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = token
+
+ if password:
+ ident['methods'] = ['password']
+ ident['password'] = {}
+ ident['password']['user'] = {}
+ user = ident['password']['user']
+ user['password'] = password
+
+ if user_id:
+ user['id'] = user_id
+ elif username:
+ user['name'] = username
+
+ if user_domain_id or user_domain_name:
+ user['domain'] = {}
+ if user_domain_id:
+ user['domain']['id'] = user_domain_id
+ elif user_domain_name:
+ user['domain']['name'] = user_domain_name
+
+ if (domain_id or domain_name) and (project_id or project_name):
+ raise ValueError('Authentication cannot be scoped to both domain'
+ ' and project.')
+
+ if domain_id or domain_name:
+ body['auth']['scope'] = {}
+ scope = body['auth']['scope']
+ scope['domain'] = {}
+
+ if domain_id:
+ scope['domain']['id'] = domain_id
+ elif domain_name:
+ scope['domain']['name'] = domain_name
+
+ if project_id or project_name:
+ body['auth']['scope'] = {}
+ scope = body['auth']['scope']
+ scope['project'] = {}
+
+ if project_id:
+ scope['project']['id'] = project_id
+ elif project_name:
+ scope['project']['name'] = project_name
+
+ if project_domain_id or project_domain_name:
+ scope['project']['domain'] = {}
+ if project_domain_id:
+ scope['project']['domain']['id'] = project_domain_id
+ elif project_domain_name:
+ scope['project']['domain']['name'] = project_domain_name
+
+ if not (ident or token):
+ raise ValueError('Authentication method required (e.g. password)')
+
+ resp, body = self.request(url, 'POST', body=body, headers=headers)
+ return resp, body
diff --git a/tests/client_fixtures.py b/tests/client_fixtures.py
deleted file mode 100644
index f0b5137..0000000
--- a/tests/client_fixtures.py
+++ /dev/null
@@ -1,72 +0,0 @@
-UNSCOPED_TOKEN = {
- u'access': {u'serviceCatalog': {},
- u'token': {u'expires': u'2012-10-03T16:58:01Z',
- u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
- u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
- u'name': u'exampleuser',
- u'roles': [],
- u'roles_links': [],
- u'username': u'exampleuser'}
- }
-}
-
-PROJECT_SCOPED_TOKEN = {
- u'access': {
- u'serviceCatalog': [{
- u'endpoints': [{
- u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
- u'internalURL':
- u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
- u'publicURL':
- u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
- u'region': u'RegionOne'
- }],
- u'endpoints_links': [],
- u'name': u'Volume Service',
- u'type': u'volume'},
- {u'endpoints': [{
- u'adminURL': u'http://admin:9292/v1',
- u'internalURL': u'http://internal:9292/v1',
- u'publicURL': u'http://public.com:9292/v1',
- u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'Image Service',
- u'type': u'image'},
- {u'endpoints': [{
-u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
-u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
-u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
-u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'Compute Service',
- u'type': u'compute'},
- {u'endpoints': [{
-u'adminURL': u'http://admin:8773/services/Admin',
-u'internalURL': u'http://internal:8773/services/Cloud',
-u'publicURL': u'http://public.com:8773/services/Cloud',
-u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'EC2 Service',
- u'type': u'ec2'},
- {u'endpoints': [{
-u'adminURL': u'http://admin:35357/v2.0',
-u'internalURL': u'http://internal:5000/v2.0',
-u'publicURL': u'http://public.com:5000/v2.0',
-u'region': u'RegionOne'}],
- u'endpoints_links': [],
- u'name': u'Identity Service',
- u'type': u'identity'}],
- u'token': {u'expires': u'2012-10-03T16:53:36Z',
- u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
- u'tenant': {u'description': u'',
- u'enabled': True,
- u'id': u'225da22d3ce34b15877ea70b2a575f58',
- u'name': u'exampleproject'}},
- u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
- u'name': u'exampleuser',
- u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
- u'name': u'Member'}],
- u'roles_links': [],
- u'username': u'exampleuser'}
- }
-}
diff --git a/tests/fakes.py b/tests/fakes.py
index feafd52..de7ecff 100644
--- a/tests/fakes.py
+++ b/tests/fakes.py
@@ -65,37 +65,46 @@ class FakeClient(object):
def authenticate(self, cl_obj):
cl_obj.user_id = '1'
cl_obj.auth_user_id = '1'
- cl_obj.tenant_id = '1'
+ cl_obj.project_id = '1'
cl_obj.auth_tenant_id = '1'
- cl_obj.auth_ref = access.AccessInfo({
- "token": {
- "expires": "2012-02-05T00:00:00",
- "id": "887665443383838",
- "tenant": {
+ cl_obj.auth_ref = access.AccessInfo.factory(None, {
+ "access": {
+ "token": {
+ "expires": "2012-02-05T00:00:00",
+ "id": "887665443383838",
+ "tenant": {
+ "id": "1",
+ "name": "customer-x"
+ }
+ },
+ "serviceCatalog": [{
+ "endpoints": [{
+ "adminURL": "http://swift.admin-nets.local:8080/",
+ "region": "RegionOne",
+ "internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
+ "publicURL":
+ "http://swift.publicinternets.com/v1/AUTH_1"
+ }],
+ "type": "object-store",
+ "name": "swift"
+ }, {
+ "endpoints": [{
+ "adminURL": "http://cdn.admin-nets.local/v1.1/1",
+ "region": "RegionOne",
+ "internalURL": "http://127.0.0.1:7777/v1.1/1",
+ "publicURL": "http://cdn.publicinternets.com/v1.1/1"
+ }],
+ "type": "object-store",
+ "name": "cdn"
+ }],
+ "user": {
"id": "1",
- "name": "customer-x"}},
- "serviceCatalog": [
- {"endpoints": [
- {"adminURL": "http://swift.admin-nets.local:8080/",
- "region": "RegionOne",
- "internalURL": "http://127.0.0.1:8080/v1/AUTH_1",
- "publicURL":
- "http://swift.publicinternets.com/v1/AUTH_1"}],
- "type": "object-store",
- "name": "swift"},
- {"endpoints": [
- {"adminURL": "http://cdn.admin-nets.local/v1.1/1",
- "region": "RegionOne",
- "internalURL": "http://127.0.0.1:7777/v1.1/1",
- "publicURL": "http://cdn.publicinternets.com/v1.1/1"}],
- "type": "object-store",
- "name": "cdn"}],
- "user": {
- "id": "1",
- "roles": [
- {"tenantId": "1",
- "id": "3",
- "name": "Member"}],
- "name": "joeuser"}
- }
- )
+ "roles": [{
+ "tenantId": "1",
+ "id": "3",
+ "name": "Member"
+ }],
+ "name": "joeuser"
+ }
+ }
+ })
diff --git a/tests/test_client.py b/tests/test_client.py
deleted file mode 100644
index d24d72a..0000000
--- a/tests/test_client.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import json
-import mock
-
-import requests
-
-from keystoneclient.v2_0 import client
-from tests import client_fixtures
-from tests import utils
-
-
-fake_response = utils.TestResponse({
- "status_code": 200,
- "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
-})
-mock_request = mock.Mock(return_value=(fake_response))
-
-
-class KeystoneclientTest(utils.TestCase):
-
- def test_scoped_init(self):
- with mock.patch.object(requests, "request", mock_request):
- cl = client.Client(username='exampleuser',
- password='password',
- auth_url='http://somewhere/')
- self.assertIsNotNone(cl.auth_ref)
- self.assertTrue(cl.auth_ref.scoped)
-
- def test_auth_ref_load(self):
- with mock.patch.object(requests, "request", mock_request):
- cl = client.Client(username='exampleuser',
- password='password',
- auth_url='http://somewhere/')
- cache = json.dumps(cl.auth_ref)
- new_client = client.Client(auth_ref=json.loads(cache))
- self.assertIsNotNone(new_client.auth_ref)
- self.assertTrue(new_client.auth_ref.scoped)
- self.assertEquals(new_client.username, 'exampleuser')
- self.assertIsNone(new_client.password)
- self.assertEqual(new_client.management_url,
- 'http://admin:35357/v2.0')
diff --git a/tests/test_https.py b/tests/test_https.py
index bef1db4..4070df9 100644
--- a/tests/test_https.py
+++ b/tests/test_https.py
@@ -68,3 +68,27 @@ class ClientTest(utils.TestCase):
headers=headers,
data='[1, 2, 3]',
**kwargs)
+
+ def test_post_auth(self):
+ with mock.patch.object(requests, "request", MOCK_REQUEST):
+ cl = client.HTTPClient(
+ username="username", password="password", tenant_id="tenant",
+ auth_url="auth_test", cacert="ca.pem", key="key.pem",
+ cert="cert.pem")
+ cl.management_url = "https://127.0.0.1:5000"
+ cl.auth_token = "token"
+ cl.post("/hi", body=[1, 2, 3])
+ headers = {
+ "X-Auth-Token": "token",
+ "Content-Type": "application/json",
+ "User-Agent": cl.USER_AGENT
+ }
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['cert'] = ('cert.pem', 'key.pem')
+ kwargs['verify'] = 'ca.pem'
+ MOCK_REQUEST.assert_called_with(
+ "POST",
+ "https://127.0.0.1:5000/hi",
+ headers=headers,
+ data='[1, 2, 3]',
+ **kwargs)
diff --git a/tests/test_keyring.py b/tests/test_keyring.py
index 9ac8d75..8aaa667 100644
--- a/tests/test_keyring.py
+++ b/tests/test_keyring.py
@@ -4,7 +4,7 @@ import unittest
from keystoneclient import access
from keystoneclient import client
from keystoneclient.openstack.common import timeutils
-from tests import client_fixtures
+from tests.v2_0 import client_fixtures
from tests import utils
try:
@@ -65,11 +65,12 @@ class KeyringTest(utils.TestCase):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL)
- (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
- USERNAME,
- TENANT,
- TENANT_ID,
- TOKEN)
+ (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
+ auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertIsNone(keyring_key)
self.assertIsNone(auth_ref)
@@ -78,49 +79,57 @@ class KeyringTest(utils.TestCase):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL)
- keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
- TENANT, TENANT_ID,
- TOKEN)
+ keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertEqual(keyring_key,
'%s/%s/%s/%s/%s' %
- (AUTH_URL, USERNAME, TENANT, TENANT_ID, TOKEN))
+ (AUTH_URL, TENANT_ID, TENANT, TOKEN, USERNAME))
def test_set_and_get_keyring_expired(self):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL,
use_keyring=True)
- keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
- TENANT, TENANT_ID,
- TOKEN)
+ keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
- cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
+ cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
expired = timeutils.utcnow() - datetime.timedelta(minutes=30)
cl.auth_ref['token']['expires'] = timeutils.isotime(expired)
cl.store_auth_ref_into_keyring(keyring_key)
- (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
- USERNAME,
- TENANT,
- TENANT_ID,
- TOKEN)
+ (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
+ auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertIsNone(auth_ref)
def test_set_and_get_keyring(self):
cl = client.HTTPClient(username=USERNAME, password=PASSWORD,
tenant_id=TENANT_ID, auth_url=AUTH_URL,
use_keyring=True)
- keyring_key = cl._build_keyring_key(AUTH_URL, USERNAME,
- TENANT, TENANT_ID,
- TOKEN)
+ keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
- cl.auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
+ cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
expires = timeutils.utcnow() + datetime.timedelta(minutes=30)
cl.auth_ref['token']['expires'] = timeutils.isotime(expires)
cl.store_auth_ref_into_keyring(keyring_key)
- (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(AUTH_URL,
- USERNAME,
- TENANT,
- TENANT_ID,
- TOKEN)
+ (keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
+ auth_url=AUTH_URL,
+ username=USERNAME,
+ tenant_name=TENANT,
+ tenant_id=TENANT_ID,
+ token=TOKEN)
self.assertEqual(auth_ref.auth_token, TOKEN)
self.assertEqual(auth_ref.username, USERNAME)
diff --git a/tests/test_service_catalog.py b/tests/test_service_catalog.py
deleted file mode 100644
index 09ac19c..0000000
--- a/tests/test_service_catalog.py
+++ /dev/null
@@ -1,144 +0,0 @@
-from keystoneclient import exceptions
-from keystoneclient import service_catalog
-from tests import utils
-
-
-# Taken directly from keystone/content/common/samples/auth.json
-# Do not edit this structure. Instead, grab the latest from there.
-
-SERVICE_CATALOG = {
- "access": {
- "token": {
- "id": "ab48a9efdfedb23ty3494",
- "expires": "2010-11-01T03:32:15-05:00",
- "tenant": {
- "id": "345",
- "name": "My Project"
- }
- },
- "user": {
- "id": "123",
- "name": "jqsmith",
- "roles": [
- {"id": "234",
- "name": "compute:admin"},
- {"id": "235",
- "name": "object-store:admin",
- "tenantId": "1"}
- ],
- "roles_links": []
- },
- "serviceCatalog": [{
- "name": "Cloud Servers",
- "type": "compute",
- "endpoints": [
- {"tenantId": "1",
- "publicURL": "https://compute.north.host/v1/1234",
- "internalURL": "https://compute.north.host/v1/1234",
- "region": "North",
- "versionId": "1.0",
- "versionInfo": "https://compute.north.host/v1.0/",
- "versionList": "https://compute.north.host/"},
- {"tenantId": "2",
- "publicURL": "https://compute.north.host/v1.1/3456",
- "internalURL": "https://compute.north.host/v1.1/3456",
- "region": "North",
- "versionId": "1.1",
- "versionInfo": "https://compute.north.host/v1.1/",
- "versionList": "https://compute.north.host/"}
- ],
- "endpoints_links": []
- },
- {
- "name": "Cloud Files",
- "type": "object-store",
- "endpoints": [
- {"tenantId": "11",
- "publicURL": "https://compute.north.host/v1/blah-blah",
- "internalURL": "https://compute.north.host/v1/blah-blah",
- "region": "South",
- "versionId": "1.0",
- "versionInfo": "uri",
- "versionList": "uri"},
- {"tenantId": "2",
- "publicURL": "https://compute.north.host/v1.1/blah-blah",
- "internalURL":
- "https://compute.north.host/v1.1/blah-blah",
- "region": "South",
- "versionId": "1.1",
- "versionInfo": "https://compute.north.host/v1.1/",
- "versionList": "https://compute.north.host/"}
- ],
- "endpoints_links": [{
- "rel": "next",
- "href": "https://identity.north.host/v2.0/"
- "endpoints?marker=2"
- }]
- },
- {
- "name": "Image Servers",
- "type": "image",
- "endpoints": [
- {"publicURL": "https://image.north.host/v1/",
- "internalURL": "https://image-internal.north.host/v1/",
- "region": "North"},
- {"publicURL": "https://image.south.host/v1/",
- "internalURL": "https://image-internal.south.host/v1/",
- "region": "South"}
- ],
- "endpoints_links": []
- },
- ],
- "serviceCatalog_links": [{
- "rel": "next",
- "href": ("https://identity.host/v2.0/endpoints?"
- "session=2hfh8Ar&marker=2")
- }]
- }
-}
-
-
-class ServiceCatalogTest(utils.TestCase):
- def test_building_a_service_catalog(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
-
- self.assertEquals(sc.url_for(service_type='compute'),
- "https://compute.north.host/v1/1234")
- self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
- "https://compute.north.host/v1/1234")
- self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
- "https://compute.north.host/v1.1/3456")
-
- self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
- "South", service_type='compute')
-
- def test_service_catalog_endpoints(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
- public_ep = sc.get_endpoints(service_type='compute',
- endpoint_type='publicURL')
- self.assertEquals(public_ep['compute'][1]['tenantId'], '2')
- self.assertEquals(public_ep['compute'][1]['versionId'], '1.1')
- self.assertEquals(public_ep['compute'][1]['internalURL'],
- "https://compute.north.host/v1.1/3456")
-
- def test_service_catalog_regions(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'],
- region_name="North")
- url = sc.url_for(service_type='image', endpoint_type='publicURL')
- self.assertEquals(url, "https://image.north.host/v1/")
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'],
- region_name="South")
- url = sc.url_for(service_type='image', endpoint_type='internalURL')
- self.assertEquals(url, "https://image-internal.south.host/v1/")
-
- def test_token(self):
- sc = service_catalog.ServiceCatalog(SERVICE_CATALOG['access'])
-
- self.assertEquals(sc.get_token(), {
- 'id': 'ab48a9efdfedb23ty3494',
- 'tenant_id': '345',
- 'user_id': '123',
- 'expires': '2010-11-01T03:32:15-05:00'})
- self.assertEquals(sc.catalog['token']['expires'],
- "2010-11-01T03:32:15-05:00")
- self.assertEquals(sc.catalog['token']['tenant']['id'], '345')
diff --git a/tests/utils.py b/tests/utils.py
index 9d9bf8d..264aceb 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -8,6 +8,8 @@ from keystoneclient.v2_0 import client
class TestCase(testtools.TestCase):
+ TEST_DOMAIN_ID = '1'
+ TEST_DOMAIN_NAME = 'aDomain'
TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
diff --git a/tests/v2_0/client_fixtures.py b/tests/v2_0/client_fixtures.py
new file mode 100644
index 0000000..70bb43f
--- /dev/null
+++ b/tests/v2_0/client_fixtures.py
@@ -0,0 +1,163 @@
+UNSCOPED_TOKEN = {
+ u'access': {u'serviceCatalog': {},
+ u'token': {u'expires': u'2012-10-03T16:58:01Z',
+ u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
+ u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ u'roles': [],
+ u'roles_links': [],
+ u'username': u'exampleuser'}
+ }
+}
+
+PROJECT_SCOPED_TOKEN = {
+ u'access': {
+ u'serviceCatalog': [{
+ u'endpoints': [{
+ u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'internalURL':
+ u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'publicURL':
+ u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne'
+ }],
+ u'endpoints_links': [],
+ u'name': u'Volume Service',
+ u'type': u'volume'},
+ {u'endpoints': [{
+ u'adminURL': u'http://admin:9292/v1',
+ u'internalURL': u'http://internal:9292/v1',
+ u'publicURL': u'http://public.com:9292/v1',
+ u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'Image Service',
+ u'type': u'image'},
+ {u'endpoints': [{
+u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'Compute Service',
+ u'type': u'compute'},
+ {u'endpoints': [{
+u'adminURL': u'http://admin:8773/services/Admin',
+u'internalURL': u'http://internal:8773/services/Cloud',
+u'publicURL': u'http://public.com:8773/services/Cloud',
+u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'EC2 Service',
+ u'type': u'ec2'},
+ {u'endpoints': [{
+u'adminURL': u'http://admin:35357/v2.0',
+u'internalURL': u'http://internal:5000/v2.0',
+u'publicURL': u'http://public.com:5000/v2.0',
+u'region': u'RegionOne'}],
+ u'endpoints_links': [],
+ u'name': u'Identity Service',
+ u'type': u'identity'}],
+ u'token': {u'expires': u'2012-10-03T16:53:36Z',
+ u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
+ u'tenant': {u'description': u'',
+ u'enabled': True,
+ u'id': u'225da22d3ce34b15877ea70b2a575f58',
+ u'name': u'exampleproject'}},
+ u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
+ u'name': u'Member'}],
+ u'roles_links': [],
+ u'username': u'exampleuser'}
+ }
+}
+
+AUTH_RESPONSE_BODY = {
+ u'access': {
+ u'token': {
+ u'id': u'ab48a9efdfedb23ty3494',
+ u'expires': u'2010-11-01T03:32:15-05:00',
+ u'tenant': {
+ u'id': u'345',
+ u'name': u'My Project'
+ }
+ },
+ u'user': {
+ u'id': u'123',
+ u'name': u'jqsmith',
+ u'roles': [{
+ u'id': u'234',
+ u'name': u'compute:admin'
+ }, {
+ u'id': u'235',
+ u'name': u'object-store:admin',
+ u'tenantId': u'1'
+ }],
+ u'roles_links': []
+ },
+ u'serviceCatalog': [{
+ u'name': u'Cloud Servers',
+ u'type': u'compute',
+ u'endpoints': [{
+ u'tenantId': u'1',
+ u'publicURL': u'https://compute.north.host/v1/1234',
+ u'internalURL': u'https://compute.north.host/v1/1234',
+ u'region': u'North',
+ u'versionId': u'1.0',
+ u'versionInfo': u'https://compute.north.host/v1.0/',
+ u'versionList': u'https://compute.north.host/u'
+ }, {
+ u'tenantId': u'2',
+ u'publicURL': u'https://compute.north.host/v1.1/3456',
+ u'internalURL': u'https://compute.north.host/v1.1/3456',
+ u'region': u'North',
+ u'versionId': u'1.1',
+ u'versionInfo': u'https://compute.north.host/v1.1/',
+ u'versionList': u'https://compute.north.host/u'
+ }],
+ u'endpoints_links': []
+ }, {
+ u'name': u'Cloud Files',
+ u'type': u'object-store',
+ u'endpoints': [{
+ u'tenantId': u'11',
+ u'publicURL': u'https://swift.north.host/v1/blah',
+ u'internalURL': u'https://swift.north.host/v1/blah',
+ u'region': u'South',
+ u'versionId': u'1.0',
+ u'versionInfo': u'uri',
+ u'versionList': u'uri'
+ }, {
+ u'tenantId': u'2',
+ u'publicURL': u'https://swift.north.host/v1.1/blah',
+ u'internalURL': u'https://compute.north.host/v1.1/blah',
+ u'region': u'South',
+ u'versionId': u'1.1',
+ u'versionInfo': u'https://swift.north.host/v1.1/',
+ u'versionList': u'https://swift.north.host/'
+ }],
+ u'endpoints_links': [{
+ u'rel': u'next',
+ u'href': u'https://identity.north.host/v2.0/'
+ u'endpoints?marker=2'
+ }]
+ }, {
+ u'name': u'Image Servers',
+ u'type': u'image',
+ u'endpoints': [{
+ u'publicURL': u'https://image.north.host/v1/',
+ u'internalURL': u'https://image-internal.north.host/v1/',
+ u'region': u'North'
+ }, {
+ u'publicURL': u'https://image.south.host/v1/',
+ u'internalURL': u'https://image-internal.south.host/v1/',
+ u'region': u'South'
+ }],
+ u'endpoints_links': []
+ }],
+ u'serviceCatalog_links': [{
+ u'rel': u'next',
+ u'href': (u'https://identity.host/v2.0/endpoints?'
+ u'session=2hfh8Ar&marker=2')
+ }]
+ }
+}
diff --git a/tests/test_access.py b/tests/v2_0/test_access.py
index 31f91d6..c1bb4f7 100644
--- a/tests/test_access.py
+++ b/tests/v2_0/test_access.py
@@ -3,7 +3,7 @@ import datetime
from keystoneclient import access
from keystoneclient.openstack.common import timeutils
from tests import utils
-from tests import client_fixtures
+from tests.v2_0 import client_fixtures
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
@@ -11,7 +11,7 @@ PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
class AccessInfoTest(utils.TestCase):
def test_building_unscoped_accessinfo(self):
- auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
+ auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
@@ -30,6 +30,8 @@ class AccessInfoTest(utils.TestCase):
self.assertEquals(auth_ref.management_url, None)
self.assertFalse(auth_ref.scoped)
+ self.assertFalse(auth_ref.domain_scoped)
+ self.assertFalse(auth_ref.project_scoped)
self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
UNSCOPED_TOKEN['access']['token']['expires']))
@@ -37,13 +39,13 @@ class AccessInfoTest(utils.TestCase):
def test_will_expire_soon(self):
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
UNSCOPED_TOKEN['access']['token']['expires'] = expires.isoformat()
- auth_ref = access.AccessInfo(UNSCOPED_TOKEN['access'])
+ auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
self.assertFalse(auth_ref.will_expire_soon())
def test_building_scoped_accessinfo(self):
- auth_ref = access.AccessInfo(PROJECT_SCOPED_TOKEN['access'])
+ auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
self.assertTrue(auth_ref)
self.assertIn('token', auth_ref)
@@ -68,3 +70,5 @@ class AccessInfoTest(utils.TestCase):
('http://admin:35357/v2.0',))
self.assertTrue(auth_ref.scoped)
+ self.assertTrue(auth_ref.project_scoped)
+ self.assertFalse(auth_ref.domain_scoped)
diff --git a/tests/v2_0/test_client.py b/tests/v2_0/test_client.py
new file mode 100644
index 0000000..b1e485f
--- /dev/null
+++ b/tests/v2_0/test_client.py
@@ -0,0 +1,84 @@
+import json
+import mock
+
+import requests
+
+from keystoneclient.v2_0 import client
+from tests import utils
+from tests.v2_0 import client_fixtures
+
+
+class KeystoneClientTest(utils.TestCase):
+ def setUp(self):
+ super(KeystoneClientTest, self).setUp()
+
+ scoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
+ })
+ self.scoped_mock_req = mock.Mock(return_value=scoped_fake_resp)
+
+ unscoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.UNSCOPED_TOKEN)
+ })
+ self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
+
+ def test_unscoped_init(self):
+ with mock.patch.object(requests, "request", self.unscoped_mock_req):
+ c = client.Client(username='exampleuser',
+ password='password',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertFalse(c.auth_ref.scoped)
+ self.assertFalse(c.auth_ref.domain_scoped)
+ self.assertFalse(c.auth_ref.project_scoped)
+
+ def test_scoped_init(self):
+ with mock.patch.object(requests, "request", self.scoped_mock_req):
+ c = client.Client(username='exampleuser',
+ password='password',
+ tenant_name='exampleproject',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertTrue(c.auth_ref.scoped)
+ self.assertTrue(c.auth_ref.project_scoped)
+ self.assertFalse(c.auth_ref.domain_scoped)
+
+ def test_auth_ref_load(self):
+ with mock.patch.object(requests, "request", self.scoped_mock_req):
+ cl = client.Client(username='exampleuser',
+ password='password',
+ tenant_name='exampleproject',
+ auth_url='http://somewhere/')
+ cache = json.dumps(cl.auth_ref)
+ new_client = client.Client(auth_ref=json.loads(cache))
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertTrue(new_client.auth_ref.scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v2.0')
+
+ def test_auth_ref_load_with_overridden_arguments(self):
+ with mock.patch.object(requests, "request", self.scoped_mock_req):
+ cl = client.Client(username='exampleuser',
+ password='password',
+ tenant_name='exampleproject',
+ auth_url='http://somewhere/')
+ cache = json.dumps(cl.auth_ref)
+ new_auth_url = "http://new-public:5000/v2.0"
+ new_client = client.Client(auth_ref=json.loads(cache),
+ auth_url=new_auth_url)
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertTrue(new_client.auth_ref.scoped)
+ self.assertTrue(new_client.auth_ref.scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertEquals(new_client.auth_url, new_auth_url)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v2.0')
diff --git a/tests/v2_0/test_service_catalog.py b/tests/v2_0/test_service_catalog.py
new file mode 100644
index 0000000..15838b9
--- /dev/null
+++ b/tests/v2_0/test_service_catalog.py
@@ -0,0 +1,50 @@
+from keystoneclient import access
+from keystoneclient import exceptions
+
+from tests import utils
+from tests.v2_0 import client_fixtures
+
+
+class ServiceCatalogTest(utils.TestCase):
+ def setUp(self):
+ super(ServiceCatalogTest, self).setUp()
+ self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY
+
+ def test_building_a_service_catalog(self):
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ self.assertEquals(sc.url_for(service_type='compute'),
+ "https://compute.north.host/v1/1234")
+ self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
+ "https://compute.north.host/v1/1234")
+ self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
+ "https://compute.north.host/v1.1/3456")
+
+ self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
+ "South", service_type='compute')
+
+ def test_service_catalog_endpoints(self):
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+ public_ep = sc.get_endpoints(service_type='compute',
+ endpoint_type='publicURL')
+ self.assertEquals(public_ep['compute'][1]['tenantId'], '2')
+ self.assertEquals(public_ep['compute'][1]['versionId'], '1.1')
+ self.assertEquals(public_ep['compute'][1]['internalURL'],
+ "https://compute.north.host/v1.1/3456")
+
+ def test_service_catalog_regions(self):
+ self.AUTH_RESPONSE_BODY['access']['region_name'] = "North"
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ url = sc.url_for(service_type='image', endpoint_type='publicURL')
+ self.assertEquals(url, "https://image.north.host/v1/")
+
+ self.AUTH_RESPONSE_BODY['access']['region_name'] = "South"
+ auth_ref = access.AccessInfo.factory(None, self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ url = sc.url_for(service_type='image', endpoint_type='internalURL')
+ self.assertEquals(url, "https://image-internal.south.host/v1/")
diff --git a/tests/v3/client_fixtures.py b/tests/v3/client_fixtures.py
new file mode 100644
index 0000000..fb72dc9
--- /dev/null
+++ b/tests/v3/client_fixtures.py
@@ -0,0 +1,234 @@
+UNSCOPED_TOKEN = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'catalog': {},
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'user': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ }
+ }
+}
+
+DOMAIN_SCOPED_TOKEN = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'catalog': {},
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'user': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ },
+ u'domain': {
+ u'id': u'8e9283b7ba0b1038840c3842058b86ab',
+ u'name': u'anotherdomain'
+ },
+ }
+}
+
+PROJECT_SCOPED_TOKEN = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'catalog': [{
+ u'endpoints': [{
+ u'url':
+ u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url':
+ u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url':
+ u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'volume'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://public.com:9292/v1',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://internal:9292/v1',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://admin:9292/v1',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'image'
+ }, {
+ u'endpoints': [{
+ u'url':
+ u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url':
+ u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url':
+ u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'compute'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://public.com:8773/services/Cloud',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://internal:8773/services/Cloud',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://admin:8773/services/Admin',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'ec2'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://public.com:5000/v3',
+ u'region': u'RegionOne',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://internal:5000/v3',
+ u'region': u'RegionOne',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://admin:35357/v3',
+ u'region': u'RegionOne',
+ u'interface': u'admin'
+ }],
+ u'type': u'identity'
+ }],
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'user': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
+ u'name': u'exampleuser',
+ },
+ u'project': {
+ u'domain': {
+ u'id': u'4e6893b7ba0b4006840c3845660b86ed',
+ u'name': u'exampledomain'
+ },
+ u'id': u'225da22d3ce34b15877ea70b2a575f58',
+ u'name': u'exampleproject',
+ },
+ }
+}
+
+AUTH_RESPONSE_HEADERS = {
+ u'X-Subject-Token': u'3e2813b7ba0b4006840c3825860b86ed'
+}
+
+AUTH_RESPONSE_BODY = {
+ u'token': {
+ u'methods': [
+ u'password'
+ ],
+ u'expires_at': u'2010-11-01T03:32:15-05:00',
+ u'project': {
+ u'domain': {
+ u'id': u'123',
+ u'name': u'aDomain'
+ },
+ u'id': u'345',
+ u'name': u'aTenant'
+ },
+ u'user': {
+ u'domain': {
+ u'id': u'1',
+ u'name': u'aDomain'
+ },
+ u'id': u'567',
+ u'name': u'test'
+ },
+ u'issued_at': u'2010-10-31T03:32:15-05:00',
+ u'catalog': [{
+ u'endpoints': [{
+ u'url': u'https://compute.north.host/novapi/public',
+ u'region': u'North',
+ u'interface': u'public'
+ }, {
+ u'url': u'https://compute.north.host/novapi/internal',
+ u'region': u'North',
+ u'interface': u'internal'
+ }, {
+ u'url': u'https://compute.north.host/novapi/admin',
+ u'region': u'North',
+ u'interface': u'admin'
+ }],
+ u'type': u'compute'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://swift.north.host/swiftapi/public',
+ u'region': u'South',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://swift.north.host/swiftapi/internal',
+ u'region': u'South',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://swift.north.host/swiftapi/admin',
+ u'region': u'South',
+ u'interface': u'admin'
+ }],
+ u'type': u'object-store'
+ }, {
+ u'endpoints': [{
+ u'url': u'http://glance.north.host/glanceapi/public',
+ u'region': u'North',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://glance.north.host/glanceapi/internal',
+ u'region': u'North',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://glance.north.host/glanceapi/admin',
+ u'region': u'North',
+ u'interface': u'admin'
+ }, {
+ u'url': u'http://glance.south.host/glanceapi/public',
+ u'region': u'South',
+ u'interface': u'public'
+ }, {
+ u'url': u'http://glance.south.host/glanceapi/internal',
+ u'region': u'South',
+ u'interface': u'internal'
+ }, {
+ u'url': u'http://glance.south.host/glanceapi/admin',
+ u'region': u'South',
+ u'interface': u'admin'
+ }],
+ u'type': u'image'
+ }]
+ }
+}
diff --git a/tests/v3/test_access.py b/tests/v3/test_access.py
new file mode 100644
index 0000000..050a373
--- /dev/null
+++ b/tests/v3/test_access.py
@@ -0,0 +1,106 @@
+import datetime
+
+from keystoneclient import access
+from keystoneclient.openstack.common import timeutils
+from tests import utils
+from tests.v3 import client_fixtures
+
+TOKEN_RESPONSE = utils.TestResponse({
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+})
+UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
+DOMAIN_SCOPED_TOKEN = client_fixtures.DOMAIN_SCOPED_TOKEN
+PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
+
+
+class AccessInfoTest(utils.TestCase):
+ def test_building_unscoped_accessinfo(self):
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=UNSCOPED_TOKEN)
+
+ self.assertTrue(auth_ref)
+ self.assertIn('methods', auth_ref)
+ self.assertIn('catalog', auth_ref)
+ self.assertFalse(auth_ref['catalog'])
+
+ self.assertEquals(auth_ref.auth_token,
+ '3e2813b7ba0b4006840c3825860b86ed')
+ self.assertEquals(auth_ref.username, 'exampleuser')
+ self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ self.assertEquals(auth_ref.project_name, None)
+ self.assertEquals(auth_ref.project_id, None)
+
+ self.assertEquals(auth_ref.auth_url, None)
+ self.assertEquals(auth_ref.management_url, None)
+
+ self.assertFalse(auth_ref.domain_scoped)
+ self.assertFalse(auth_ref.project_scoped)
+
+ self.assertEquals(auth_ref.expires, timeutils.parse_isotime(
+ UNSCOPED_TOKEN['token']['expires_at']))
+
+ def test_will_expire_soon(self):
+ expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
+ UNSCOPED_TOKEN['token']['expires_at'] = expires.isoformat()
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=UNSCOPED_TOKEN)
+ self.assertFalse(auth_ref.will_expire_soon(stale_duration=120))
+ self.assertTrue(auth_ref.will_expire_soon(stale_duration=300))
+ self.assertFalse(auth_ref.will_expire_soon())
+
+ def test_building_domain_scoped_accessinfo(self):
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=DOMAIN_SCOPED_TOKEN)
+
+ self.assertTrue(auth_ref)
+ self.assertIn('methods', auth_ref)
+ self.assertIn('catalog', auth_ref)
+ self.assertFalse(auth_ref['catalog'])
+
+ self.assertEquals(auth_ref.auth_token,
+ '3e2813b7ba0b4006840c3825860b86ed')
+ self.assertEquals(auth_ref.username, 'exampleuser')
+ self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ self.assertEquals(auth_ref.domain_name, 'anotherdomain')
+ self.assertEquals(auth_ref.domain_id,
+ '8e9283b7ba0b1038840c3842058b86ab')
+
+ self.assertEquals(auth_ref.project_name, None)
+ self.assertEquals(auth_ref.project_id, None)
+
+ self.assertTrue(auth_ref.domain_scoped)
+ self.assertFalse(auth_ref.project_scoped)
+
+ def test_building_project_scoped_accessinfo(self):
+ auth_ref = access.AccessInfo.factory(resp=TOKEN_RESPONSE,
+ body=PROJECT_SCOPED_TOKEN)
+
+ self.assertTrue(auth_ref)
+ self.assertIn('methods', auth_ref)
+ self.assertIn('catalog', auth_ref)
+ self.assertTrue(auth_ref['catalog'])
+
+ self.assertEquals(auth_ref.auth_token,
+ '3e2813b7ba0b4006840c3825860b86ed')
+ self.assertEquals(auth_ref.username, 'exampleuser')
+ self.assertEquals(auth_ref.user_id, 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ self.assertEquals(auth_ref.domain_name, None)
+ self.assertEquals(auth_ref.domain_id, None)
+
+ self.assertEquals(auth_ref.project_name, 'exampleproject')
+ self.assertEquals(auth_ref.project_id,
+ '225da22d3ce34b15877ea70b2a575f58')
+
+ self.assertEquals(auth_ref.tenant_name, auth_ref.project_name)
+ self.assertEquals(auth_ref.tenant_id, auth_ref.project_id)
+
+ self.assertEquals(auth_ref.auth_url,
+ ('http://public.com:5000/v3',))
+ self.assertEquals(auth_ref.management_url,
+ ('http://admin:35357/v3',))
+
+ self.assertFalse(auth_ref.domain_scoped)
+ self.assertTrue(auth_ref.project_scoped)
diff --git a/tests/v3/test_auth.py b/tests/v3/test_auth.py
new file mode 100644
index 0000000..41512f8
--- /dev/null
+++ b/tests/v3/test_auth.py
@@ -0,0 +1,408 @@
+import copy
+import json
+import requests
+
+from keystoneclient import exceptions
+from keystoneclient.v3 import client
+
+from tests.v3 import utils
+
+
+class AuthenticateAgainstKeystoneTests(utils.TestCase):
+ def setUp(self):
+ super(AuthenticateAgainstKeystoneTests, self).setUp()
+ self.TEST_RESPONSE_DICT = {
+ "token": {
+ "methods": [
+ "token",
+ "password"
+ ],
+
+ "expires_at": "2020-01-01T00:00:10.000123Z",
+ "project": {
+ "domain": {
+ "id": self.TEST_DOMAIN_ID,
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "id": self.TEST_TENANT_ID,
+ "name": self.TEST_TENANT_NAME
+ },
+ "user": {
+ "domain": {
+ "id": self.TEST_DOMAIN_ID,
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "id": self.TEST_USER,
+ "name": self.TEST_USER
+ },
+ "issued_at": "2013-05-29T16:55:21.468960Z",
+ "catalog": self.TEST_SERVICE_CATALOG
+ },
+ }
+ self.TEST_REQUEST_BODY = {
+ "auth": {
+ "identity": {
+ "methods": ["password"],
+ "password": {
+ "user": {
+ "domain": {
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "name": self.TEST_USER,
+ "password": self.TEST_TOKEN
+ }
+ }
+ },
+ "scope": {
+ "project": {
+ "id": self.TEST_TENANT_ID
+ },
+ }
+ }
+ }
+ self.TEST_REQUEST_HEADERS = {
+ 'Content-Type': 'application/json',
+ 'User-Agent': 'python-keystoneclient'
+ }
+ self.TEST_RESPONSE_HEADERS = {
+ 'X-Subject-Token': self.TEST_TOKEN
+ }
+
+ def test_authenticate_success(self):
+ TEST_TOKEN = "abcdef"
+ self.TEST_RESPONSE_HEADERS['X-Subject-Token'] = TEST_TOKEN
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']['user']['domain']
+ del ident['password']['user']['name']
+ ident['password']['user']['id'] = self.TEST_USER
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_id=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_token, TEST_TOKEN)
+
+ def test_authenticate_failure(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ ident['password']['user']['password'] = 'bad_key'
+ resp = utils.TestResponse({
+ "status_code": 401,
+ "text": json.dumps({
+ "unauthorized": {
+ "message": "Unauthorized",
+ "code": "401",
+ },
+ }),
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ # Workaround for issue with assertRaises on python2.6
+ # where with assertRaises(exceptions.Unauthorized): doesn't work
+ # right
+ def client_create_wrapper():
+ client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password="bad_key",
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+
+ self.assertRaises(exceptions.Unauthorized, client_create_wrapper)
+
+ def test_auth_redirect(self):
+ correct_response = json.dumps(self.TEST_RESPONSE_DICT, sort_keys=True)
+ dict_responses = [
+ {
+ "headers": {
+ 'location': self.TEST_ADMIN_URL + "/auth/tokens",
+ 'X-Subject-Token': self.TEST_TOKEN,
+ },
+ "status_code": 305,
+ "text": "Use proxy",
+ },
+ {
+ "headers": {'X-Subject-Token': self.TEST_TOKEN},
+ "status_code": 200,
+ "text": correct_response,
+ },
+ ]
+ responses = [(utils.TestResponse(resp))
+ for resp in dict_responses]
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn(responses[0])
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_ADMIN_URL + "/auth/tokens",
+ **kwargs).AndReturn(responses[1])
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_domain_username_password_scoped(self):
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_userid_password_domain_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']['user']['domain']
+ del ident['password']['user']['name']
+ ident['password']['user']['id'] = self.TEST_USER
+
+ scope = self.TEST_REQUEST_BODY['auth']['scope']
+ del scope['project']
+ scope['domain'] = {}
+ scope['domain']['id'] = self.TEST_DOMAIN_ID
+
+ token = self.TEST_RESPONSE_DICT['token']
+ del token['project']
+ token['domain'] = {}
+ token['domain']['id'] = self.TEST_DOMAIN_ID
+ token['domain']['name'] = self.TEST_DOMAIN_NAME
+
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_id=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ domain_id=self.TEST_DOMAIN_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_domain_id,
+ self.TEST_DOMAIN_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_userid_password_project_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']['user']['domain']
+ del ident['password']['user']['name']
+ ident['password']['user']['id'] = self.TEST_USER
+
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_id=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_tenant_id,
+ self.TEST_TENANT_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_password_unscoped(self):
+ del self.TEST_RESPONSE_DICT['token']['catalog']
+ del self.TEST_REQUEST_BODY['auth']['scope']
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
+ username=self.TEST_USER,
+ password=self.TEST_TOKEN,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+ self.assertFalse('catalog' in cs.service_catalog.catalog)
+
+ def test_authenticate_success_token_domain_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = self.TEST_TOKEN
+
+ scope = self.TEST_REQUEST_BODY['auth']['scope']
+ del scope['project']
+ scope['domain'] = {}
+ scope['domain']['id'] = self.TEST_DOMAIN_ID
+
+ token = self.TEST_RESPONSE_DICT['token']
+ del token['project']
+ token['domain'] = {}
+ token['domain']['id'] = self.TEST_DOMAIN_ID
+ token['domain']['name'] = self.TEST_DOMAIN_NAME
+
+ self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(token=self.TEST_TOKEN,
+ domain_id=self.TEST_DOMAIN_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_domain_id,
+ self.TEST_DOMAIN_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_token_project_scoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = self.TEST_TOKEN
+ self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(token=self.TEST_TOKEN,
+ project_id=self.TEST_TENANT_ID,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_tenant_id,
+ self.TEST_TENANT_ID)
+ self.assertEqual(cs.management_url,
+ self.TEST_RESPONSE_DICT["token"]["catalog"][3]
+ ['endpoints'][2]["url"])
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+
+ def test_authenticate_success_token_unscoped(self):
+ ident = self.TEST_REQUEST_BODY['auth']['identity']
+ del ident['password']
+ ident['methods'] = ['token']
+ ident['token'] = {}
+ ident['token']['id'] = self.TEST_TOKEN
+ del self.TEST_REQUEST_BODY['auth']['scope']
+ del self.TEST_RESPONSE_DICT['token']['catalog']
+ self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
+ resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(self.TEST_RESPONSE_DICT),
+ "headers": self.TEST_RESPONSE_HEADERS,
+ })
+
+ kwargs = copy.copy(self.TEST_REQUEST_BASE)
+ kwargs['headers'] = self.TEST_REQUEST_HEADERS
+ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
+ requests.request('POST',
+ self.TEST_URL + "/auth/tokens",
+ **kwargs).AndReturn((resp))
+ self.mox.ReplayAll()
+
+ cs = client.Client(token=self.TEST_TOKEN,
+ auth_url=self.TEST_URL)
+ self.assertEqual(cs.auth_token,
+ self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
+ self.assertFalse('catalog' in cs.service_catalog.catalog)
diff --git a/tests/v3/test_client.py b/tests/v3/test_client.py
new file mode 100644
index 0000000..3198722
--- /dev/null
+++ b/tests/v3/test_client.py
@@ -0,0 +1,121 @@
+import json
+import mock
+
+import requests
+
+from keystoneclient.v3 import client
+
+from tests import utils
+from tests.v3 import client_fixtures
+
+
+class KeystoneClientTest(utils.TestCase):
+ def setUp(self):
+ super(KeystoneClientTest, self).setUp()
+
+ domain_scoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.DOMAIN_SCOPED_TOKEN),
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+ self.domain_scoped_mock_req = mock.Mock(
+ return_value=domain_scoped_fake_resp)
+
+ project_scoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN),
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+ self.project_scoped_mock_req = mock.Mock(
+ return_value=project_scoped_fake_resp)
+
+ unscoped_fake_resp = utils.TestResponse({
+ "status_code": 200,
+ "text": json.dumps(client_fixtures.UNSCOPED_TOKEN),
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+ self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp)
+
+ def test_unscoped_init(self):
+ with mock.patch.object(requests, "request", self.unscoped_mock_req):
+ c = client.Client(user_domain_name='exampledomain',
+ username='exampleuser',
+ password='password',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertFalse(c.auth_ref.domain_scoped)
+ self.assertFalse(c.auth_ref.project_scoped)
+ self.assertEquals(c.auth_user_id,
+ 'c4da488862bd435c9e6c0275a0d0e49a')
+
+ def test_domain_scoped_init(self):
+ with mock.patch.object(requests,
+ "request",
+ self.domain_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ domain_name='exampledomain',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertTrue(c.auth_ref.domain_scoped)
+ self.assertFalse(c.auth_ref.project_scoped)
+ self.assertEquals(c.auth_user_id,
+ 'c4da488862bd435c9e6c0275a0d0e49a')
+ self.assertEquals(c.auth_domain_id,
+ '8e9283b7ba0b1038840c3842058b86ab')
+
+ def test_project_scoped_init(self):
+ with mock.patch.object(requests,
+ "request",
+ self.project_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ user_domain_name='exampledomain',
+ project_name='exampleproject',
+ auth_url='http://somewhere/')
+ self.assertIsNotNone(c.auth_ref)
+ self.assertFalse(c.auth_ref.domain_scoped)
+ self.assertTrue(c.auth_ref.project_scoped)
+ self.assertEquals(c.auth_user_id,
+ 'c4da488862bd435c9e6c0275a0d0e49a')
+ self.assertEquals(c.auth_tenant_id,
+ '225da22d3ce34b15877ea70b2a575f58')
+
+ def test_auth_ref_load(self):
+ with mock.patch.object(requests,
+ "request",
+ self.project_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ project_id='225da22d3ce34b15877ea70b2a575f58',
+ auth_url='http://somewhere/')
+ cache = json.dumps(c.auth_ref)
+ new_client = client.Client(auth_ref=json.loads(cache))
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v3')
+
+ def test_auth_ref_load_with_overridden_arguments(self):
+ with mock.patch.object(requests,
+ "request",
+ self.project_scoped_mock_req):
+ c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a',
+ password='password',
+ project_id='225da22d3ce34b15877ea70b2a575f58',
+ auth_url='http://somewhere/')
+ cache = json.dumps(c.auth_ref)
+ new_auth_url = "http://new-public:5000/v3"
+ new_client = client.Client(auth_ref=json.loads(cache),
+ auth_url=new_auth_url)
+ self.assertIsNotNone(new_client.auth_ref)
+ self.assertFalse(new_client.auth_ref.domain_scoped)
+ self.assertTrue(new_client.auth_ref.project_scoped)
+ self.assertEquals(new_client.auth_url, new_auth_url)
+ self.assertEquals(new_client.username, 'exampleuser')
+ self.assertIsNone(new_client.password)
+ self.assertEqual(new_client.management_url,
+ 'http://admin:35357/v3')
diff --git a/tests/v3/test_service_catalog.py b/tests/v3/test_service_catalog.py
new file mode 100644
index 0000000..d0dac3a
--- /dev/null
+++ b/tests/v3/test_service_catalog.py
@@ -0,0 +1,55 @@
+from keystoneclient import access
+from keystoneclient import exceptions
+
+from tests.v3 import client_fixtures
+from tests.v3 import utils
+
+
+class ServiceCatalogTest(utils.TestCase):
+ def setUp(self):
+ super(ServiceCatalogTest, self).setUp()
+ self.AUTH_RESPONSE_BODY = client_fixtures.AUTH_RESPONSE_BODY
+ self.RESPONSE = utils.TestResponse({
+ "headers": client_fixtures.AUTH_RESPONSE_HEADERS
+ })
+
+ def test_building_a_service_catalog(self):
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ self.assertEquals(sc.url_for(service_type='compute'),
+ "https://compute.north.host/novapi/public")
+ self.assertEquals(sc.url_for(service_type='compute',
+ endpoint_type='internal'),
+ "https://compute.north.host/novapi/internal")
+
+ self.assertRaises(exceptions.EndpointNotFound, sc.url_for, "region",
+ "South", service_type='compute')
+
+ def test_service_catalog_endpoints(self):
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ public_ep = sc.get_endpoints(service_type='compute',
+ endpoint_type='public')
+ self.assertEquals(public_ep['compute'][0]['region'], 'North')
+ self.assertEquals(public_ep['compute'][0]['url'],
+ "https://compute.north.host/novapi/public")
+
+ def test_service_catalog_regions(self):
+ self.AUTH_RESPONSE_BODY['token']['region_name'] = "North"
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+
+ url = sc.url_for(service_type='image', endpoint_type='public')
+ self.assertEquals(url, "http://glance.north.host/glanceapi/public")
+
+ self.AUTH_RESPONSE_BODY['token']['region_name'] = "South"
+ auth_ref = access.AccessInfo.factory(self.RESPONSE,
+ self.AUTH_RESPONSE_BODY)
+ sc = auth_ref.service_catalog
+ url = sc.url_for(service_type='image', endpoint_type='internal')
+ self.assertEquals(url, "http://glance.south.host/glanceapi/internal")
diff --git a/tests/v3/utils.py b/tests/v3/utils.py
index 6ae7c0e..b3b359c 100644
--- a/tests/v3/utils.py
+++ b/tests/v3/utils.py
@@ -1,8 +1,8 @@
import copy
import json
-import uuid
import time
import urlparse
+import uuid
import mox
import requests
@@ -32,6 +32,9 @@ class TestClient(client.Client):
class TestCase(testtools.TestCase):
+ TEST_DOMAIN_ID = '1'
+ TEST_DOMAIN_NAME = 'aDomain'
+ TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
TEST_USER = 'test'
@@ -43,6 +46,84 @@ class TestCase(testtools.TestCase):
'verify': True,
}
+ TEST_SERVICE_CATALOG = [{
+ "endpoints": [{
+ "url": "http://cdn.admin-nets.local:8774/v1.0/",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://127.0.0.1:8774/v1.0",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://cdn.admin-nets.local:8774/v1.0",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "nova_compat"
+ }, {
+ "endpoints": [{
+ "url": "http://nova/novapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://nova/novapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://nova/novapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "compute"
+ }, {
+ "endpoints": [{
+ "url": "http://glance/glanceapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://glance/glanceapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://glance/glanceapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "image",
+ "name": "glance"
+ }, {
+ "endpoints": [{
+ "url": "http://127.0.0.1:5000/v3",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://127.0.0.1:5000/v3",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://127.0.0.1:35357/v3",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "identity"
+ }, {
+ "endpoints": [{
+ "url": "http://swift/swiftapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://swift/swiftapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://swift/swiftapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "object-store"
+ }]
+
def setUp(self):
super(TestCase, self).setUp()
self.mox = mox.Mox()