summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitreview1
-rwxr-xr-xexamples/pki/gen_pki.sh5
-rwxr-xr-xexamples/pki/run_all.sh1
-rw-r--r--keystoneclient/adapter.py205
-rw-r--r--keystoneclient/exceptions.py528
-rw-r--r--keystoneclient/fixture/discovery.py246
-rw-r--r--keystoneclient/fixture/v2.py234
-rw-r--r--keystoneclient/fixture/v3.py411
-rw-r--r--keystoneclient/middleware/__init__.py0
-rw-r--r--keystoneclient/middleware/auth_token.py1624
-rw-r--r--keystoneclient/middleware/memcache_crypt.py209
-rw-r--r--keystoneclient/middleware/s3_token.py274
-rw-r--r--keystoneclient/tests/unit/test_auth_token_middleware.py1952
-rw-r--r--keystoneclient/tests/unit/test_https.py22
-rw-r--r--keystoneclient/tests/unit/test_memcache_crypt.py102
-rw-r--r--keystoneclient/tests/unit/test_s3_token_middleware.py264
-rw-r--r--keystoneclient/tests/unit/v3/test_federation.py2
-rw-r--r--requirements.txt2
-rw-r--r--test-requirements.txt1
19 files changed, 110 insertions, 5973 deletions
diff --git a/.gitreview b/.gitreview
index 56224f5..432619b 100644
--- a/.gitreview
+++ b/.gitreview
@@ -2,3 +2,4 @@
host=review.openstack.org
port=29418
project=openstack/python-keystoneclient.git
+defaultbranch=feature/keystoneauth_integration
diff --git a/examples/pki/gen_pki.sh b/examples/pki/gen_pki.sh
index b8b28f9..8e2b59f 100755
--- a/examples/pki/gen_pki.sh
+++ b/examples/pki/gen_pki.sh
@@ -191,11 +191,6 @@ function issue_certs {
check_error $?
}
-function create_middleware_cert {
- cp $CERTS_DIR/ssl_cert.pem $CERTS_DIR/middleware.pem
- cat $PRIVATE_DIR/ssl_key.pem >> $CERTS_DIR/middleware.pem
-}
-
function check_openssl {
echo 'Checking openssl availability ...'
which openssl
diff --git a/examples/pki/run_all.sh b/examples/pki/run_all.sh
index ba2f0b6..2438ec7 100755
--- a/examples/pki/run_all.sh
+++ b/examples/pki/run_all.sh
@@ -26,6 +26,5 @@ generate_ca
ssl_cert_req
cms_signing_cert_req
issue_certs
-create_middleware_cert
gen_sample_cms
cleanup
diff --git a/keystoneclient/adapter.py b/keystoneclient/adapter.py
index 74399da..c33f94a 100644
--- a/keystoneclient/adapter.py
+++ b/keystoneclient/adapter.py
@@ -10,206 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-from oslo_serialization import jsonutils
+from keystoneauth1 import adapter
-from keystoneclient import utils
-
-
-class Adapter(object):
- """An instance of a session with local variables.
-
- A session is a global object that is shared around amongst many clients. It
- therefore contains state that is relevant to everyone. There is a lot of
- state such as the service type and region_name that are only relevant to a
- particular client that is using the session. An adapter provides a wrapper
- of client local data around the global session object.
-
- :param session: The session object to wrap.
- :type session: keystoneclient.session.Session
- :param str service_type: The default service_type for URL discovery.
- :param str service_name: The default service_name for URL discovery.
- :param str interface: The default interface for URL discovery.
- :param str region_name: The default region_name for URL discovery.
- :param str endpoint_override: Always use this endpoint URL for requests
- for this client.
- :param tuple version: The version that this API targets.
- :param auth: An auth plugin to use instead of the session one.
- :type auth: keystoneclient.auth.base.BaseAuthPlugin
- :param str user_agent: The User-Agent string to set.
- :param int connect_retries: the maximum number of retries that should
- be attempted for connection errors.
- Default None - use session default which
- is don't retry.
- :param logger: A logging object to use for requests that pass through this
- adapter.
- :type logger: logging.Logger
- """
-
- @utils.positional()
- def __init__(self, session, service_type=None, service_name=None,
- interface=None, region_name=None, endpoint_override=None,
- version=None, auth=None, user_agent=None,
- connect_retries=None, logger=None):
- # NOTE(jamielennox): when adding new parameters to adapter please also
- # add them to the adapter call in httpclient.HTTPClient.__init__
- self.session = session
- self.service_type = service_type
- self.service_name = service_name
- self.interface = interface
- self.region_name = region_name
- self.endpoint_override = endpoint_override
- self.version = version
- self.user_agent = user_agent
- self.auth = auth
- self.connect_retries = connect_retries
- self.logger = logger
-
- def _set_endpoint_filter_kwargs(self, kwargs):
- if self.service_type:
- kwargs.setdefault('service_type', self.service_type)
- if self.service_name:
- kwargs.setdefault('service_name', self.service_name)
- if self.interface:
- kwargs.setdefault('interface', self.interface)
- if self.region_name:
- kwargs.setdefault('region_name', self.region_name)
- if self.version:
- kwargs.setdefault('version', self.version)
-
- def request(self, url, method, **kwargs):
- endpoint_filter = kwargs.setdefault('endpoint_filter', {})
- self._set_endpoint_filter_kwargs(endpoint_filter)
-
- if self.endpoint_override:
- kwargs.setdefault('endpoint_override', self.endpoint_override)
-
- if self.auth:
- kwargs.setdefault('auth', self.auth)
- if self.user_agent:
- kwargs.setdefault('user_agent', self.user_agent)
- if self.connect_retries is not None:
- kwargs.setdefault('connect_retries', self.connect_retries)
- if self.logger:
- kwargs.setdefault('logger', self.logger)
-
- return self.session.request(url, method, **kwargs)
-
- def get_token(self, auth=None):
- """Return a token as provided by the auth plugin.
-
- :param auth: The auth plugin to use for token. Overrides the plugin
- on the session. (optional)
- :type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
-
- :raises keystoneclient.exceptions.AuthorizationFailure: if a new token
- fetch fails.
-
- :returns: A valid token.
- :rtype: string
- """
- return self.session.get_token(auth or self.auth)
-
- def get_endpoint(self, auth=None, **kwargs):
- """Get an endpoint as provided by the auth plugin.
-
- :param auth: The auth plugin to use for token. Overrides the plugin on
- the session. (optional)
- :type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
-
- :raises keystoneclient.exceptions.MissingAuthPlugin: if a plugin is not
- available.
-
- :returns: An endpoint if available or None.
- :rtype: string
- """
- if self.endpoint_override:
- return self.endpoint_override
-
- self._set_endpoint_filter_kwargs(kwargs)
- return self.session.get_endpoint(auth or self.auth, **kwargs)
-
- def invalidate(self, auth=None):
- """Invalidate an authentication plugin."""
- return self.session.invalidate(auth or self.auth)
-
- def get_user_id(self, auth=None):
- """Return the authenticated user_id as provided by the auth plugin.
-
- :param auth: The auth plugin to use for token. Overrides the plugin
- on the session. (optional)
- :type auth: keystoneclient.auth.base.BaseAuthPlugin
-
- :raises keystoneclient.exceptions.AuthorizationFailure:
- if a new token fetch fails.
- :raises keystoneclient.exceptions.MissingAuthPlugin:
- if a plugin is not available.
-
- :returns: Current `user_id` or None if not supported by plugin.
- :rtype: string
- """
- return self.session.get_user_id(auth or self.auth)
-
- def get_project_id(self, auth=None):
- """Return the authenticated project_id as provided by the auth plugin.
-
- :param auth: The auth plugin to use for token. Overrides the plugin
- on the session. (optional)
- :type auth: keystoneclient.auth.base.BaseAuthPlugin
-
- :raises keystoneclient.exceptions.AuthorizationFailure:
- if a new token fetch fails.
- :raises keystoneclient.exceptions.MissingAuthPlugin:
- if a plugin is not available.
-
- :returns: Current `project_id` or None if not supported by plugin.
- :rtype: string
- """
- return self.session.get_project_id(auth or self.auth)
-
- def get(self, url, **kwargs):
- return self.request(url, 'GET', **kwargs)
-
- def head(self, url, **kwargs):
- return self.request(url, 'HEAD', **kwargs)
-
- def post(self, url, **kwargs):
- return self.request(url, 'POST', **kwargs)
-
- def put(self, url, **kwargs):
- return self.request(url, 'PUT', **kwargs)
-
- def patch(self, url, **kwargs):
- return self.request(url, 'PATCH', **kwargs)
-
- def delete(self, url, **kwargs):
- return self.request(url, 'DELETE', **kwargs)
-
-
-class LegacyJsonAdapter(Adapter):
- """Make something that looks like an old HTTPClient.
-
- A common case when using an adapter is that we want an interface similar to
- the HTTPClients of old which returned the body as JSON as well.
-
- You probably don't want this if you are starting from scratch.
- """
-
- def request(self, *args, **kwargs):
- headers = kwargs.setdefault('headers', {})
- headers.setdefault('Accept', 'application/json')
-
- try:
- kwargs['json'] = kwargs.pop('body')
- except KeyError:
- pass
-
- resp = super(LegacyJsonAdapter, self).request(*args, **kwargs)
-
- body = None
- if resp.text:
- try:
- body = jsonutils.loads(resp.text)
- except ValueError:
- pass
-
- return resp, body
+Adapter = adapter.Adapter
+LegacyJsonAdapter = adapter.LegacyJsonAdapter
diff --git a/keystoneclient/exceptions.py b/keystoneclient/exceptions.py
index d88bb73..e140a5e 100644
--- a/keystoneclient/exceptions.py
+++ b/keystoneclient/exceptions.py
@@ -14,18 +14,77 @@
# under the License.
"""Exception definitions."""
-import inspect
-import sys
-
-import six
+from keystoneauth1 import exceptions as new_exceptions
from keystoneclient.i18n import _
-class ClientException(Exception):
- """The base exception class for all exceptions this library raises.
+# NOTE(jamielennox): Import only the exceptions we need from keystoneauth. Lots
+# of other places read exceptions from keystoneclient and so we need to
+# maintain these for backwards compatibility. I'm naming them individually so
+# that we don't import a lot of new exceptions and make more compatibility
+# issues for ourselves in the future.
+
+ClientException = new_exceptions.ClientException
+HttpError = new_exceptions.HttpError
+HTTPClientError = new_exceptions.HTTPClientError
+BadRequest = new_exceptions.BadRequest
+Unauthorized = new_exceptions.Unauthorized
+PaymentRequired = new_exceptions.PaymentRequired
+Forbidden = new_exceptions.Forbidden
+NotFound = new_exceptions.NotFound
+MethodNotAllowed = new_exceptions.MethodNotAllowed
+NotAcceptable = new_exceptions.NotAcceptable
+ProxyAuthenticationRequired = new_exceptions.ProxyAuthenticationRequired
+Conflict = new_exceptions.Conflict
+Gone = new_exceptions.Gone
+LengthRequired = new_exceptions.LengthRequired
+PreconditionFailed = new_exceptions.PreconditionFailed
+RequestEntityTooLarge = new_exceptions.RequestEntityTooLarge
+RequestUriTooLong = new_exceptions.RequestUriTooLong
+UnsupportedMediaType = new_exceptions.UnsupportedMediaType
+RequestedRangeNotSatisfiable = new_exceptions.RequestedRangeNotSatisfiable
+ExpectationFailed = new_exceptions.ExpectationFailed
+UnprocessableEntity = new_exceptions.UnprocessableEntity
+HttpServerError = new_exceptions.HttpServerError
+InternalServerError = new_exceptions.InternalServerError
+HttpNotImplemented = new_exceptions.HttpNotImplemented
+BadGateway = new_exceptions.BadGateway
+ServiceUnavailable = new_exceptions.ServiceUnavailable
+GatewayTimeout = new_exceptions.GatewayTimeout
+HttpVersionNotSupported = new_exceptions.HttpVersionNotSupported
+from_response = new_exceptions.from_response
+
+# NOTE(jamielennox): Rahh! this is just wrong. In the apiclient conversion
+# someone mapped the connection timeout onto the HTTP timeout exception. Assume
+# people want the connection timeout as this is much more common.
+RequestTimeout = new_exceptions.ConnectTimeout
+
+ConnectionError = new_exceptions.ConnectionError
+SSLError = new_exceptions.SSLError
+Timeout = new_exceptions.ConnectTimeout
+
+
+# NOTE(akurilin): This alias should be left here to support backwards
+# compatibility until we are sure that usage of these exceptions in
+# projects is correct.
+HTTPNotImplemented = HttpNotImplemented
+HTTPError = HttpError
+
+
+class HTTPRedirection(HttpError):
+ """HTTP Redirection."""
+ message = _("HTTP Redirection")
+
+
+class MultipleChoices(HTTPRedirection):
+ """HTTP 300 - Multiple Choices.
+
+ Indicates multiple options for the resource that the client may follow.
"""
- pass
+
+ http_status = 300
+ message = _("Multiple Choices")
class ValidationError(ClientException):
@@ -43,14 +102,7 @@ class CommandError(ClientException):
pass
-class AuthorizationFailure(ClientException):
- """Cannot authorize API client."""
- pass
-
-
-class ConnectionError(ClientException):
- """Cannot connect to API service."""
- pass
+AuthorizationFailure = new_exceptions.AuthorizationFailure
class ConnectionRefused(ConnectionError):
@@ -75,399 +127,6 @@ class AuthSystemNotFound(AuthorizationFailure):
self.auth_system = auth_system
-class NoUniqueMatch(ClientException):
- """Multiple entities found instead of one."""
- pass
-
-
-class EndpointException(ClientException):
- """Something is rotten in Service Catalog."""
- pass
-
-
-class EndpointNotFound(EndpointException):
- """Could not find requested endpoint in Service Catalog."""
- pass
-
-
-class AmbiguousEndpoints(EndpointException):
- """Found more than one matching endpoint in Service Catalog."""
- def __init__(self, endpoints=None):
- super(AmbiguousEndpoints, self).__init__(
- _("AmbiguousEndpoints: %r") % endpoints)
- self.endpoints = endpoints
-
-
-class HttpError(ClientException):
- """The base exception class for all HTTP exceptions.
- """
- http_status = 0
- message = _("HTTP Error")
-
- def __init__(self, message=None, details=None,
- response=None, request_id=None,
- url=None, method=None, http_status=None):
- self.http_status = http_status or self.http_status
- self.message = message or self.message
- self.details = details
- self.request_id = request_id
- self.response = response
- self.url = url
- self.method = method
- formatted_string = "%s (HTTP %s)" % (self.message, self.http_status)
- if request_id:
- formatted_string += " (Request-ID: %s)" % request_id
- super(HttpError, self).__init__(formatted_string)
-
-
-class HTTPRedirection(HttpError):
- """HTTP Redirection."""
- message = _("HTTP Redirection")
-
-
-class HTTPClientError(HttpError):
- """Client-side HTTP error.
-
- Exception for cases in which the client seems to have erred.
- """
- message = _("HTTP Client Error")
-
-
-class HttpServerError(HttpError):
- """Server-side HTTP error.
-
- Exception for cases in which the server is aware that it has
- erred or is incapable of performing the request.
- """
- message = _("HTTP Server Error")
-
-
-class MultipleChoices(HTTPRedirection):
- """HTTP 300 - Multiple Choices.
-
- Indicates multiple options for the resource that the client may follow.
- """
-
- http_status = 300
- message = _("Multiple Choices")
-
-
-class BadRequest(HTTPClientError):
- """HTTP 400 - Bad Request.
-
- The request cannot be fulfilled due to bad syntax.
- """
- http_status = 400
- message = _("Bad Request")
-
-
-class Unauthorized(HTTPClientError):
- """HTTP 401 - Unauthorized.
-
- Similar to 403 Forbidden, but specifically for use when authentication
- is required and has failed or has not yet been provided.
- """
- http_status = 401
- message = _("Unauthorized")
-
-
-class PaymentRequired(HTTPClientError):
- """HTTP 402 - Payment Required.
-
- Reserved for future use.
- """
- http_status = 402
- message = _("Payment Required")
-
-
-class Forbidden(HTTPClientError):
- """HTTP 403 - Forbidden.
-
- The request was a valid request, but the server is refusing to respond
- to it.
- """
- http_status = 403
- message = _("Forbidden")
-
-
-class NotFound(HTTPClientError):
- """HTTP 404 - Not Found.
-
- The requested resource could not be found but may be available again
- in the future.
- """
- http_status = 404
- message = _("Not Found")
-
-
-class MethodNotAllowed(HTTPClientError):
- """HTTP 405 - Method Not Allowed.
-
- A request was made of a resource using a request method not supported
- by that resource.
- """
- http_status = 405
- message = _("Method Not Allowed")
-
-
-class NotAcceptable(HTTPClientError):
- """HTTP 406 - Not Acceptable.
-
- The requested resource is only capable of generating content not
- acceptable according to the Accept headers sent in the request.
- """
- http_status = 406
- message = _("Not Acceptable")
-
-
-class ProxyAuthenticationRequired(HTTPClientError):
- """HTTP 407 - Proxy Authentication Required.
-
- The client must first authenticate itself with the proxy.
- """
- http_status = 407
- message = _("Proxy Authentication Required")
-
-
-class RequestTimeout(HTTPClientError):
- """HTTP 408 - Request Timeout.
-
- The server timed out waiting for the request.
- """
- http_status = 408
- message = _("Request Timeout")
-
-
-class Conflict(HTTPClientError):
- """HTTP 409 - Conflict.
-
- Indicates that the request could not be processed because of conflict
- in the request, such as an edit conflict.
- """
- http_status = 409
- message = _("Conflict")
-
-
-class Gone(HTTPClientError):
- """HTTP 410 - Gone.
-
- Indicates that the resource requested is no longer available and will
- not be available again.
- """
- http_status = 410
- message = _("Gone")
-
-
-class LengthRequired(HTTPClientError):
- """HTTP 411 - Length Required.
-
- The request did not specify the length of its content, which is
- required by the requested resource.
- """
- http_status = 411
- message = _("Length Required")
-
-
-class PreconditionFailed(HTTPClientError):
- """HTTP 412 - Precondition Failed.
-
- The server does not meet one of the preconditions that the requester
- put on the request.
- """
- http_status = 412
- message = _("Precondition Failed")
-
-
-class RequestEntityTooLarge(HTTPClientError):
- """HTTP 413 - Request Entity Too Large.
-
- The request is larger than the server is willing or able to process.
- """
- http_status = 413
- message = _("Request Entity Too Large")
-
- def __init__(self, *args, **kwargs):
- try:
- self.retry_after = int(kwargs.pop('retry_after'))
- except (KeyError, ValueError):
- self.retry_after = 0
-
- super(RequestEntityTooLarge, self).__init__(*args, **kwargs)
-
-
-class RequestUriTooLong(HTTPClientError):
- """HTTP 414 - Request-URI Too Long.
-
- The URI provided was too long for the server to process.
- """
- http_status = 414
- message = _("Request-URI Too Long")
-
-
-class UnsupportedMediaType(HTTPClientError):
- """HTTP 415 - Unsupported Media Type.
-
- The request entity has a media type which the server or resource does
- not support.
- """
- http_status = 415
- message = _("Unsupported Media Type")
-
-
-class RequestedRangeNotSatisfiable(HTTPClientError):
- """HTTP 416 - Requested Range Not Satisfiable.
-
- The client has asked for a portion of the file, but the server cannot
- supply that portion.
- """
- http_status = 416
- message = _("Requested Range Not Satisfiable")
-
-
-class ExpectationFailed(HTTPClientError):
- """HTTP 417 - Expectation Failed.
-
- The server cannot meet the requirements of the Expect request-header field.
- """
- http_status = 417
- message = _("Expectation Failed")
-
-
-class UnprocessableEntity(HTTPClientError):
- """HTTP 422 - Unprocessable Entity.
-
- The request was well-formed but was unable to be followed due to semantic
- errors.
- """
- http_status = 422
- message = _("Unprocessable Entity")
-
-
-class InternalServerError(HttpServerError):
- """HTTP 500 - Internal Server Error.
-
- A generic error message, given when no more specific message is suitable.
- """
- http_status = 500
- message = _("Internal Server Error")
-
-
-# NotImplemented is a python keyword.
-class HttpNotImplemented(HttpServerError):
- """HTTP 501 - Not Implemented.
-
- The server either does not recognize the request method, or it lacks
- the ability to fulfill the request.
- """
- http_status = 501
- message = _("Not Implemented")
-
-
-class BadGateway(HttpServerError):
- """HTTP 502 - Bad Gateway.
-
- The server was acting as a gateway or proxy and received an invalid
- response from the upstream server.
- """
- http_status = 502
- message = _("Bad Gateway")
-
-
-class ServiceUnavailable(HttpServerError):
- """HTTP 503 - Service Unavailable.
-
- The server is currently unavailable.
- """
- http_status = 503
- message = _("Service Unavailable")
-
-
-class GatewayTimeout(HttpServerError):
- """HTTP 504 - Gateway Timeout.
-
- The server was acting as a gateway or proxy and did not receive a timely
- response from the upstream server.
- """
- http_status = 504
- message = _("Gateway Timeout")
-
-
-class HttpVersionNotSupported(HttpServerError):
- """HTTP 505 - HttpVersion Not Supported.
-
- The server does not support the HTTP protocol version used in the request.
- """
- http_status = 505
- message = _("HTTP Version Not Supported")
-
-
-# _code_map contains all the classes that have http_status attribute.
-_code_map = dict(
- (getattr(obj, 'http_status', None), obj)
- for name, obj in six.iteritems(vars(sys.modules[__name__]))
- if inspect.isclass(obj) and getattr(obj, 'http_status', False)
-)
-
-
-def from_response(response, method, url):
- """Returns an instance of :class:`HttpError` or subclass based on response.
-
- :param response: instance of `requests.Response` class
- :param method: HTTP method used for request
- :param url: URL used for request
- """
-
- req_id = response.headers.get("x-openstack-request-id")
- # NOTE(hdd) true for older versions of nova and cinder
- if not req_id:
- req_id = response.headers.get("x-compute-request-id")
- kwargs = {
- "http_status": response.status_code,
- "response": response,
- "method": method,
- "url": url,
- "request_id": req_id,
- }
- if "retry-after" in response.headers:
- kwargs["retry_after"] = response.headers["retry-after"]
-
- content_type = response.headers.get("Content-Type", "")
- if content_type.startswith("application/json"):
- try:
- body = response.json()
- except ValueError:
- pass
- else:
- if isinstance(body, dict):
- error = body.get(list(body)[0])
- if isinstance(error, dict):
- kwargs["message"] = (error.get("message") or
- error.get("faultstring"))
- kwargs["details"] = (error.get("details") or
- six.text_type(body))
- elif content_type.startswith("text/"):
- kwargs["details"] = getattr(response, 'text', '')
-
- try:
- cls = _code_map[response.status_code]
- except KeyError:
- if 500 <= response.status_code < 600:
- cls = HttpServerError
- elif 400 <= response.status_code < 500:
- cls = HTTPClientError
- else:
- cls = HttpError
- return cls(**kwargs)
-
-
-# NOTE(akurilin): This alias should be left here to support backwards
-# compatibility until we are sure that usage of these exceptions in
-# projects is correct.
-HTTPNotImplemented = HttpNotImplemented
-Timeout = RequestTimeout
-HTTPError = HttpError
-
-
class CertificateConfigError(Exception):
"""Error reading the certificate."""
def __init__(self, output):
@@ -484,46 +143,35 @@ class CMSError(Exception):
super(CMSError, self).__init__(msg)
-class EmptyCatalog(EndpointNotFound):
- """The service catalog is empty."""
- pass
+EndpointException = new_exceptions.CatalogException
+EmptyCatalog = new_exceptions.EmptyCatalog
+EndpointNotFound = new_exceptions.EndpointNotFound
-class SSLError(ConnectionRefused):
- """An SSL error occurred."""
+class NoUniqueMatch(EndpointException):
+ """Multiple entities found instead of one."""
+ pass
-class DiscoveryFailure(ClientException):
- """Discovery of client versions failed."""
+class AmbiguousEndpoints(EndpointException):
+ """Found more than one matching endpoint in Service Catalog."""
+ def __init__(self, endpoints=None):
+ super(AmbiguousEndpoints, self).__init__(
+ _("AmbiguousEndpoints: %r") % endpoints)
+ self.endpoints = endpoints
-class VersionNotAvailable(DiscoveryFailure):
- """Discovery failed as the version you requested is not available."""
+DiscoveryFailure = new_exceptions.DiscoveryFailure
+VersionNotAvailable = new_exceptions.VersionNotAvailable
class MethodNotImplemented(ClientException):
"""Method not implemented by the keystoneclient API."""
-class MissingAuthPlugin(ClientException):
- """An authenticated request is required but no plugin available."""
-
-
-class NoMatchingPlugin(ClientException):
- """There were no auth plugins that could be created from the parameters
- provided.
-
- :param str name: The name of the plugin that was attempted to load.
-
- .. py:attribute:: name
-
- The name of the plugin that was attempted to load.
- """
-
- def __init__(self, name):
- self.name = name
- msg = _('The plugin %s could not be found') % name
- super(NoMatchingPlugin, self).__init__(msg)
+MissingAuthPlugin = new_exceptions.MissingAuthPlugin
+NoMatchingPlugin = new_exceptions.NoMatchingPlugin
+InvalidResponse = new_exceptions.InvalidResponse
class UnsupportedParameters(ClientException):
@@ -541,11 +189,3 @@ class UnsupportedParameters(ClientException):
m = _('The following parameters were given that are unsupported: %s')
super(UnsupportedParameters, self).__init__(m % ', '.join(self.names))
-
-
-class InvalidResponse(ClientException):
- """The response from the server is not valid for this request."""
-
- def __init__(self, response):
- super(InvalidResponse, self).__init__()
- self.response = response
diff --git a/keystoneclient/fixture/discovery.py b/keystoneclient/fixture/discovery.py
index 50a6bce..c531cae 100644
--- a/keystoneclient/fixture/discovery.py
+++ b/keystoneclient/fixture/discovery.py
@@ -10,251 +10,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-import datetime
-
-from oslo_utils import timeutils
-
-from keystoneclient import utils
+from keystoneauth1 import fixture
__all__ = ['DiscoveryList',
'V2Discovery',
'V3Discovery',
]
-_DEFAULT_DAYS_AGO = 30
-
-
-class DiscoveryBase(dict):
- """The basic version discovery structure.
-
- All version discovery elements should have access to these values.
-
- :param string id: The version id for this version entry.
- :param string status: The status of this entry.
- :param DateTime updated: When the API was last updated.
- """
-
- @utils.positional()
- def __init__(self, id, status=None, updated=None):
- super(DiscoveryBase, self).__init__()
-
- self.id = id
- self.status = status or 'stable'
- self.updated = updated or (timeutils.utcnow() -
- datetime.timedelta(days=_DEFAULT_DAYS_AGO))
-
- @property
- def id(self):
- return self.get('id')
-
- @id.setter
- def id(self, value):
- self['id'] = value
-
- @property
- def status(self):
- return self.get('status')
-
- @status.setter
- def status(self, value):
- self['status'] = value
-
- @property
- def links(self):
- return self.setdefault('links', [])
-
- @property
- def updated_str(self):
- return self.get('updated')
-
- @updated_str.setter
- def updated_str(self, value):
- self['updated'] = value
-
- @property
- def updated(self):
- return timeutils.parse_isotime(self.updated_str)
-
- @updated.setter
- def updated(self, value):
- self.updated_str = utils.isotime(value)
-
- @utils.positional()
- def add_link(self, href, rel='self', type=None):
- link = {'href': href, 'rel': rel}
- if type:
- link['type'] = type
- self.links.append(link)
- return link
-
- @property
- def media_types(self):
- return self.setdefault('media-types', [])
-
- @utils.positional(1)
- def add_media_type(self, base, type):
- mt = {'base': base, 'type': type}
- self.media_types.append(mt)
- return mt
-
-
-class V2Discovery(DiscoveryBase):
- """A Version element for a V2 identity service endpoint.
-
- Provides some default values and helper methods for creating a v2.0
- endpoint version structure. Clients should use this instead of creating
- their own structures.
-
- :param string href: The url that this entry should point to.
- :param string id: The version id that should be reported. (optional)
- Defaults to 'v2.0'.
- :param bool html: Add HTML describedby links to the structure.
- :param bool pdf: Add PDF describedby links to the structure.
-
- """
-
- _DESC_URL = 'http://docs.openstack.org/api/openstack-identity-service/2.0/'
-
- @utils.positional()
- def __init__(self, href, id=None, html=True, pdf=True, **kwargs):
- super(V2Discovery, self).__init__(id or 'v2.0', **kwargs)
-
- self.add_link(href)
-
- if html:
- self.add_html_description()
- if pdf:
- self.add_pdf_description()
-
- def add_html_description(self):
- """Add the HTML described by links.
-
- The standard structure includes a link to a HTML document with the
- API specification. Add it to this entry.
- """
- self.add_link(href=self._DESC_URL + 'content',
- rel='describedby',
- type='text/html')
-
- def add_pdf_description(self):
- """Add the PDF described by links.
-
- The standard structure includes a link to a PDF document with the
- API specification. Add it to this entry.
- """
- self.add_link(href=self._DESC_URL + 'identity-dev-guide-2.0.pdf',
- rel='describedby',
- type='application/pdf')
-
-
-class V3Discovery(DiscoveryBase):
- """A Version element for a V3 identity service endpoint.
-
- Provides some default values and helper methods for creating a v3
- endpoint version structure. Clients should use this instead of creating
- their own structures.
-
- :param href: The url that this entry should point to.
- :param string id: The version id that should be reported. (optional)
- Defaults to 'v3.0'.
- :param bool json: Add JSON media-type elements to the structure.
- :param bool xml: Add XML media-type elements to the structure.
- """
-
- @utils.positional()
- def __init__(self, href, id=None, json=True, xml=True, **kwargs):
- super(V3Discovery, self).__init__(id or 'v3.0', **kwargs)
-
- self.add_link(href)
-
- if json:
- self.add_json_media_type()
- if xml:
- self.add_xml_media_type()
-
- def add_json_media_type(self):
- """Add the JSON media-type links.
-
- The standard structure includes a list of media-types that the endpoint
- supports. Add JSON to the list.
- """
- self.add_media_type(base='application/json',
- type='application/vnd.openstack.identity-v3+json')
-
- def add_xml_media_type(self):
- """Add the XML media-type links.
-
- The standard structure includes a list of media-types that the endpoint
- supports. Add XML to the list.
- """
- self.add_media_type(base='application/xml',
- type='application/vnd.openstack.identity-v3+xml')
-
-
-class DiscoveryList(dict):
- """A List of version elements.
-
- Creates a correctly structured list of identity service endpoints for
- use in testing with discovery.
-
- :param string href: The url that this should be based at.
- :param bool v2: Add a v2 element.
- :param bool v3: Add a v3 element.
- :param string v2_status: The status to use for the v2 element.
- :param DateTime v2_updated: The update time to use for the v2 element.
- :param bool v2_html: True to add a html link to the v2 element.
- :param bool v2_pdf: True to add a pdf link to the v2 element.
- :param string v3_status: The status to use for the v3 element.
- :param DateTime v3_updated: The update time to use for the v3 element.
- :param bool v3_json: True to add a html link to the v2 element.
- :param bool v3_xml: True to add a pdf link to the v2 element.
- """
-
- TEST_URL = 'http://keystone.host:5000/'
-
- @utils.positional(2)
- def __init__(self, href=None, v2=True, v3=True, v2_id=None, v3_id=None,
- v2_status=None, v2_updated=None, v2_html=True, v2_pdf=True,
- v3_status=None, v3_updated=None, v3_json=True, v3_xml=True):
- super(DiscoveryList, self).__init__(versions={'values': []})
-
- href = href or self.TEST_URL
-
- if v2:
- v2_href = href.rstrip('/') + '/v2.0'
- self.add_v2(v2_href, id=v2_id, status=v2_status,
- updated=v2_updated, html=v2_html, pdf=v2_pdf)
-
- if v3:
- v3_href = href.rstrip('/') + '/v3'
- self.add_v3(v3_href, id=v3_id, status=v3_status,
- updated=v3_updated, json=v3_json, xml=v3_xml)
-
- @property
- def versions(self):
- return self['versions']['values']
-
- def add_version(self, version):
- """Add a new version structure to the list.
-
- :param dict version: A new version structure to add to the list.
- """
- self.versions.append(version)
-
- def add_v2(self, href, **kwargs):
- """Add a v2 version to the list.
-
- The parameters are the same as V2Discovery.
- """
- obj = V2Discovery(href, **kwargs)
- self.add_version(obj)
- return obj
-
- def add_v3(self, href, **kwargs):
- """Add a v3 version to the list.
-
- The parameters are the same as V3Discovery.
- """
- obj = V3Discovery(href, **kwargs)
- self.add_version(obj)
- return obj
+DiscoveryList = fixture.DiscoveryList
+V2Discovery = fixture.V2Discovery
+V3Discovery = fixture.V3Discovery
diff --git a/keystoneclient/fixture/v2.py b/keystoneclient/fixture/v2.py
index da60b9b..6e6a4e3 100644
--- a/keystoneclient/fixture/v2.py
+++ b/keystoneclient/fixture/v2.py
@@ -10,237 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-import datetime
-import uuid
+from keystoneauth1 import fixture
-from oslo_utils import timeutils
-from keystoneclient.fixture import exception
-from keystoneclient import utils
-
-
-class _Service(dict):
-
- def add_endpoint(self, public, admin=None, internal=None,
- tenant_id=None, region=None, id=None):
- data = {'tenantId': tenant_id or uuid.uuid4().hex,
- 'publicURL': public,
- 'adminURL': admin or public,
- 'internalURL': internal or public,
- 'region': region,
- 'id': id or uuid.uuid4().hex}
-
- self.setdefault('endpoints', []).append(data)
- return data
-
-
-class Token(dict):
- """A V2 Keystone token that can be used for testing.
-
- This object is designed to allow clients to generate a correct V2 token for
- use in there test code. It should prevent clients from having to know the
- correct token format and allow them to test the portions of token handling
- that matter to them and not copy and paste sample.
- """
-
- def __init__(self, token_id=None, expires=None, issued=None,
- tenant_id=None, tenant_name=None, user_id=None,
- user_name=None, trust_id=None, trustee_user_id=None,
- audit_id=None, audit_chain_id=None):
- super(Token, self).__init__()
-
- self.token_id = token_id or uuid.uuid4().hex
- self.user_id = user_id or uuid.uuid4().hex
- self.user_name = user_name or uuid.uuid4().hex
- self.audit_id = audit_id or uuid.uuid4().hex
-
- if not issued:
- issued = timeutils.utcnow() - datetime.timedelta(minutes=2)
- if not expires:
- expires = issued + datetime.timedelta(hours=1)
-
- try:
- self.issued = issued
- except (TypeError, AttributeError):
- # issued should be able to be passed as a string so ignore
- self.issued_str = issued
-
- try:
- self.expires = expires
- except (TypeError, AttributeError):
- # expires should be able to be passed as a string so ignore
- self.expires_str = expires
-
- if tenant_id or tenant_name:
- self.set_scope(tenant_id, tenant_name)
-
- if trust_id or trustee_user_id:
- # the trustee_user_id will generally be the same as the user_id as
- # the token is being issued to the trustee
- self.set_trust(id=trust_id,
- trustee_user_id=trustee_user_id or user_id)
-
- if audit_chain_id:
- self.audit_chain_id = audit_chain_id
-
- @property
- def root(self):
- return self.setdefault('access', {})
-
- @property
- def _token(self):
- return self.root.setdefault('token', {})
-
- @property
- def token_id(self):
- return self._token['id']
-
- @token_id.setter
- def token_id(self, value):
- self._token['id'] = value
-
- @property
- def expires_str(self):
- return self._token['expires']
-
- @expires_str.setter
- def expires_str(self, value):
- self._token['expires'] = value
-
- @property
- def expires(self):
- return timeutils.parse_isotime(self.expires_str)
-
- @expires.setter
- def expires(self, value):
- self.expires_str = utils.isotime(value)
-
- @property
- def issued_str(self):
- return self._token['issued_at']
-
- @issued_str.setter
- def issued_str(self, value):
- self._token['issued_at'] = value
-
- @property
- def issued(self):
- return timeutils.parse_isotime(self.issued_str)
-
- @issued.setter
- def issued(self, value):
- self.issued_str = utils.isotime(value)
-
- @property
- def _user(self):
- return self.root.setdefault('user', {})
-
- @property
- def user_id(self):
- return self._user['id']
-
- @user_id.setter
- def user_id(self, value):
- self._user['id'] = value
-
- @property
- def user_name(self):
- return self._user['name']
-
- @user_name.setter
- def user_name(self, value):
- self._user['name'] = value
-
- @property
- def tenant_id(self):
- return self._token.get('tenant', {}).get('id')
-
- @tenant_id.setter
- def tenant_id(self, value):
- self._token.setdefault('tenant', {})['id'] = value
-
- @property
- def tenant_name(self):
- return self._token.get('tenant', {}).get('name')
-
- @tenant_name.setter
- def tenant_name(self, value):
- self._token.setdefault('tenant', {})['name'] = value
-
- @property
- def _metadata(self):
- return self.root.setdefault('metadata', {})
-
- @property
- def trust_id(self):
- return self.root.setdefault('trust', {}).get('id')
-
- @trust_id.setter
- def trust_id(self, value):
- self.root.setdefault('trust', {})['id'] = value
-
- @property
- def trustee_user_id(self):
- return self.root.setdefault('trust', {}).get('trustee_user_id')
-
- @trustee_user_id.setter
- def trustee_user_id(self, value):
- self.root.setdefault('trust', {})['trustee_user_id'] = value
-
- @property
- def audit_id(self):
- try:
- return self._token.get('audit_ids', [])[0]
- except IndexError:
- return None
-
- @audit_id.setter
- def audit_id(self, value):
- audit_chain_id = self.audit_chain_id
- lval = [value] if audit_chain_id else [value, audit_chain_id]
- self._token['audit_ids'] = lval
-
- @property
- def audit_chain_id(self):
- try:
- return self._token.get('audit_ids', [])[1]
- except IndexError:
- return None
-
- @audit_chain_id.setter
- def audit_chain_id(self, value):
- self._token['audit_ids'] = [self.audit_id, value]
-
- def validate(self):
- scoped = 'tenant' in self.token
- catalog = self.root.get('serviceCatalog')
-
- if catalog and not scoped:
- msg = 'You cannot have a service catalog on an unscoped token'
- raise exception.FixtureValidationError(msg)
-
- if scoped and not self.user.get('roles'):
- msg = 'You must have roles on a token to scope it'
- raise exception.FixtureValidationError(msg)
-
- def add_role(self, name=None, id=None):
- id = id or uuid.uuid4().hex
- name = name or uuid.uuid4().hex
- roles = self._user.setdefault('roles', [])
- roles.append({'name': name})
- self._metadata.setdefault('roles', []).append(id)
- return {'id': id, 'name': name}
-
- def add_service(self, type, name=None):
- name = name or uuid.uuid4().hex
- service = _Service(name=name, type=type)
- self.root.setdefault('serviceCatalog', []).append(service)
- return service
-
- def set_scope(self, id=None, name=None):
- self.tenant_id = id or uuid.uuid4().hex
- self.tenant_name = name or uuid.uuid4().hex
-
- def set_trust(self, id=None, trustee_user_id=None):
- self.trust_id = id or uuid.uuid4().hex
- self.trustee_user_id = trustee_user_id or uuid.uuid4().hex
+Token = fixture.V2Token
diff --git a/keystoneclient/fixture/v3.py b/keystoneclient/fixture/v3.py
index d27a93b..2ccaca5 100644
--- a/keystoneclient/fixture/v3.py
+++ b/keystoneclient/fixture/v3.py
@@ -10,413 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
-import datetime
-import uuid
+from keystoneauth1 import fixture
-from oslo_utils import timeutils
-from keystoneclient.fixture import exception
-from keystoneclient import utils
-
-
-class _Service(dict):
- """One of the services that exist in the catalog.
-
- You use this by adding a service to a token which returns an instance of
- this object and then you can add_endpoints to the service.
- """
-
- def add_endpoint(self, interface, url, region=None, id=None):
- data = {'id': id or uuid.uuid4().hex,
- 'interface': interface,
- 'url': url,
- 'region': region,
- 'region_id': region}
- self.setdefault('endpoints', []).append(data)
- return data
-
- def add_standard_endpoints(self, public=None, admin=None, internal=None,
- region=None):
- ret = []
-
- if public:
- ret.append(self.add_endpoint('public', public, region=region))
- if admin:
- ret.append(self.add_endpoint('admin', admin, region=region))
- if internal:
- ret.append(self.add_endpoint('internal', internal, region=region))
-
- return ret
-
-
-class Token(dict):
- """A V3 Keystone token that can be used for testing.
-
- This object is designed to allow clients to generate a correct V3 token for
- use in there test code. It should prevent clients from having to know the
- correct token format and allow them to test the portions of token handling
- that matter to them and not copy and paste sample.
- """
-
- def __init__(self, expires=None, issued=None, user_id=None, user_name=None,
- user_domain_id=None, user_domain_name=None, methods=None,
- project_id=None, project_name=None, project_domain_id=None,
- project_domain_name=None, domain_id=None, domain_name=None,
- trust_id=None, trust_impersonation=None, trustee_user_id=None,
- trustor_user_id=None, oauth_access_token_id=None,
- oauth_consumer_id=None, audit_id=None, audit_chain_id=None):
- super(Token, self).__init__()
-
- self.user_id = user_id or uuid.uuid4().hex
- self.user_name = user_name or uuid.uuid4().hex
- self.user_domain_id = user_domain_id or uuid.uuid4().hex
- self.user_domain_name = user_domain_name or uuid.uuid4().hex
- self.audit_id = audit_id or uuid.uuid4().hex
-
- if not methods:
- methods = ['password']
- self.methods.extend(methods)
-
- if not issued:
- issued = timeutils.utcnow() - datetime.timedelta(minutes=2)
-
- try:
- self.issued = issued
- except (TypeError, AttributeError):
- # issued should be able to be passed as a string so ignore
- self.issued_str = issued
-
- if not expires:
- expires = self.issued + datetime.timedelta(hours=1)
-
- try:
- self.expires = expires
- except (TypeError, AttributeError):
- # expires should be able to be passed as a string so ignore
- self.expires_str = expires
-
- if (project_id or project_name or
- project_domain_id or project_domain_name):
- self.set_project_scope(id=project_id,
- name=project_name,
- domain_id=project_domain_id,
- domain_name=project_domain_name)
-
- if domain_id or domain_name:
- self.set_domain_scope(id=domain_id, name=domain_name)
-
- if (trust_id or (trust_impersonation is not None) or
- trustee_user_id or trustor_user_id):
- self.set_trust_scope(id=trust_id,
- impersonation=trust_impersonation,
- trustee_user_id=trustee_user_id,
- trustor_user_id=trustor_user_id)
-
- if oauth_access_token_id or oauth_consumer_id:
- self.set_oauth(access_token_id=oauth_access_token_id,
- consumer_id=oauth_consumer_id)
-
- if audit_chain_id:
- self.audit_chain_id = audit_chain_id
-
- @property
- def root(self):
- return self.setdefault('token', {})
-
- @property
- def expires_str(self):
- return self.root.get('expires_at')
-
- @expires_str.setter
- def expires_str(self, value):
- self.root['expires_at'] = value
-
- @property
- def expires(self):
- return timeutils.parse_isotime(self.expires_str)
-
- @expires.setter
- def expires(self, value):
- self.expires_str = utils.isotime(value, subsecond=True)
-
- @property
- def issued_str(self):
- return self.root.get('issued_at')
-
- @issued_str.setter
- def issued_str(self, value):
- self.root['issued_at'] = value
-
- @property
- def issued(self):
- return timeutils.parse_isotime(self.issued_str)
-
- @issued.setter
- def issued(self, value):
- self.issued_str = utils.isotime(value, subsecond=True)
-
- @property
- def _user(self):
- return self.root.setdefault('user', {})
-
- @property
- def user_id(self):
- return self._user.get('id')
-
- @user_id.setter
- def user_id(self, value):
- self._user['id'] = value
-
- @property
- def user_name(self):
- return self._user.get('name')
-
- @user_name.setter
- def user_name(self, value):
- self._user['name'] = value
-
- @property
- def _user_domain(self):
- return self._user.setdefault('domain', {})
-
- @property
- def user_domain_id(self):
- return self._user_domain.get('id')
-
- @user_domain_id.setter
- def user_domain_id(self, value):
- self._user_domain['id'] = value
-
- @property
- def user_domain_name(self):
- return self._user_domain.get('name')
-
- @user_domain_name.setter
- def user_domain_name(self, value):
- self._user_domain['name'] = value
-
- @property
- def methods(self):
- return self.root.setdefault('methods', [])
-
- @property
- def project_id(self):
- return self.root.get('project', {}).get('id')
-
- @project_id.setter
- def project_id(self, value):
- self.root.setdefault('project', {})['id'] = value
-
- @property
- def project_name(self):
- return self.root.get('project', {}).get('name')
-
- @project_name.setter
- def project_name(self, value):
- self.root.setdefault('project', {})['name'] = value
-
- @property
- def project_domain_id(self):
- return self.root.get('project', {}).get('domain', {}).get('id')
-
- @project_domain_id.setter
- def project_domain_id(self, value):
- project = self.root.setdefault('project', {})
- project.setdefault('domain', {})['id'] = value
-
- @property
- def project_domain_name(self):
- return self.root.get('project', {}).get('domain', {}).get('name')
-
- @project_domain_name.setter
- def project_domain_name(self, value):
- project = self.root.setdefault('project', {})
- project.setdefault('domain', {})['name'] = value
-
- @property
- def domain_id(self):
- return self.root.get('domain', {}).get('id')
-
- @domain_id.setter
- def domain_id(self, value):
- self.root.setdefault('domain', {})['id'] = value
-
- @property
- def domain_name(self):
- return self.root.get('domain', {}).get('name')
-
- @domain_name.setter
- def domain_name(self, value):
- self.root.setdefault('domain', {})['name'] = value
-
- @property
- def trust_id(self):
- return self.root.get('OS-TRUST:trust', {}).get('id')
-
- @trust_id.setter
- def trust_id(self, value):
- self.root.setdefault('OS-TRUST:trust', {})['id'] = value
-
- @property
- def trust_impersonation(self):
- return self.root.get('OS-TRUST:trust', {}).get('impersonation')
-
- @trust_impersonation.setter
- def trust_impersonation(self, value):
- self.root.setdefault('OS-TRUST:trust', {})['impersonation'] = value
-
- @property
- def trustee_user_id(self):
- trust = self.root.get('OS-TRUST:trust', {})
- return trust.get('trustee_user', {}).get('id')
-
- @trustee_user_id.setter
- def trustee_user_id(self, value):
- trust = self.root.setdefault('OS-TRUST:trust', {})
- trust.setdefault('trustee_user', {})['id'] = value
-
- @property
- def trustor_user_id(self):
- trust = self.root.get('OS-TRUST:trust', {})
- return trust.get('trustor_user', {}).get('id')
-
- @trustor_user_id.setter
- def trustor_user_id(self, value):
- trust = self.root.setdefault('OS-TRUST:trust', {})
- trust.setdefault('trustor_user', {})['id'] = value
-
- @property
- def oauth_access_token_id(self):
- return self.root.get('OS-OAUTH1', {}).get('access_token_id')
-
- @oauth_access_token_id.setter
- def oauth_access_token_id(self, value):
- self.root.setdefault('OS-OAUTH1', {})['access_token_id'] = value
-
- @property
- def oauth_consumer_id(self):
- return self.root.get('OS-OAUTH1', {}).get('consumer_id')
-
- @oauth_consumer_id.setter
- def oauth_consumer_id(self, value):
- self.root.setdefault('OS-OAUTH1', {})['consumer_id'] = value
-
- @property
- def audit_id(self):
- try:
- return self.root.get('audit_ids', [])[0]
- except IndexError:
- return None
-
- @audit_id.setter
- def audit_id(self, value):
- audit_chain_id = self.audit_chain_id
- lval = [value] if audit_chain_id else [value, audit_chain_id]
- self.root['audit_ids'] = lval
-
- @property
- def audit_chain_id(self):
- try:
- return self.root.get('audit_ids', [])[1]
- except IndexError:
- return None
-
- @audit_chain_id.setter
- def audit_chain_id(self, value):
- self.root['audit_ids'] = [self.audit_id, value]
-
- @property
- def role_ids(self):
- return [r['id'] for r in self.root.get('roles', [])]
-
- @property
- def role_names(self):
- return [r['name'] for r in self.root.get('roles', [])]
-
- def validate(self):
- project = self.root.get('project')
- domain = self.root.get('domain')
- trust = self.root.get('OS-TRUST:trust')
- catalog = self.root.get('catalog')
- roles = self.root.get('roles')
- scoped = project or domain or trust
-
- if sum((bool(project), bool(domain), bool(trust))) > 1:
- msg = 'You cannot scope to multiple targets'
- raise exception.FixtureValidationError(msg)
-
- if catalog and not scoped:
- msg = 'You cannot have a service catalog on an unscoped token'
- raise exception.FixtureValidationError(msg)
-
- if scoped and not self.user.get('roles'):
- msg = 'You must have roles on a token to scope it'
- raise exception.FixtureValidationError(msg)
-
- if bool(scoped) != bool(roles):
- msg = 'You must be scoped to have roles and vice-versa'
- raise exception.FixtureValidationError(msg)
-
- def add_role(self, name=None, id=None):
- roles = self.root.setdefault('roles', [])
- data = {'id': id or uuid.uuid4().hex,
- 'name': name or uuid.uuid4().hex}
- roles.append(data)
- return data
-
- def add_service(self, type, name=None, id=None):
- service = _Service(type=type, id=id or uuid.uuid4().hex)
- if name:
- service['name'] = name
- self.root.setdefault('catalog', []).append(service)
- return service
-
- def set_project_scope(self, id=None, name=None, domain_id=None,
- domain_name=None):
- self.project_id = id or uuid.uuid4().hex
- self.project_name = name or uuid.uuid4().hex
- self.project_domain_id = domain_id or uuid.uuid4().hex
- self.project_domain_name = domain_name or uuid.uuid4().hex
-
- def set_domain_scope(self, id=None, name=None):
- self.domain_id = id or uuid.uuid4().hex
- self.domain_name = name or uuid.uuid4().hex
-
- def set_trust_scope(self, id=None, impersonation=False,
- trustee_user_id=None, trustor_user_id=None):
- self.trust_id = id or uuid.uuid4().hex
- self.trust_impersonation = impersonation
- self.trustee_user_id = trustee_user_id or uuid.uuid4().hex
- self.trustor_user_id = trustor_user_id or uuid.uuid4().hex
-
- def set_oauth(self, access_token_id=None, consumer_id=None):
- self.oauth_access_token_id = access_token_id or uuid.uuid4().hex
- self.oauth_consumer_id = consumer_id or uuid.uuid4().hex
-
-
-class V3FederationToken(Token):
- """A V3 Keystone Federation token that can be used for testing.
-
- Similar to V3Token, this object is designed to allow clients to generate
- a correct V3 federation token for use in test code.
- """
-
- def __init__(self, methods=None, identity_provider=None, protocol=None,
- groups=None):
- methods = methods or ['saml2']
- super(V3FederationToken, self).__init__(methods=methods)
- # NOTE(stevemar): Federated tokens do not have a domain for the user
- del self._user['domain']
- self.add_federation_info_to_user(identity_provider, protocol, groups)
-
- def add_federation_info_to_user(self, identity_provider=None,
- protocol=None, groups=None):
- data = {
- "OS-FEDERATION": {
- "identity_provider": identity_provider or uuid.uuid4().hex,
- "protocol": protocol or uuid.uuid4().hex,
- "groups": groups or [{"id": uuid.uuid4().hex}]
- }
- }
- self._user.update(data)
- return data
+Token = fixture.V3Token
+V3FederationToken = fixture.V3FederationToken
diff --git a/keystoneclient/middleware/__init__.py b/keystoneclient/middleware/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/keystoneclient/middleware/__init__.py
+++ /dev/null
diff --git a/keystoneclient/middleware/auth_token.py b/keystoneclient/middleware/auth_token.py
deleted file mode 100644
index 86cc11a..0000000
--- a/keystoneclient/middleware/auth_token.py
+++ /dev/null
@@ -1,1624 +0,0 @@
-# Copyright 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-TOKEN-BASED AUTH MIDDLEWARE
-
-.. warning::
-
- This module is DEPRECATED. The auth_token middleware has been moved to the
- `keystonemiddleware repository
- <http://docs.openstack.org/developer/keystonemiddleware/>`_.
-
-This WSGI component:
-
-* Verifies that incoming client requests have valid tokens by validating
- tokens with the auth service.
-* Rejects unauthenticated requests UNLESS it is in 'delay_auth_decision'
- mode, which means the final decision is delegated to the downstream WSGI
- component (usually the OpenStack service)
-* Collects and forwards identity information based on a valid token
- such as user name, tenant, etc
-
-HEADERS
--------
-
-* Headers starting with HTTP\_ is a standard http header
-* Headers starting with HTTP_X is an extended http header
-
-Coming in from initial call from client or customer
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-HTTP_X_AUTH_TOKEN
- The client token being passed in.
-
-HTTP_X_STORAGE_TOKEN
- The client token being passed in (legacy Rackspace use) to support
- swift/cloud files
-
-Used for communication between components
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-WWW-Authenticate
- HTTP header returned to a user indicating which endpoint to use
- to retrieve a new token
-
-What we add to the request for use by the OpenStack service
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-HTTP_X_IDENTITY_STATUS
- 'Confirmed' or 'Invalid'
- The underlying service will only see a value of 'Invalid' if the Middleware
- is configured to run in 'delay_auth_decision' mode
-
-HTTP_X_DOMAIN_ID
- Identity service managed unique identifier, string. Only present if
- this is a domain-scoped v3 token.
-
-HTTP_X_DOMAIN_NAME
- Unique domain name, string. Only present if this is a domain-scoped
- v3 token.
-
-HTTP_X_PROJECT_ID
- Identity service managed unique identifier, string. Only present if
- this is a project-scoped v3 token, or a tenant-scoped v2 token.
-
-HTTP_X_PROJECT_NAME
- Project name, unique within owning domain, string. Only present if
- this is a project-scoped v3 token, or a tenant-scoped v2 token.
-
-HTTP_X_PROJECT_DOMAIN_ID
- Identity service managed unique identifier of owning domain of
- project, string. Only present if this is a project-scoped v3 token. If
- this variable is set, this indicates that the PROJECT_NAME can only
- be assumed to be unique within this domain.
-
-HTTP_X_PROJECT_DOMAIN_NAME
- Name of owning domain of project, string. Only present if this is a
- project-scoped v3 token. If this variable is set, this indicates that
- the PROJECT_NAME can only be assumed to be unique within this domain.
-
-HTTP_X_USER_ID
- Identity-service managed unique identifier, string
-
-HTTP_X_USER_NAME
- User identifier, unique within owning domain, string
-
-HTTP_X_USER_DOMAIN_ID
- Identity service managed unique identifier of owning domain of
- user, string. If this variable is set, this indicates that the USER_NAME
- can only be assumed to be unique within this domain.
-
-HTTP_X_USER_DOMAIN_NAME
- Name of owning domain of user, string. If this variable is set, this
- indicates that the USER_NAME can only be assumed to be unique within
- this domain.
-
-HTTP_X_ROLES
- Comma delimited list of case-sensitive role names
-
-HTTP_X_SERVICE_CATALOG
- json encoded keystone service catalog (optional).
- For compatibility reasons this catalog will always be in the V2 catalog
- format even if it is a v3 token.
-
-HTTP_X_TENANT_ID
- *Deprecated* in favor of HTTP_X_PROJECT_ID
- Identity service managed unique identifier, string. For v3 tokens, this
- will be set to the same value as HTTP_X_PROJECT_ID
-
-HTTP_X_TENANT_NAME
- *Deprecated* in favor of HTTP_X_PROJECT_NAME
- Project identifier, unique within owning domain, string. For v3 tokens,
- this will be set to the same value as HTTP_X_PROJECT_NAME
-
-HTTP_X_TENANT
- *Deprecated* in favor of HTTP_X_TENANT_ID and HTTP_X_TENANT_NAME
- Keystone-assigned unique identifier, string. For v3 tokens, this
- will be set to the same value as HTTP_X_PROJECT_ID
-
-HTTP_X_USER
- *Deprecated* in favor of HTTP_X_USER_ID and HTTP_X_USER_NAME
- User name, unique within owning domain, string
-
-HTTP_X_ROLE
- *Deprecated* in favor of HTTP_X_ROLES
- Will contain the same values as HTTP_X_ROLES.
-
-OTHER ENVIRONMENT VARIABLES
----------------------------
-
-keystone.token_info
- Information about the token discovered in the process of
- validation. This may include extended information returned by the
- Keystone token validation call, as well as basic information about
- the tenant and user.
-
-"""
-
-import contextlib
-import datetime
-import logging
-import os
-import stat
-import tempfile
-import time
-
-import netaddr
-from oslo_config import cfg
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-import requests
-import six
-from six.moves import urllib
-
-from keystoneclient import access
-from keystoneclient.common import cms
-from keystoneclient import exceptions
-from keystoneclient.middleware import memcache_crypt
-from keystoneclient.openstack.common import memorycache
-from keystoneclient import utils
-
-
-# alternative middleware configuration in the main application's
-# configuration file e.g. in nova.conf
-# [keystone_authtoken]
-# auth_host = 127.0.0.1
-# auth_port = 35357
-# auth_protocol = http
-# admin_tenant_name = admin
-# admin_user = admin
-# admin_password = badpassword
-
-# when deploy Keystone auth_token middleware with Swift, user may elect
-# to use Swift memcache instead of the local Keystone memcache. Swift memcache
-# is passed in from the request environment and its identified by the
-# 'swift.cache' key. However it could be different, depending on deployment.
-# To use Swift memcache, you must set the 'cache' option to the environment
-# key where the Swift cache object is stored.
-
-
-# NOTE(jamielennox): A number of options below are deprecated however are left
-# in the list and only mentioned as deprecated in the help string. This is
-# because we have to provide the same deprecation functionality for arguments
-# passed in via the conf in __init__ (from paste) and there is no way to test
-# that the default value was set or not in CONF.
-# Also if we were to remove the options from the CONF list (as typical CONF
-# deprecation works) then other projects will not be able to override the
-# options via CONF.
-
-opts = [
- cfg.StrOpt('auth_admin_prefix',
- default='',
- help='Prefix to prepend at the beginning of the path. '
- 'Deprecated, use identity_uri.'),
- cfg.StrOpt('auth_host',
- default='127.0.0.1',
- help='Host providing the admin Identity API endpoint. '
- 'Deprecated, use identity_uri.'),
- cfg.IntOpt('auth_port',
- default=35357,
- help='Port of the admin Identity API endpoint. '
- 'Deprecated, use identity_uri.'),
- cfg.StrOpt('auth_protocol',
- default='https',
- help='Protocol of the admin Identity API endpoint '
- '(http or https). Deprecated, use identity_uri.'),
- cfg.StrOpt('auth_uri',
- default=None,
- # FIXME(dolph): should be default='http://127.0.0.1:5000/v2.0/',
- # or (depending on client support) an unversioned, publicly
- # accessible identity endpoint (see bug 1207517)
- help='Complete public Identity API endpoint'),
- cfg.StrOpt('identity_uri',
- default=None,
- help='Complete admin Identity API endpoint. This should '
- 'specify the unversioned root endpoint '
- 'e.g. https://localhost:35357/'),
- cfg.StrOpt('auth_version',
- default=None,
- help='API version of the admin Identity API endpoint'),
- cfg.BoolOpt('delay_auth_decision',
- default=False,
- help='Do not handle authorization requests within the'
- ' middleware, but delegate the authorization decision to'
- ' downstream WSGI components'),
- cfg.BoolOpt('http_connect_timeout',
- default=None,
- help='Request timeout value for communicating with Identity'
- ' API server.'),
- cfg.IntOpt('http_request_max_retries',
- default=3,
- help='How many times are we trying to reconnect when'
- ' communicating with Identity API Server.'),
- cfg.StrOpt('admin_token',
- secret=True,
- help='This option is deprecated and may be removed in a future'
- ' release. Single shared secret with the Keystone configuration'
- ' used for bootstrapping a Keystone installation, or otherwise'
- ' bypassing the normal authentication process. This option'
- ' should not be used, use `admin_user` and `admin_password`'
- ' instead.'),
- cfg.StrOpt('admin_user',
- help='Keystone account username'),
- cfg.StrOpt('admin_password',
- secret=True,
- help='Keystone account password'),
- cfg.StrOpt('admin_tenant_name',
- default='admin',
- help='Keystone service account tenant name to validate'
- ' user tokens'),
- cfg.StrOpt('cache',
- default=None,
- help='Env key for the swift cache'),
- cfg.StrOpt('certfile',
- help='Required if Keystone server requires client certificate'),
- cfg.StrOpt('keyfile',
- help='Required if Keystone server requires client certificate'),
- cfg.StrOpt('cafile', default=None,
- help='A PEM encoded Certificate Authority to use when '
- 'verifying HTTPs connections. Defaults to system CAs.'),
- cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'),
- cfg.StrOpt('signing_dir',
- help='Directory used to cache files related to PKI tokens'),
- cfg.ListOpt('memcached_servers',
- deprecated_name='memcache_servers',
- help='Optionally specify a list of memcached server(s) to'
- ' use for caching. If left undefined, tokens will instead be'
- ' cached in-process.'),
- cfg.IntOpt('token_cache_time',
- default=300,
- help='In order to prevent excessive effort spent validating'
- ' tokens, the middleware caches previously-seen tokens for a'
- ' configurable duration (in seconds). Set to -1 to disable'
- ' caching completely.'),
- cfg.IntOpt('revocation_cache_time',
- default=10,
- help='Determines the frequency at which the list of revoked'
- ' tokens is retrieved from the Identity service (in seconds). A'
- ' high number of revocation events combined with a low cache'
- ' duration may significantly reduce performance.'),
- cfg.StrOpt('memcache_security_strategy',
- default=None,
- help='(optional) if defined, indicate whether token data'
- ' should be authenticated or authenticated and encrypted.'
- ' Acceptable values are MAC or ENCRYPT. If MAC, token data is'
- ' authenticated (with HMAC) in the cache. If ENCRYPT, token'
- ' data is encrypted and authenticated in the cache. If the'
- ' value is not one of these options or empty, auth_token will'
- ' raise an exception on initialization.'),
- cfg.StrOpt('memcache_secret_key',
- default=None,
- secret=True,
- help='(optional, mandatory if memcache_security_strategy is'
- ' defined) this string is used for key derivation.'),
- cfg.BoolOpt('include_service_catalog',
- default=True,
- help='(optional) indicate whether to set the X-Service-Catalog'
- ' header. If False, middleware will not ask for service'
- ' catalog on token validation and will not set the'
- ' X-Service-Catalog header.'),
- cfg.StrOpt('enforce_token_bind',
- default='permissive',
- help='Used to control the use and type of token binding. Can'
- ' be set to: "disabled" to not check token binding.'
- ' "permissive" (default) to validate binding information if the'
- ' bind type is of a form known to the server and ignore it if'
- ' not. "strict" like "permissive" but if the bind type is'
- ' unknown the token will be rejected. "required" any form of'
- ' token binding is needed to be allowed. Finally the name of a'
- ' binding method that must be present in tokens.'),
- cfg.BoolOpt('check_revocations_for_cached', default=False,
- help='If true, the revocation list will be checked for cached'
- ' tokens. This requires that PKI tokens are configured on the'
- ' Keystone server.'),
- cfg.ListOpt('hash_algorithms', default=['md5'],
- help='Hash algorithms to use for hashing PKI tokens. This may'
- ' be a single algorithm or multiple. The algorithms are those'
- ' supported by Python standard hashlib.new(). The hashes will'
- ' be tried in the order given, so put the preferred one first'
- ' for performance. The result of the first hash will be stored'
- ' in the cache. This will typically be set to multiple values'
- ' only while migrating from a less secure algorithm to a more'
- ' secure one. Once all the old tokens are expired this option'
- ' should be set to a single value for better performance.'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(opts, group='keystone_authtoken')
-
-LIST_OF_VERSIONS_TO_ATTEMPT = ['v2.0', 'v3.0']
-CACHE_KEY_TEMPLATE = 'tokens/%s'
-
-
-class BIND_MODE(object):
- DISABLED = 'disabled'
- PERMISSIVE = 'permissive'
- STRICT = 'strict'
- REQUIRED = 'required'
- KERBEROS = 'kerberos'
-
-
-def will_expire_soon(expiry):
- """Determines if expiration is about to occur.
-
- :param expiry: a datetime of the expected expiration
- :returns: boolean : true if expiration is within 30 seconds
- """
- soon = (timeutils.utcnow() + datetime.timedelta(seconds=30))
- return expiry < soon
-
-
-def _token_is_v2(token_info):
- return ('access' in token_info)
-
-
-def _token_is_v3(token_info):
- return ('token' in token_info)
-
-
-def confirm_token_not_expired(data):
- if not data:
- raise InvalidUserToken('Token authorization failed')
- if _token_is_v2(data):
- timestamp = data['access']['token']['expires']
- elif _token_is_v3(data):
- timestamp = data['token']['expires_at']
- else:
- raise InvalidUserToken('Token authorization failed')
- expires = timeutils.parse_isotime(timestamp)
- expires = timeutils.normalize_time(expires)
- utcnow = timeutils.utcnow()
- if utcnow >= expires:
- raise InvalidUserToken('Token authorization failed')
- return utils.isotime(at=expires, subsecond=True)
-
-
-def _v3_to_v2_catalog(catalog):
- """Convert a catalog to v2 format.
-
- X_SERVICE_CATALOG must be specified in v2 format. If you get a token
- that is in v3 convert it.
- """
- v2_services = []
- for v3_service in catalog:
- # first copy over the entries we allow for the service
- v2_service = {'type': v3_service['type']}
- try:
- v2_service['name'] = v3_service['name']
- except KeyError:
- pass
-
- # now convert the endpoints. Because in v3 we specify region per
- # URL not per group we have to collect all the entries of the same
- # region together before adding it to the new service.
- regions = {}
- for v3_endpoint in v3_service.get('endpoints', []):
- region_name = v3_endpoint.get('region')
- try:
- region = regions[region_name]
- except KeyError:
- region = {'region': region_name} if region_name else {}
- regions[region_name] = region
-
- interface_name = v3_endpoint['interface'].lower() + 'URL'
- region[interface_name] = v3_endpoint['url']
-
- v2_service['endpoints'] = list(regions.values())
- v2_services.append(v2_service)
-
- return v2_services
-
-
-def safe_quote(s):
- """URL-encode strings that are not already URL-encoded."""
- return urllib.parse.quote(s) if s == urllib.parse.unquote(s) else s
-
-
-def _conf_values_type_convert(conf):
- """Convert conf values into correct type."""
- if not conf:
- return {}
- _opts = {}
- opt_types = dict((o.dest, getattr(o, 'type', str)) for o in opts)
- for k, v in six.iteritems(conf):
- try:
- if v is None:
- _opts[k] = v
- else:
- _opts[k] = opt_types[k](v)
- except KeyError:
- _opts[k] = v
- except ValueError as e:
- raise ConfigurationError(
- 'Unable to convert the value of %s option into correct '
- 'type: %s' % (k, e))
- return _opts
-
-
-class InvalidUserToken(Exception):
- pass
-
-
-class ServiceError(Exception):
- pass
-
-
-class ConfigurationError(Exception):
- pass
-
-
-class NetworkError(Exception):
- pass
-
-
-class MiniResp(object):
- def __init__(self, error_message, env, headers=[]):
- # The HEAD method is unique: it must never return a body, even if
- # it reports an error (RFC-2616 clause 9.4). We relieve callers
- # from varying the error responses depending on the method.
- if env['REQUEST_METHOD'] == 'HEAD':
- self.body = ['']
- else:
- self.body = [error_message]
- self.headers = list(headers)
- self.headers.append(('Content-type', 'text/plain'))
-
-
-class AuthProtocol(object):
- """Auth Middleware that handles authenticating client calls."""
-
- def __init__(self, app, conf):
- self.LOG = logging.getLogger(conf.get('log_name', __name__))
- self.LOG.info('Starting keystone auth_token middleware')
- self.LOG.warning(
- 'This middleware module is deprecated as of v0.10.0 in favor of '
- 'keystonemiddleware.auth_token - please update your WSGI pipeline '
- 'to reference the new middleware package.')
- # NOTE(wanghong): If options are set in paste file, all the option
- # values passed into conf are string type. So, we should convert the
- # conf value into correct type.
- self.conf = _conf_values_type_convert(conf)
- self.app = app
-
- # delay_auth_decision means we still allow unauthenticated requests
- # through and we let the downstream service make the final decision
- self.delay_auth_decision = (self._conf_get('delay_auth_decision') in
- (True, 'true', 't', '1', 'on', 'yes', 'y'))
-
- # where to find the auth service (we use this to validate tokens)
- self.identity_uri = self._conf_get('identity_uri')
- self.auth_uri = self._conf_get('auth_uri')
-
- # NOTE(jamielennox): it does appear here that our defaults arguments
- # are backwards. We need to do it this way so that we can handle the
- # same deprecation strategy for CONF and the conf variable.
- if not self.identity_uri:
- self.LOG.warning('Configuring admin URI using auth fragments. '
- 'This is deprecated, use \'identity_uri\''
- ' instead.')
-
- auth_host = self._conf_get('auth_host')
- auth_port = int(self._conf_get('auth_port'))
- auth_protocol = self._conf_get('auth_protocol')
- auth_admin_prefix = self._conf_get('auth_admin_prefix')
-
- if netaddr.valid_ipv6(auth_host):
- # Note(dzyu) it is an IPv6 address, so it needs to be wrapped
- # with '[]' to generate a valid IPv6 URL, based on
- # http://www.ietf.org/rfc/rfc2732.txt
- auth_host = '[%s]' % auth_host
-
- self.identity_uri = '%s://%s:%s' % (auth_protocol, auth_host,
- auth_port)
- if auth_admin_prefix:
- self.identity_uri = '%s/%s' % (self.identity_uri,
- auth_admin_prefix.strip('/'))
- else:
- self.identity_uri = self.identity_uri.rstrip('/')
-
- if self.auth_uri is None:
- self.LOG.warning(
- 'Configuring auth_uri to point to the public identity '
- 'endpoint is required; clients may not be able to '
- 'authenticate against an admin endpoint')
-
- # FIXME(dolph): drop support for this fallback behavior as
- # documented in bug 1207517.
- # NOTE(jamielennox): we urljoin '/' to get just the base URI as
- # this is the original behaviour.
- self.auth_uri = urllib.parse.urljoin(self.identity_uri, '/')
- self.auth_uri = self.auth_uri.rstrip('/')
-
- # SSL
- self.cert_file = self._conf_get('certfile')
- self.key_file = self._conf_get('keyfile')
- self.ssl_ca_file = self._conf_get('cafile')
- self.ssl_insecure = self._conf_get('insecure')
-
- # signing
- self.signing_dirname = self._conf_get('signing_dir')
- if self.signing_dirname is None:
- self.signing_dirname = tempfile.mkdtemp(prefix='keystone-signing-')
- self.LOG.info('Using %s as cache directory for signing certificate',
- self.signing_dirname)
- self.verify_signing_dir()
-
- val = '%s/signing_cert.pem' % self.signing_dirname
- self.signing_cert_file_name = val
- val = '%s/cacert.pem' % self.signing_dirname
- self.signing_ca_file_name = val
- val = '%s/revoked.pem' % self.signing_dirname
- self.revoked_file_name = val
-
- # Credentials used to verify this component with the Auth service since
- # validating tokens is a privileged call
- self.admin_token = self._conf_get('admin_token')
- if self.admin_token:
- self.LOG.warning(
- "The admin_token option in the auth_token middleware is "
- "deprecated and should not be used. The admin_user and "
- "admin_password options should be used instead. The "
- "admin_token option may be removed in a future release.")
- self.admin_token_expiry = None
- self.admin_user = self._conf_get('admin_user')
- self.admin_password = self._conf_get('admin_password')
- self.admin_tenant_name = self._conf_get('admin_tenant_name')
-
- memcache_security_strategy = (
- self._conf_get('memcache_security_strategy'))
-
- self._token_cache = TokenCache(
- self.LOG,
- cache_time=int(self._conf_get('token_cache_time')),
- hash_algorithms=self._conf_get('hash_algorithms'),
- env_cache_name=self._conf_get('cache'),
- memcached_servers=self._conf_get('memcached_servers'),
- memcache_security_strategy=memcache_security_strategy,
- memcache_secret_key=self._conf_get('memcache_secret_key'))
-
- self._token_revocation_list = None
- self._token_revocation_list_fetched_time = None
- self.token_revocation_list_cache_timeout = datetime.timedelta(
- seconds=self._conf_get('revocation_cache_time'))
- http_connect_timeout_cfg = self._conf_get('http_connect_timeout')
- self.http_connect_timeout = (http_connect_timeout_cfg and
- int(http_connect_timeout_cfg))
- self.auth_version = None
- self.http_request_max_retries = (
- self._conf_get('http_request_max_retries'))
-
- self.include_service_catalog = self._conf_get(
- 'include_service_catalog')
-
- self.check_revocations_for_cached = self._conf_get(
- 'check_revocations_for_cached')
-
- def _conf_get(self, name):
- # try config from paste-deploy first
- if name in self.conf:
- return self.conf[name]
- else:
- return CONF.keystone_authtoken[name]
-
- def _choose_api_version(self):
- """Determine the api version that we should use."""
-
- # If the configuration specifies an auth_version we will just
- # assume that is correct and use it. We could, of course, check
- # that this version is supported by the server, but in case
- # there are some problems in the field, we want as little code
- # as possible in the way of letting auth_token talk to the
- # server.
- if self._conf_get('auth_version'):
- version_to_use = self._conf_get('auth_version')
- self.LOG.info('Auth Token proceeding with requested %s apis',
- version_to_use)
- else:
- version_to_use = None
- versions_supported_by_server = self._get_supported_versions()
- if versions_supported_by_server:
- for version in LIST_OF_VERSIONS_TO_ATTEMPT:
- if version in versions_supported_by_server:
- version_to_use = version
- break
- if version_to_use:
- self.LOG.info('Auth Token confirmed use of %s apis',
- version_to_use)
- else:
- self.LOG.error(
- 'Attempted versions [%s] not in list supported by '
- 'server [%s]',
- ', '.join(LIST_OF_VERSIONS_TO_ATTEMPT),
- ', '.join(versions_supported_by_server))
- raise ServiceError('No compatible apis supported by server')
- return version_to_use
-
- def _get_supported_versions(self):
- versions = []
- response, data = self._json_request('GET', '/')
- if response.status_code == 501:
- self.LOG.warning('Old keystone installation found...assuming v2.0')
- versions.append('v2.0')
- elif response.status_code != 300:
- self.LOG.error('Unable to get version info from keystone: %s',
- response.status_code)
- raise ServiceError('Unable to get version info from keystone')
- else:
- try:
- for version in data['versions']['values']:
- versions.append(version['id'])
- except KeyError:
- self.LOG.error(
- 'Invalid version response format from server')
- raise ServiceError('Unable to parse version response '
- 'from keystone')
-
- self.LOG.debug('Server reports support for api versions: %s',
- ', '.join(versions))
- return versions
-
- def __call__(self, env, start_response):
- """Handle incoming request.
-
- Authenticate send downstream on success. Reject request if
- we can't authenticate.
-
- """
- self.LOG.debug('Authenticating user token')
-
- self._token_cache.initialize(env)
-
- try:
- self._remove_auth_headers(env)
- user_token = self._get_user_token_from_header(env)
- token_info = self._validate_user_token(user_token, env)
- env['keystone.token_info'] = token_info
- user_headers = self._build_user_headers(token_info)
- self._add_headers(env, user_headers)
- return self.app(env, start_response)
-
- except InvalidUserToken:
- if self.delay_auth_decision:
- self.LOG.info(
- 'Invalid user token - deferring reject downstream')
- self._add_headers(env, {'X-Identity-Status': 'Invalid'})
- return self.app(env, start_response)
- else:
- self.LOG.info('Invalid user token - rejecting request')
- return self._reject_request(env, start_response)
-
- except ServiceError as e:
- self.LOG.critical('Unable to obtain admin token: %s', e)
- resp = MiniResp('Service unavailable', env)
- start_response('503 Service Unavailable', resp.headers)
- return resp.body
-
- def _remove_auth_headers(self, env):
- """Remove headers so a user can't fake authentication.
-
- :param env: wsgi request environment
-
- """
- auth_headers = (
- 'X-Identity-Status',
- 'X-Domain-Id',
- 'X-Domain-Name',
- 'X-Project-Id',
- 'X-Project-Name',
- 'X-Project-Domain-Id',
- 'X-Project-Domain-Name',
- 'X-User-Id',
- 'X-User-Name',
- 'X-User-Domain-Id',
- 'X-User-Domain-Name',
- 'X-Roles',
- 'X-Service-Catalog',
- # Deprecated
- 'X-User',
- 'X-Tenant-Id',
- 'X-Tenant-Name',
- 'X-Tenant',
- 'X-Role',
- )
- self.LOG.debug('Removing headers from request environment: %s',
- ','.join(auth_headers))
- self._remove_headers(env, auth_headers)
-
- def _get_user_token_from_header(self, env):
- """Get token id from request.
-
- :param env: wsgi request environment
- :return token id
- :raises InvalidUserToken if no token is provided in request
-
- """
- token = self._get_header(env, 'X-Auth-Token',
- self._get_header(env, 'X-Storage-Token'))
- if token:
- return token
- else:
- if not self.delay_auth_decision:
- self.LOG.warning('Unable to find authentication token'
- ' in headers')
- self.LOG.debug('Headers: %s', env)
- raise InvalidUserToken('Unable to find token in headers')
-
- def _reject_request(self, env, start_response):
- """Redirect client to auth server.
-
- :param env: wsgi request environment
- :param start_response: wsgi response callback
- :returns HTTPUnauthorized http response
-
- """
- headers = [('WWW-Authenticate', 'Keystone uri=\'%s\'' % self.auth_uri)]
- resp = MiniResp('Authentication required', env, headers)
- start_response('401 Unauthorized', resp.headers)
- return resp.body
-
- def get_admin_token(self):
- """Return admin token, possibly fetching a new one.
-
- if self.admin_token_expiry is set from fetching an admin token, check
- it for expiration, and request a new token is the existing token
- is about to expire.
-
- :return admin token id
- :raise ServiceError when unable to retrieve token from keystone
-
- """
- if self.admin_token_expiry:
- if will_expire_soon(self.admin_token_expiry):
- self.admin_token = None
-
- if not self.admin_token:
- (self.admin_token,
- self.admin_token_expiry) = self._request_admin_token()
-
- return self.admin_token
-
- def _http_request(self, method, path, **kwargs):
- """HTTP request helper used to make unspecified content type requests.
-
- :param method: http method
- :param path: relative request url
- :return (http response object, response body)
- :raise ServerError when unable to communicate with keystone
-
- """
- url = '%s/%s' % (self.identity_uri, path.lstrip('/'))
-
- kwargs.setdefault('timeout', self.http_connect_timeout)
- if self.cert_file and self.key_file:
- kwargs['cert'] = (self.cert_file, self.key_file)
- elif self.cert_file or self.key_file:
- self.LOG.warning('Cannot use only a cert or key file. '
- 'Please provide both. Ignoring.')
-
- kwargs['verify'] = self.ssl_ca_file or True
- if self.ssl_insecure:
- kwargs['verify'] = False
-
- RETRIES = self.http_request_max_retries
- retry = 0
- while True:
- try:
- response = requests.request(method, url, **kwargs)
- break
- except Exception as e:
- if retry >= RETRIES:
- self.LOG.error('HTTP connection exception: %s', e)
- raise NetworkError('Unable to communicate with keystone')
- # NOTE(vish): sleep 0.5, 1, 2
- self.LOG.warning('Retrying on HTTP connection exception: %s',
- e)
- time.sleep(2.0 ** retry / 2)
- retry += 1
-
- return response
-
- def _json_request(self, method, path, body=None, additional_headers=None):
- """HTTP request helper used to make json requests.
-
- :param method: http method
- :param path: relative request url
- :param body: dict to encode to json as request body. Optional.
- :param additional_headers: dict of additional headers to send with
- http request. Optional.
- :return (http response object, response body parsed as json)
- :raise ServerError when unable to communicate with keystone
-
- """
- kwargs = {
- 'headers': {
- 'Content-type': 'application/json',
- 'Accept': 'application/json',
- },
- }
-
- if additional_headers:
- kwargs['headers'].update(additional_headers)
-
- if body:
- kwargs['data'] = jsonutils.dumps(body)
-
- response = self._http_request(method, path, **kwargs)
-
- try:
- data = jsonutils.loads(response.text)
- except ValueError:
- self.LOG.debug('Keystone did not return json-encoded body')
- data = {}
-
- return response, data
-
- def _request_admin_token(self):
- """Retrieve new token as admin user from keystone.
-
- :return token id upon success
- :raises ServerError when unable to communicate with keystone
-
- Irrespective of the auth version we are going to use for the
- user token, for simplicity we always use a v2 admin token to
- validate the user token.
-
- """
- params = {
- 'auth': {
- 'passwordCredentials': {
- 'username': self.admin_user,
- 'password': self.admin_password,
- },
- 'tenantName': self.admin_tenant_name,
- }
- }
-
- response, data = self._json_request('POST',
- '/v2.0/tokens',
- body=params)
-
- try:
- token = data['access']['token']['id']
- expiry = data['access']['token']['expires']
- if not (token and expiry):
- raise AssertionError('invalid token or expire')
- datetime_expiry = timeutils.parse_isotime(expiry)
- return (token, timeutils.normalize_time(datetime_expiry))
- except (AssertionError, KeyError):
- self.LOG.warning(
- 'Unexpected response from keystone service: %s', data)
- raise ServiceError('invalid json response')
- except (ValueError):
- data['access']['token']['id'] = '<SANITIZED>'
- self.LOG.warning(
- 'Unable to parse expiration time from token: %s', data)
- raise ServiceError('invalid json response')
-
- def _validate_user_token(self, user_token, env, retry=True):
- """Authenticate user token
-
- :param user_token: user's token id
- :param retry: Ignored, as it is not longer relevant
- :return uncrypted body of the token if the token is valid
- :raise InvalidUserToken if token is rejected
- :no longer raises ServiceError since it no longer makes RPC
-
- """
- token_id = None
-
- try:
- token_ids, cached = self._token_cache.get(user_token)
- token_id = token_ids[0]
- if cached:
- data = cached
-
- if self.check_revocations_for_cached:
- # A token stored in Memcached might have been revoked
- # regardless of initial mechanism used to validate it,
- # and needs to be checked.
- for tid in token_ids:
- is_revoked = self._is_token_id_in_revoked_list(tid)
- if is_revoked:
- self.LOG.debug(
- 'Token is marked as having been revoked')
- raise InvalidUserToken(
- 'Token authorization failed')
- elif cms.is_pkiz(user_token):
- verified = self.verify_pkiz_token(user_token, token_ids)
- data = jsonutils.loads(verified)
- elif cms.is_asn1_token(user_token):
- verified = self.verify_signed_token(user_token, token_ids)
- data = jsonutils.loads(verified)
- else:
- data = self.verify_uuid_token(user_token, retry)
- expires = confirm_token_not_expired(data)
- self._confirm_token_bind(data, env)
- self._token_cache.store(token_id, data, expires)
- return data
- except NetworkError:
- self.LOG.debug('Token validation failure.', exc_info=True)
- self.LOG.warning('Authorization failed for token')
- raise InvalidUserToken('Token authorization failed')
- except Exception:
- self.LOG.debug('Token validation failure.', exc_info=True)
- if token_id:
- self._token_cache.store_invalid(token_id)
- self.LOG.warning('Authorization failed for token')
- raise InvalidUserToken('Token authorization failed')
-
- def _build_user_headers(self, token_info):
- """Convert token object into headers.
-
- Build headers that represent authenticated user - see main
- doc info at start of file for details of headers to be defined.
-
- :param token_info: token object returned by keystone on authentication
- :raise InvalidUserToken when unable to parse token object
-
- """
- auth_ref = access.AccessInfo.factory(body=token_info)
- roles = ','.join(auth_ref.role_names)
-
- if _token_is_v2(token_info) and not auth_ref.project_id:
- raise InvalidUserToken('Unable to determine tenancy.')
-
- rval = {
- 'X-Identity-Status': 'Confirmed',
- 'X-Domain-Id': auth_ref.domain_id,
- 'X-Domain-Name': auth_ref.domain_name,
- 'X-Project-Id': auth_ref.project_id,
- 'X-Project-Name': auth_ref.project_name,
- 'X-Project-Domain-Id': auth_ref.project_domain_id,
- 'X-Project-Domain-Name': auth_ref.project_domain_name,
- 'X-User-Id': auth_ref.user_id,
- 'X-User-Name': auth_ref.username,
- 'X-User-Domain-Id': auth_ref.user_domain_id,
- 'X-User-Domain-Name': auth_ref.user_domain_name,
- 'X-Roles': roles,
- # Deprecated
- 'X-User': auth_ref.username,
- 'X-Tenant-Id': auth_ref.project_id,
- 'X-Tenant-Name': auth_ref.project_name,
- 'X-Tenant': auth_ref.project_name,
- 'X-Role': roles,
- }
-
- self.LOG.debug('Received request from user: %s with project_id : %s'
- ' and roles: %s ',
- auth_ref.user_id, auth_ref.project_id, roles)
-
- if self.include_service_catalog and auth_ref.has_service_catalog():
- catalog = auth_ref.service_catalog.get_data()
- if _token_is_v3(token_info):
- catalog = _v3_to_v2_catalog(catalog)
- rval['X-Service-Catalog'] = jsonutils.dumps(catalog)
-
- return rval
-
- def _header_to_env_var(self, key):
- """Convert header to wsgi env variable.
-
- :param key: http header name (ex. 'X-Auth-Token')
- :return wsgi env variable name (ex. 'HTTP_X_AUTH_TOKEN')
-
- """
- return 'HTTP_%s' % key.replace('-', '_').upper()
-
- def _add_headers(self, env, headers):
- """Add http headers to environment."""
- for (k, v) in six.iteritems(headers):
- env_key = self._header_to_env_var(k)
- env[env_key] = v
-
- def _remove_headers(self, env, keys):
- """Remove http headers from environment."""
- for k in keys:
- env_key = self._header_to_env_var(k)
- try:
- del env[env_key]
- except KeyError:
- pass
-
- def _get_header(self, env, key, default=None):
- """Get http header from environment."""
- env_key = self._header_to_env_var(key)
- return env.get(env_key, default)
-
- def _invalid_user_token(self, msg=False):
- # NOTE(jamielennox): use False as the default so that None is valid
- if msg is False:
- msg = 'Token authorization failed'
-
- raise InvalidUserToken(msg)
-
- def _confirm_token_bind(self, data, env):
- bind_mode = self._conf_get('enforce_token_bind')
-
- if bind_mode == BIND_MODE.DISABLED:
- return
-
- try:
- if _token_is_v2(data):
- bind = data['access']['token']['bind']
- elif _token_is_v3(data):
- bind = data['token']['bind']
- else:
- self._invalid_user_token()
- except KeyError:
- bind = {}
-
- # permissive and strict modes don't require there to be a bind
- permissive = bind_mode in (BIND_MODE.PERMISSIVE, BIND_MODE.STRICT)
-
- if not bind:
- if permissive:
- # no bind provided and none required
- return
- else:
- self.LOG.info('No bind information present in token.')
- self._invalid_user_token()
-
- # get the named mode if bind_mode is not one of the predefined
- if permissive or bind_mode == BIND_MODE.REQUIRED:
- name = None
- else:
- name = bind_mode
-
- if name and name not in bind:
- self.LOG.info('Named bind mode %s not in bind information', name)
- self._invalid_user_token()
-
- for bind_type, identifier in six.iteritems(bind):
- if bind_type == BIND_MODE.KERBEROS:
- if not env.get('AUTH_TYPE', '').lower() == 'negotiate':
- self.LOG.info('Kerberos credentials required and '
- 'not present.')
- self._invalid_user_token()
-
- if not env.get('REMOTE_USER') == identifier:
- self.LOG.info('Kerberos credentials do not match '
- 'those in bind.')
- self._invalid_user_token()
-
- self.LOG.debug('Kerberos bind authentication successful.')
-
- elif bind_mode == BIND_MODE.PERMISSIVE:
- self.LOG.debug('Ignoring Unknown bind for permissive mode: '
- '%(bind_type)s: %(identifier)s.',
- {'bind_type': bind_type,
- 'identifier': identifier})
-
- else:
- self.LOG.info('Couldn`t verify unknown bind: %(bind_type)s: '
- '%(identifier)s.',
- {'bind_type': bind_type,
- 'identifier': identifier})
- self._invalid_user_token()
-
- def verify_uuid_token(self, user_token, retry=True):
- """Authenticate user token with keystone.
-
- :param user_token: user's token id
- :param retry: flag that forces the middleware to retry
- user authentication when an indeterminate
- response is received. Optional.
- :returns: token object received from keystone on success
- :raise InvalidUserToken: if token is rejected
- :raise ServiceError: if unable to authenticate token
-
- """
- # Determine the highest api version we can use.
- if not self.auth_version:
- self.auth_version = self._choose_api_version()
-
- if self.auth_version == 'v3.0':
- headers = {'X-Auth-Token': self.get_admin_token(),
- 'X-Subject-Token': safe_quote(user_token)}
- path = '/v3/auth/tokens'
- if not self.include_service_catalog:
- # NOTE(gyee): only v3 API support this option
- path = path + '?nocatalog'
- response, data = self._json_request(
- 'GET',
- path,
- additional_headers=headers)
- else:
- headers = {'X-Auth-Token': self.get_admin_token()}
- response, data = self._json_request(
- 'GET',
- '/v2.0/tokens/%s' % safe_quote(user_token),
- additional_headers=headers)
-
- if response.status_code == 200:
- return data
- if response.status_code == 404:
- self.LOG.warning('Authorization failed for token')
- raise InvalidUserToken('Token authorization failed')
- if response.status_code == 401:
- self.LOG.info(
- 'Keystone rejected admin token, resetting')
- self.admin_token = None
- else:
- self.LOG.error('Bad response code while validating token: %s',
- response.status_code)
- if retry:
- self.LOG.info('Retrying validation')
- return self.verify_uuid_token(user_token, False)
- else:
- self.LOG.warning('Invalid user token. Keystone response: %s', data)
-
- raise InvalidUserToken()
-
- def is_signed_token_revoked(self, token_ids):
- """Indicate whether the token appears in the revocation list."""
- for token_id in token_ids:
- if self._is_token_id_in_revoked_list(token_id):
- self.LOG.debug('Token is marked as having been revoked')
- return True
- return False
-
- def _is_token_id_in_revoked_list(self, token_id):
- """Indicate whether the token_id appears in the revocation list."""
- revocation_list = self.token_revocation_list
- revoked_tokens = revocation_list.get('revoked', None)
- if not revoked_tokens:
- return False
-
- revoked_ids = (x['id'] for x in revoked_tokens)
- return token_id in revoked_ids
-
- def cms_verify(self, data, inform=cms.PKI_ASN1_FORM):
- """Verifies the signature of the provided data's IAW CMS syntax.
-
- If either of the certificate files might be missing, fetch them and
- retry.
- """
- def verify():
- try:
- return cms.cms_verify(data, self.signing_cert_file_name,
- self.signing_ca_file_name,
- inform=inform).decode('utf-8')
- except cms.subprocess.CalledProcessError as err:
- self.LOG.warning('Verify error: %s', err)
- raise
-
- try:
- return verify()
- except exceptions.CertificateConfigError:
- # the certs might be missing; unconditionally fetch to avoid racing
- self.fetch_signing_cert()
- self.fetch_ca_cert()
-
- try:
- # retry with certs in place
- return verify()
- except exceptions.CertificateConfigError as err:
- # if this is still occurring, something else is wrong and we
- # need err.output to identify the problem
- self.LOG.error('CMS Verify output: %s', err.output)
- raise
-
- def verify_signed_token(self, signed_text, token_ids):
- """Check that the token is unrevoked and has a valid signature."""
- if self.is_signed_token_revoked(token_ids):
- raise InvalidUserToken('Token has been revoked')
-
- formatted = cms.token_to_cms(signed_text)
- verified = self.cms_verify(formatted)
- return verified
-
- def verify_pkiz_token(self, signed_text, token_ids):
- if self.is_signed_token_revoked(token_ids):
- raise InvalidUserToken('Token has been revoked')
- try:
- uncompressed = cms.pkiz_uncompress(signed_text)
- verified = self.cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
- return verified
- # TypeError If the signed_text is not zlib compressed
- except TypeError:
- raise InvalidUserToken(signed_text)
-
- def verify_signing_dir(self):
- if os.path.exists(self.signing_dirname):
- if not os.access(self.signing_dirname, os.W_OK):
- raise ConfigurationError(
- 'unable to access signing_dir %s' % self.signing_dirname)
- uid = os.getuid()
- if os.stat(self.signing_dirname).st_uid != uid:
- self.LOG.warning(
- 'signing_dir is not owned by %s', uid)
- current_mode = stat.S_IMODE(os.stat(self.signing_dirname).st_mode)
- if current_mode != stat.S_IRWXU:
- self.LOG.warning(
- 'signing_dir mode is %s instead of %s',
- oct(current_mode), oct(stat.S_IRWXU))
- else:
- os.makedirs(self.signing_dirname, stat.S_IRWXU)
-
- @property
- def token_revocation_list_fetched_time(self):
- if not self._token_revocation_list_fetched_time:
- # If the fetched list has been written to disk, use its
- # modification time.
- if os.path.exists(self.revoked_file_name):
- mtime = os.path.getmtime(self.revoked_file_name)
- fetched_time = datetime.datetime.utcfromtimestamp(mtime)
- # Otherwise the list will need to be fetched.
- else:
- fetched_time = datetime.datetime.min
- self._token_revocation_list_fetched_time = fetched_time
- return self._token_revocation_list_fetched_time
-
- @token_revocation_list_fetched_time.setter
- def token_revocation_list_fetched_time(self, value):
- self._token_revocation_list_fetched_time = value
-
- @property
- def token_revocation_list(self):
- timeout = (self.token_revocation_list_fetched_time +
- self.token_revocation_list_cache_timeout)
- list_is_current = timeutils.utcnow() < timeout
-
- if list_is_current:
- # Load the list from disk if required
- if not self._token_revocation_list:
- open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
- with open(self.revoked_file_name, 'r', **open_kwargs) as f:
- self._token_revocation_list = jsonutils.loads(f.read())
- else:
- self.token_revocation_list = self.fetch_revocation_list()
- return self._token_revocation_list
-
- def _atomic_write_to_signing_dir(self, file_name, value):
- # In Python2, encoding is slow so the following check avoids it if it
- # is not absolutely necessary.
- if isinstance(value, six.text_type):
- value = value.encode('utf-8')
-
- def _atomic_write(destination, data):
- with tempfile.NamedTemporaryFile(dir=self.signing_dirname,
- delete=False) as f:
- f.write(data)
- os.rename(f.name, destination)
-
- try:
- _atomic_write(file_name, value)
- except (OSError, IOError):
- self.verify_signing_dir()
- _atomic_write(file_name, value)
-
- @token_revocation_list.setter
- def token_revocation_list(self, value):
- """Save a revocation list to memory and to disk.
-
- :param value: A json-encoded revocation list
-
- """
- self._token_revocation_list = jsonutils.loads(value)
- self.token_revocation_list_fetched_time = timeutils.utcnow()
- self._atomic_write_to_signing_dir(self.revoked_file_name, value)
-
- def fetch_revocation_list(self, retry=True):
- headers = {'X-Auth-Token': self.get_admin_token()}
- response, data = self._json_request('GET', '/v2.0/tokens/revoked',
- additional_headers=headers)
- if response.status_code == 401:
- if retry:
- self.LOG.info(
- 'Keystone rejected admin token, resetting admin token')
- self.admin_token = None
- return self.fetch_revocation_list(retry=False)
- if response.status_code != 200:
- raise ServiceError('Unable to fetch token revocation list.')
- if 'signed' not in data:
- raise ServiceError('Revocation list improperly formatted.')
- return self.cms_verify(data['signed'])
-
- def _fetch_cert_file(self, cert_file_name, cert_type):
- if not self.auth_version:
- self.auth_version = self._choose_api_version()
-
- if self.auth_version == 'v3.0':
- if cert_type == 'signing':
- cert_type = 'certificates'
- path = '/v3/OS-SIMPLE-CERT/' + cert_type
- else:
- path = '/v2.0/certificates/' + cert_type
- response = self._http_request('GET', path)
- if response.status_code != 200:
- raise exceptions.CertificateConfigError(response.text)
- self._atomic_write_to_signing_dir(cert_file_name, response.text)
-
- def fetch_signing_cert(self):
- self._fetch_cert_file(self.signing_cert_file_name, 'signing')
-
- def fetch_ca_cert(self):
- self._fetch_cert_file(self.signing_ca_file_name, 'ca')
-
-
-class CachePool(list):
- """A lazy pool of cache references."""
-
- def __init__(self, cache, memcached_servers):
- self._environment_cache = cache
- self._memcached_servers = memcached_servers
-
- @contextlib.contextmanager
- def reserve(self):
- """Context manager to manage a pooled cache reference."""
- if self._environment_cache is not None:
- # skip pooling and just use the cache from the upstream filter
- yield self._environment_cache
- return # otherwise the context manager will continue!
-
- try:
- c = self.pop()
- except IndexError:
- # the pool is empty, so we need to create a new client
- c = memorycache.get_client(self._memcached_servers)
-
- try:
- yield c
- finally:
- self.append(c)
-
-
-class TokenCache(object):
- """Encapsulates the auth_token token cache functionality.
-
- auth_token caches tokens that it's seen so that when a token is re-used the
- middleware doesn't have to do a more expensive operation (like going to the
- identity server) to validate the token.
-
- initialize() must be called before calling the other methods.
-
- Store a valid token in the cache using store(); mark a token as invalid in
- the cache using store_invalid().
-
- Check if a token is in the cache and retrieve it using get().
-
- """
-
- _INVALID_INDICATOR = 'invalid'
-
- def __init__(self, log, cache_time=None, hash_algorithms=None,
- env_cache_name=None, memcached_servers=None,
- memcache_security_strategy=None, memcache_secret_key=None):
- self.LOG = log
- self._cache_time = cache_time
- self._hash_algorithms = hash_algorithms
- self._env_cache_name = env_cache_name
- self._memcached_servers = memcached_servers
-
- # memcache value treatment, ENCRYPT or MAC
- self._memcache_security_strategy = memcache_security_strategy
- if self._memcache_security_strategy is not None:
- self._memcache_security_strategy = (
- self._memcache_security_strategy.upper())
- self._memcache_secret_key = memcache_secret_key
-
- self._cache_pool = None
- self._initialized = False
-
- self._assert_valid_memcache_protection_config()
-
- def initialize(self, env):
- if self._initialized:
- return
-
- self._cache_pool = CachePool(env.get(self._env_cache_name),
- self._memcached_servers)
- self._initialized = True
-
- def get(self, user_token):
- """Check if the token is cached already.
-
- Returns a tuple. The first element is a list of token IDs, where the
- first one is the preferred hash.
-
- The second element is the token data from the cache if the token was
- cached, otherwise ``None``.
-
- :raises InvalidUserToken: if the token is invalid
-
- """
-
- if cms.is_asn1_token(user_token) or cms.is_pkiz(user_token):
- # user_token is a PKI token that's not hashed.
-
- token_hashes = list(cms.cms_hash_token(user_token, mode=algo)
- for algo in self._hash_algorithms)
-
- for token_hash in token_hashes:
- cached = self._cache_get(token_hash)
- if cached:
- return (token_hashes, cached)
-
- # The token wasn't found using any hash algorithm.
- return (token_hashes, None)
-
- # user_token is either a UUID token or a hashed PKI token.
- token_id = user_token
- cached = self._cache_get(token_id)
- return ([token_id], cached)
-
- def store(self, token_id, data, expires):
- """Put token data into the cache.
-
- Stores the parsed expire date in cache allowing
- quick check of token freshness on retrieval.
-
- """
- self.LOG.debug('Storing token in cache')
- self._cache_store(token_id, (data, expires))
-
- def store_invalid(self, token_id):
- """Store invalid token in cache."""
- self.LOG.debug('Marking token as unauthorized in cache')
- self._cache_store(token_id, self._INVALID_INDICATOR)
-
- def _assert_valid_memcache_protection_config(self):
- if self._memcache_security_strategy:
- if self._memcache_security_strategy not in ('MAC', 'ENCRYPT'):
- raise ConfigurationError('memcache_security_strategy must be '
- 'ENCRYPT or MAC')
- if not self._memcache_secret_key:
- raise ConfigurationError('memcache_secret_key must be defined '
- 'when a memcache_security_strategy '
- 'is defined')
-
- def _cache_get(self, token_id):
- """Return token information from cache.
-
- If token is invalid raise InvalidUserToken
- return token only if fresh (not expired).
- """
-
- if not token_id:
- # Nothing to do
- return
-
- if self._memcache_security_strategy is None:
- key = CACHE_KEY_TEMPLATE % token_id
- with self._cache_pool.reserve() as cache:
- serialized = cache.get(key)
- else:
- secret_key = self._memcache_secret_key
- if isinstance(secret_key, six.string_types):
- secret_key = secret_key.encode('utf-8')
- security_strategy = self._memcache_security_strategy
- if isinstance(security_strategy, six.string_types):
- security_strategy = security_strategy.encode('utf-8')
- keys = memcache_crypt.derive_keys(
- token_id,
- secret_key,
- security_strategy)
- cache_key = CACHE_KEY_TEMPLATE % (
- memcache_crypt.get_cache_key(keys))
- with self._cache_pool.reserve() as cache:
- raw_cached = cache.get(cache_key)
- try:
- # unprotect_data will return None if raw_cached is None
- serialized = memcache_crypt.unprotect_data(keys,
- raw_cached)
- except Exception:
- msg = 'Failed to decrypt/verify cache data'
- self.LOG.exception(msg)
- # this should have the same effect as data not
- # found in cache
- serialized = None
-
- if serialized is None:
- return None
-
- # Note that _INVALID_INDICATOR and (data, expires) are the only
- # valid types of serialized cache entries, so there is not
- # a collision with jsonutils.loads(serialized) == None.
- if not isinstance(serialized, six.string_types):
- serialized = serialized.decode('utf-8')
- cached = jsonutils.loads(serialized)
- if cached == self._INVALID_INDICATOR:
- self.LOG.debug('Cached Token is marked unauthorized')
- raise InvalidUserToken('Token authorization failed')
-
- data, expires = cached
-
- try:
- expires = timeutils.parse_isotime(expires)
- except ValueError:
- # Gracefully handle upgrade of expiration times from *nix
- # timestamps to ISO 8601 formatted dates by ignoring old cached
- # values.
- return
-
- expires = timeutils.normalize_time(expires)
- utcnow = timeutils.utcnow()
- if utcnow < expires:
- self.LOG.debug('Returning cached token')
- return data
- else:
- self.LOG.debug('Cached Token seems expired')
- raise InvalidUserToken('Token authorization failed')
-
- def _cache_store(self, token_id, data):
- """Store value into memcache.
-
- data may be _INVALID_INDICATOR or a tuple like (data, expires)
-
- """
- serialized_data = jsonutils.dumps(data)
- if isinstance(serialized_data, six.text_type):
- serialized_data = serialized_data.encode('utf-8')
- if self._memcache_security_strategy is None:
- cache_key = CACHE_KEY_TEMPLATE % token_id
- data_to_store = serialized_data
- else:
- secret_key = self._memcache_secret_key
- if isinstance(secret_key, six.string_types):
- secret_key = secret_key.encode('utf-8')
- security_strategy = self._memcache_security_strategy
- if isinstance(security_strategy, six.string_types):
- security_strategy = security_strategy.encode('utf-8')
- keys = memcache_crypt.derive_keys(
- token_id, secret_key, security_strategy)
- cache_key = CACHE_KEY_TEMPLATE % memcache_crypt.get_cache_key(keys)
- data_to_store = memcache_crypt.protect_data(keys, serialized_data)
-
- with self._cache_pool.reserve() as cache:
- cache.set(cache_key, data_to_store, time=self._cache_time)
-
-
-def filter_factory(global_conf, **local_conf):
- """Returns a WSGI filter app for use with paste.deploy."""
- conf = global_conf.copy()
- conf.update(local_conf)
-
- def auth_filter(app):
- return AuthProtocol(app, conf)
- return auth_filter
-
-
-def app_factory(global_conf, **local_conf):
- conf = global_conf.copy()
- conf.update(local_conf)
- return AuthProtocol(None, conf)
-
-
-if __name__ == '__main__':
- """Run this module directly to start a protected echo service::
-
- $ python -m keystoneclient.middleware.auth_token
-
- When the ``auth_token`` module authenticates a request, the echo service
- will respond with all the environment variables presented to it by this
- module.
-
- """
- def echo_app(environ, start_response):
- """A WSGI application that echoes the CGI environment to the user."""
- start_response('200 OK', [('Content-Type', 'application/json')])
- environment = dict((k, v) for k, v in six.iteritems(environ)
- if k.startswith('HTTP_X_'))
- yield jsonutils.dumps(environment)
-
- from wsgiref import simple_server
-
- # hardcode any non-default configuration here
- conf = {'auth_protocol': 'http', 'admin_token': 'ADMIN'}
- app = AuthProtocol(echo_app, conf)
- server = simple_server.make_server('', 8000, app)
- print('Serving on port 8000 (Ctrl+C to end)...')
- server.serve_forever()
diff --git a/keystoneclient/middleware/memcache_crypt.py b/keystoneclient/middleware/memcache_crypt.py
deleted file mode 100644
index 40e2051..0000000
--- a/keystoneclient/middleware/memcache_crypt.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# Copyright 2010-2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Utilities for memcache encryption and integrity check.
-
-Data should be serialized before entering these functions. Encryption
-has a dependency on the pycrypto. If pycrypto is not available,
-CryptoUnavailableError will be raised.
-
-This module will not be called unless signing or encryption is enabled
-in the config. It will always validate signatures, and will decrypt
-data if encryption is enabled. It is not valid to mix protection
-modes.
-
-"""
-
-import base64
-import functools
-import hashlib
-import hmac
-import math
-import os
-import sys
-
-import six
-
-# make sure pycrypto is available
-try:
- from Crypto.Cipher import AES
-except ImportError:
- AES = None
-
-HASH_FUNCTION = hashlib.sha384
-DIGEST_LENGTH = HASH_FUNCTION().digest_size
-DIGEST_SPLIT = DIGEST_LENGTH // 3
-DIGEST_LENGTH_B64 = 4 * int(math.ceil(DIGEST_LENGTH / 3.0))
-
-
-class InvalidMacError(Exception):
- """raise when unable to verify MACed data.
-
- This usually indicates that data had been expectedly modified in memcache.
-
- """
- pass
-
-
-class DecryptError(Exception):
- """raise when unable to decrypt encrypted data.
-
- """
- pass
-
-
-class CryptoUnavailableError(Exception):
- """raise when Python Crypto module is not available.
-
- """
- pass
-
-
-def assert_crypto_availability(f):
- """Ensure Crypto module is available."""
-
- @functools.wraps(f)
- def wrapper(*args, **kwds):
- if AES is None:
- raise CryptoUnavailableError()
- return f(*args, **kwds)
- return wrapper
-
-
-if sys.version_info >= (3, 3):
- constant_time_compare = hmac.compare_digest
-else:
- def constant_time_compare(first, second):
- """Returns True if both string inputs are equal, otherwise False.
-
- This function should take a constant amount of time regardless of
- how many characters in the strings match.
-
- """
- if len(first) != len(second):
- return False
- result = 0
- if six.PY3 and isinstance(first, bytes) and isinstance(second, bytes):
- for x, y in zip(first, second):
- result |= x ^ y
- else:
- for x, y in zip(first, second):
- result |= ord(x) ^ ord(y)
- return result == 0
-
-
-def derive_keys(token, secret, strategy):
- """Derives keys for MAC and ENCRYPTION from the user-provided
- secret. The resulting keys should be passed to the protect and
- unprotect functions.
-
- As suggested by NIST Special Publication 800-108, this uses the
- first 128 bits from the sha384 KDF for the obscured cache key
- value, the second 128 bits for the message authentication key and
- the remaining 128 bits for the encryption key.
-
- This approach is faster than computing a separate hmac as the KDF
- for each desired key.
- """
- digest = hmac.new(secret, token + strategy, HASH_FUNCTION).digest()
- return {'CACHE_KEY': digest[:DIGEST_SPLIT],
- 'MAC': digest[DIGEST_SPLIT: 2 * DIGEST_SPLIT],
- 'ENCRYPTION': digest[2 * DIGEST_SPLIT:],
- 'strategy': strategy}
-
-
-def sign_data(key, data):
- """Sign the data using the defined function and the derived key."""
- mac = hmac.new(key, data, HASH_FUNCTION).digest()
- return base64.b64encode(mac)
-
-
-@assert_crypto_availability
-def encrypt_data(key, data):
- """Encrypt the data with the given secret key.
-
- Padding is n bytes of the value n, where 1 <= n <= blocksize.
- """
- iv = os.urandom(16)
- cipher = AES.new(key, AES.MODE_CBC, iv)
- padding = 16 - len(data) % 16
- return iv + cipher.encrypt(data + six.int2byte(padding) * padding)
-
-
-@assert_crypto_availability
-def decrypt_data(key, data):
- """Decrypt the data with the given secret key."""
- iv = data[:16]
- cipher = AES.new(key, AES.MODE_CBC, iv)
- try:
- result = cipher.decrypt(data[16:])
- except Exception:
- raise DecryptError('Encrypted data appears to be corrupted.')
-
- # Strip the last n padding bytes where n is the last value in
- # the plaintext
- return result[:-1 * six.byte2int([result[-1]])]
-
-
-def protect_data(keys, data):
- """Given keys and serialized data, returns an appropriately
- protected string suitable for storage in the cache.
-
- """
- if keys['strategy'] == b'ENCRYPT':
- data = encrypt_data(keys['ENCRYPTION'], data)
-
- encoded_data = base64.b64encode(data)
-
- signature = sign_data(keys['MAC'], encoded_data)
- return signature + encoded_data
-
-
-def unprotect_data(keys, signed_data):
- """Given keys and cached string data, verifies the signature,
- decrypts if necessary, and returns the original serialized data.
-
- """
- # cache backends return None when no data is found. We don't mind
- # that this particular special value is unsigned.
- if signed_data is None:
- return None
-
- # First we calculate the signature
- provided_mac = signed_data[:DIGEST_LENGTH_B64]
- calculated_mac = sign_data(
- keys['MAC'],
- signed_data[DIGEST_LENGTH_B64:])
-
- # Then verify that it matches the provided value
- if not constant_time_compare(provided_mac, calculated_mac):
- raise InvalidMacError('Invalid MAC; data appears to be corrupted.')
-
- data = base64.b64decode(signed_data[DIGEST_LENGTH_B64:])
-
- # then if necessary decrypt the data
- if keys['strategy'] == b'ENCRYPT':
- data = decrypt_data(keys['ENCRYPTION'], data)
-
- return data
-
-
-def get_cache_key(keys):
- """Given keys generated by derive_keys(), returns a base64
- encoded value suitable for use as a cache key in memcached.
-
- """
- return base64.b64encode(keys['CACHE_KEY'])
diff --git a/keystoneclient/middleware/s3_token.py b/keystoneclient/middleware/s3_token.py
deleted file mode 100644
index ea804bb..0000000
--- a/keystoneclient/middleware/s3_token.py
+++ /dev/null
@@ -1,274 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# Copyright 2011,2012 Akira YOSHIYAMA <akirayoshiyama@gmail.com>
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# This source code is based ./auth_token.py and ./ec2_token.py.
-# See them for their copyright.
-
-"""
-S3 TOKEN MIDDLEWARE
-
-.. warning::
-
- This module is DEPRECATED and may be removed in the 2.0.0 release. The
- s3_token middleware has been moved to the `keystonemiddleware repository
- <http://docs.openstack.org/developer/keystonemiddleware/>`_.
-
-This WSGI component:
-
-* Get a request from the swift3 middleware with an S3 Authorization
- access key.
-* Validate s3 token in Keystone.
-* Transform the account name to AUTH_%(tenant_name).
-
-"""
-
-import logging
-
-from oslo_serialization import jsonutils
-from oslo_utils import strutils
-import requests
-import six
-from six.moves import urllib
-import webob
-
-
-PROTOCOL_NAME = 'S3 Token Authentication'
-
-
-# TODO(kun): remove it after oslo merge this.
-def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
- """Validate and split the given HTTP request path.
-
- **Examples**::
-
- ['a'] = split_path('/a')
- ['a', None] = split_path('/a', 1, 2)
- ['a', 'c'] = split_path('/a/c', 1, 2)
- ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)
-
- :param path: HTTP Request path to be split
- :param minsegs: Minimum number of segments to be extracted
- :param maxsegs: Maximum number of segments to be extracted
- :param rest_with_last: If True, trailing data will be returned as part
- of last segment. If False, and there is
- trailing data, raises ValueError.
- :returns: list of segments with a length of maxsegs (non-existent
- segments will return as None)
- :raises: ValueError if given an invalid path
- """
- if not maxsegs:
- maxsegs = minsegs
- if minsegs > maxsegs:
- raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
- if rest_with_last:
- segs = path.split('/', maxsegs)
- minsegs += 1
- maxsegs += 1
- count = len(segs)
- if (segs[0] or count < minsegs or count > maxsegs or
- '' in segs[1:minsegs]):
- raise ValueError('Invalid path: %s' % urllib.parse.quote(path))
- else:
- minsegs += 1
- maxsegs += 1
- segs = path.split('/', maxsegs)
- count = len(segs)
- if (segs[0] or count < minsegs or count > maxsegs + 1 or
- '' in segs[1:minsegs] or
- (count == maxsegs + 1 and segs[maxsegs])):
- raise ValueError('Invalid path: %s' % urllib.parse.quote(path))
- segs = segs[1:maxsegs]
- segs.extend([None] * (maxsegs - 1 - len(segs)))
- return segs
-
-
-class ServiceError(Exception):
- pass
-
-
-class S3Token(object):
- """Auth Middleware that handles S3 authenticating client calls."""
-
- def __init__(self, app, conf):
- """Common initialization code."""
- self.app = app
- self.logger = logging.getLogger(conf.get('log_name', __name__))
- self.logger.debug('Starting the %s component', PROTOCOL_NAME)
- self.logger.warning(
- 'This middleware module is deprecated as of v0.11.0 in favor of '
- 'keystonemiddleware.s3_token - please update your WSGI pipeline '
- 'to reference the new middleware package.')
- self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_')
- # where to find the auth service (we use this to validate tokens)
-
- auth_host = conf.get('auth_host')
- auth_port = int(conf.get('auth_port', 35357))
- auth_protocol = conf.get('auth_protocol', 'https')
-
- self.request_uri = '%s://%s:%s' % (auth_protocol, auth_host, auth_port)
-
- # SSL
- insecure = strutils.bool_from_string(conf.get('insecure', False))
- cert_file = conf.get('certfile')
- key_file = conf.get('keyfile')
-
- if insecure:
- self.verify = False
- elif cert_file and key_file:
- self.verify = (cert_file, key_file)
- elif cert_file:
- self.verify = cert_file
- else:
- self.verify = None
-
- def deny_request(self, code):
- error_table = {
- 'AccessDenied': (401, 'Access denied'),
- 'InvalidURI': (400, 'Could not parse the specified URI'),
- }
- resp = webob.Response(content_type='text/xml')
- resp.status = error_table[code][0]
- error_msg = ('<?xml version="1.0" encoding="UTF-8"?>\r\n'
- '<Error>\r\n <Code>%s</Code>\r\n '
- '<Message>%s</Message>\r\n</Error>\r\n' %
- (code, error_table[code][1]))
- if six.PY3:
- error_msg = error_msg.encode()
- resp.body = error_msg
- return resp
-
- def _json_request(self, creds_json):
- headers = {'Content-Type': 'application/json'}
- try:
- response = requests.post('%s/v2.0/s3tokens' % self.request_uri,
- headers=headers, data=creds_json,
- verify=self.verify)
- except requests.exceptions.RequestException as e:
- self.logger.info('HTTP connection exception: %s', e)
- resp = self.deny_request('InvalidURI')
- raise ServiceError(resp)
-
- if response.status_code < 200 or response.status_code >= 300:
- self.logger.debug('Keystone reply error: status=%s reason=%s',
- response.status_code, response.reason)
- resp = self.deny_request('AccessDenied')
- raise ServiceError(resp)
-
- return response
-
- def __call__(self, environ, start_response):
- """Handle incoming request. authenticate and send downstream."""
- req = webob.Request(environ)
- self.logger.debug('Calling S3Token middleware.')
-
- try:
- parts = split_path(req.path, 1, 4, True)
- version, account, container, obj = parts
- except ValueError:
- msg = 'Not a path query, skipping.'
- self.logger.debug(msg)
- return self.app(environ, start_response)
-
- # Read request signature and access id.
- if 'Authorization' not in req.headers:
- msg = 'No Authorization header. skipping.'
- self.logger.debug(msg)
- return self.app(environ, start_response)
-
- token = req.headers.get('X-Auth-Token',
- req.headers.get('X-Storage-Token'))
- if not token:
- msg = 'You did not specify an auth or a storage token. skipping.'
- self.logger.debug(msg)
- return self.app(environ, start_response)
-
- auth_header = req.headers['Authorization']
- try:
- access, signature = auth_header.split(' ')[-1].rsplit(':', 1)
- except ValueError:
- msg = 'You have an invalid Authorization header: %s'
- self.logger.debug(msg, auth_header)
- return self.deny_request('InvalidURI')(environ, start_response)
-
- # NOTE(chmou): This is to handle the special case with nova
- # when we have the option s3_affix_tenant. We will force it to
- # connect to another account than the one
- # authenticated. Before people start getting worried about
- # security, I should point that we are connecting with
- # username/token specified by the user but instead of
- # connecting to its own account we will force it to go to an
- # another account. In a normal scenario if that user don't
- # have the reseller right it will just fail but since the
- # reseller account can connect to every account it is allowed
- # by the swift_auth middleware.
- force_tenant = None
- if ':' in access:
- access, force_tenant = access.split(':')
-
- # Authenticate request.
- creds = {'credentials': {'access': access,
- 'token': token,
- 'signature': signature}}
- creds_json = jsonutils.dumps(creds)
- self.logger.debug('Connecting to Keystone sending this JSON: %s',
- creds_json)
- # NOTE(vish): We could save a call to keystone by having
- # keystone return token, tenant, user, and roles
- # from this call.
- #
- # NOTE(chmou): We still have the same problem we would need to
- # change token_auth to detect if we already
- # identified and not doing a second query and just
- # pass it through to swiftauth in this case.
- try:
- resp = self._json_request(creds_json)
- except ServiceError as e:
- resp = e.args[0]
- msg = 'Received error, exiting middleware with error: %s'
- self.logger.debug(msg, resp.status_code)
- return resp(environ, start_response)
-
- self.logger.debug('Keystone Reply: Status: %d, Output: %s',
- resp.status_code, resp.content)
-
- try:
- identity_info = resp.json()
- token_id = str(identity_info['access']['token']['id'])
- tenant = identity_info['access']['token']['tenant']
- except (ValueError, KeyError):
- error = 'Error on keystone reply: %d %s'
- self.logger.debug(error, resp.status_code, resp.content)
- return self.deny_request('InvalidURI')(environ, start_response)
-
- req.headers['X-Auth-Token'] = token_id
- tenant_to_connect = force_tenant or tenant['id']
- self.logger.debug('Connecting with tenant: %s', tenant_to_connect)
- new_tenant_name = '%s%s' % (self.reseller_prefix, tenant_to_connect)
- environ['PATH_INFO'] = environ['PATH_INFO'].replace(account,
- new_tenant_name)
- return self.app(environ, start_response)
-
-
-def filter_factory(global_conf, **local_conf):
- """Returns a WSGI filter app for use with paste.deploy."""
- conf = global_conf.copy()
- conf.update(local_conf)
-
- def auth_filter(app):
- return S3Token(app, conf)
- return auth_filter
diff --git a/keystoneclient/tests/unit/test_auth_token_middleware.py b/keystoneclient/tests/unit/test_auth_token_middleware.py
deleted file mode 100644
index 9cdaf92..0000000
--- a/keystoneclient/tests/unit/test_auth_token_middleware.py
+++ /dev/null
@@ -1,1952 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import calendar
-import datetime
-import json
-import os
-import shutil
-import stat
-import tempfile
-import time
-import uuid
-
-import fixtures
-import iso8601
-import mock
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from requests_mock.contrib import fixture as mock_fixture
-import six
-from six.moves.urllib import parse as urlparse
-import testresources
-import testtools
-from testtools import matchers
-import webob
-
-from keystoneclient import access
-from keystoneclient.common import cms
-from keystoneclient import exceptions
-from keystoneclient import fixture
-from keystoneclient.middleware import auth_token
-from keystoneclient.openstack.common import memorycache
-from keystoneclient.tests.unit import client_fixtures
-from keystoneclient.tests.unit import utils
-from keystoneclient import utils as client_utils
-
-
-EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_name1',
- 'HTTP_X_USER_ID': 'user_id1',
- 'HTTP_X_USER_NAME': 'user_name1',
- 'HTTP_X_ROLES': 'role1,role2',
- 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
- 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
-}
-
-
-BASE_HOST = 'https://keystone.example.com:1234'
-BASE_URI = '%s/testadmin' % BASE_HOST
-FAKE_ADMIN_TOKEN_ID = 'admin_token2'
-FAKE_ADMIN_TOKEN = jsonutils.dumps(
- {'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID,
- 'expires': '2022-10-03T16:58:01Z'}}})
-
-
-VERSION_LIST_v2 = jsonutils.dumps(fixture.DiscoveryList(href=BASE_URI,
- v3=False))
-VERSION_LIST_v3 = jsonutils.dumps(fixture.DiscoveryList(href=BASE_URI))
-
-ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2'
-MEMCACHED_SERVERS = ['localhost:11211']
-MEMCACHED_AVAILABLE = None
-
-
-def memcached_available():
- """Do a sanity check against memcached.
-
- Returns ``True`` if the following conditions are met (otherwise, returns
- ``False``):
-
- - ``python-memcached`` is installed
- - a usable ``memcached`` instance is available via ``MEMCACHED_SERVERS``
- - the client is able to set and get a key/value pair
-
- """
- global MEMCACHED_AVAILABLE
-
- if MEMCACHED_AVAILABLE is None:
- try:
- import memcache
- c = memcache.Client(MEMCACHED_SERVERS)
- c.set('ping', 'pong', time=1)
- MEMCACHED_AVAILABLE = c.get('ping') == 'pong'
- except ImportError:
- MEMCACHED_AVAILABLE = False
-
- return MEMCACHED_AVAILABLE
-
-
-def cleanup_revoked_file(filename):
- try:
- os.remove(filename)
- except OSError:
- pass
-
-
-class TimezoneFixture(fixtures.Fixture):
- @staticmethod
- def supported():
- # tzset is only supported on Unix.
- return hasattr(time, 'tzset')
-
- def __init__(self, new_tz):
- super(TimezoneFixture, self).__init__()
- self.tz = new_tz
- self.old_tz = os.environ.get('TZ')
-
- def setUp(self):
- super(TimezoneFixture, self).setUp()
- if not self.supported():
- raise NotImplementedError('timezone override is not supported.')
- os.environ['TZ'] = self.tz
- time.tzset()
- self.addCleanup(self.cleanup)
-
- def cleanup(self):
- if self.old_tz is not None:
- os.environ['TZ'] = self.old_tz
- elif 'TZ' in os.environ:
- del os.environ['TZ']
- time.tzset()
-
-
-class TimeFixture(fixtures.Fixture):
-
- def __init__(self, new_time, normalize=True):
- super(TimeFixture, self).__init__()
- if isinstance(new_time, six.string_types):
- new_time = timeutils.parse_isotime(new_time)
- if normalize:
- new_time = timeutils.normalize_time(new_time)
- self.new_time = new_time
-
- def setUp(self):
- super(TimeFixture, self).setUp()
- timeutils.set_time_override(self.new_time)
- self.addCleanup(timeutils.clear_time_override)
-
-
-class FakeApp(object):
- """This represents a WSGI app protected by the auth_token middleware."""
-
- SUCCESS = b'SUCCESS'
-
- def __init__(self, expected_env=None):
- self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
-
- if expected_env:
- self.expected_env.update(expected_env)
-
- def __call__(self, env, start_response):
- for k, v in self.expected_env.items():
- assert env[k] == v, '%s != %s' % (env[k], v)
-
- resp = webob.Response()
- resp.body = FakeApp.SUCCESS
- return resp(env, start_response)
-
-
-class v3FakeApp(FakeApp):
- """This represents a v3 WSGI app protected by the auth_token middleware."""
-
- def __init__(self, expected_env=None):
-
- # with v3 additions, these are for the DEFAULT TOKEN
- v3_default_env_additions = {
- 'HTTP_X_PROJECT_ID': 'tenant_id1',
- 'HTTP_X_PROJECT_NAME': 'tenant_name1',
- 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
- 'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
- }
-
- if expected_env:
- v3_default_env_additions.update(expected_env)
-
- super(v3FakeApp, self).__init__(v3_default_env_additions)
-
-
-class BaseAuthTokenMiddlewareTest(testtools.TestCase):
- """Base test class for auth_token middleware.
-
- All the tests allow for running with auth_token
- configured for receiving v2 or v3 tokens, with the
- choice being made by passing configuration data into
- setUp().
-
- The base class will, by default, run all the tests
- expecting v2 token formats. Child classes can override
- this to specify, for instance, v3 format.
-
- """
- def setUp(self, expected_env=None, auth_version=None, fake_app=None):
- super(BaseAuthTokenMiddlewareTest, self).setUp()
-
- self.useFixture(client_fixtures.Deprecations())
-
- self.expected_env = expected_env or dict()
- self.fake_app = fake_app or FakeApp
- self.middleware = None
-
- self.conf = {
- 'identity_uri': 'https://keystone.example.com:1234/testadmin/',
- 'signing_dir': client_fixtures.CERTDIR,
- 'auth_version': auth_version,
- 'auth_uri': 'https://keystone.example.com:1234',
- }
-
- self.auth_version = auth_version
- self.response_status = None
- self.response_headers = None
-
- self.requests_mock = self.useFixture(mock_fixture.Fixture())
-
- def set_middleware(self, expected_env=None, conf=None):
- """Configure the class ready to call the auth_token middleware.
-
- Set up the various fake items needed to run the middleware.
- Individual tests that need to further refine these can call this
- function to override the class defaults.
-
- """
- if conf:
- self.conf.update(conf)
-
- if expected_env:
- self.expected_env.update(expected_env)
-
- self.middleware = auth_token.AuthProtocol(
- self.fake_app(self.expected_env), self.conf)
- self.middleware._iso8601 = iso8601
-
- with tempfile.NamedTemporaryFile(dir=self.middleware.signing_dirname,
- delete=False) as f:
- pass
- self.middleware.revoked_file_name = f.name
-
- self.addCleanup(cleanup_revoked_file,
- self.middleware.revoked_file_name)
-
- self.middleware.token_revocation_list = jsonutils.dumps(
- {"revoked": [], "extra": "success"})
-
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
-
- def assertLastPath(self, path):
- if path:
- parts = urlparse.urlparse(self.requests_mock.last_request.url)
- self.assertEqual(path, parts.path)
- else:
- self.assertIsNone(self.requests_mock.last_request)
-
-
-class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def test_fetch_revocation_list_with_expire(self):
- self.set_middleware()
-
- # Get a token, then try to retrieve revocation list and get a 401.
- # Get a new token, try to retrieve revocation list and return 200.
- self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- text = self.examples.SIGNED_REVOCATION_LIST
- self.requests_mock.get("%s/v2.0/tokens/revoked" % BASE_URI,
- response_list=[{'status_code': 401},
- {'text': text}])
-
- fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
- self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
-
- # Check that 4 requests have been made
- self.assertEqual(len(self.requests_mock.request_history), 4)
-
-
-class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- """Auth Token middleware should understand Diablo keystone responses."""
- def setUp(self):
- # pre-diablo only had Tenant ID, which was also the Name
- expected_env = {
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_id1',
- # now deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_id1',
- }
-
- super(DiabloAuthTokenMiddlewareTest, self).setUp(
- expected_env=expected_env)
-
- self.requests_mock.get("%s/" % BASE_URI,
- text=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.token_id = self.examples.VALID_DIABLO_TOKEN
- token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
-
- url = '%s/v2.0/tokens/%s' % (BASE_URI, self.token_id)
- self.requests_mock.get(url, text=token_response)
-
- self.set_middleware()
-
- def test_valid_diablo_response(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_id
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertIn('keystone.token_info', req.environ)
-
-
-class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
- """These tests will not have the memcache module available."""
-
- def setUp(self):
- super(NoMemcacheAuthToken, self).setUp()
- self.useFixture(utils.DisableModuleFixture('memcache'))
-
- def test_nomemcache(self):
- conf = {
- 'admin_token': 'admin_token1',
- 'auth_host': 'keystone.example.com',
- 'auth_port': 1234,
- 'memcached_servers': MEMCACHED_SERVERS,
- 'auth_uri': 'https://keystone.example.com:1234',
- }
-
- auth_token.AuthProtocol(FakeApp(), conf)
-
-
-class CachePoolTest(BaseAuthTokenMiddlewareTest):
- def test_use_cache_from_env(self):
- """If `swift.cache` is set in the environment and `cache` is set in the
- config then the env cache is used.
- """
- env = {'swift.cache': 'CACHE_TEST'}
- conf = {
- 'cache': 'swift.cache'
- }
- self.set_middleware(conf=conf)
- self.middleware._token_cache.initialize(env)
- with self.middleware._token_cache._cache_pool.reserve() as cache:
- self.assertEqual(cache, 'CACHE_TEST')
-
- def test_not_use_cache_from_env(self):
- """If `swift.cache` is set in the environment but `cache` isn't set in
- the config then the env cache isn't used.
- """
- self.set_middleware()
- env = {'swift.cache': 'CACHE_TEST'}
- self.middleware._token_cache.initialize(env)
- with self.middleware._token_cache._cache_pool.reserve() as cache:
- self.assertNotEqual(cache, 'CACHE_TEST')
-
- def test_multiple_context_managers_share_single_client(self):
- self.set_middleware()
- token_cache = self.middleware._token_cache
- env = {}
- token_cache.initialize(env)
-
- caches = []
-
- with token_cache._cache_pool.reserve() as cache:
- caches.append(cache)
-
- with token_cache._cache_pool.reserve() as cache:
- caches.append(cache)
-
- self.assertIs(caches[0], caches[1])
- self.assertEqual(set(caches), set(token_cache._cache_pool))
-
- def test_nested_context_managers_create_multiple_clients(self):
- self.set_middleware()
- env = {}
- self.middleware._token_cache.initialize(env)
- token_cache = self.middleware._token_cache
-
- with token_cache._cache_pool.reserve() as outer_cache:
- with token_cache._cache_pool.reserve() as inner_cache:
- self.assertNotEqual(outer_cache, inner_cache)
-
- self.assertEqual(
- set([inner_cache, outer_cache]),
- set(token_cache._cache_pool))
-
-
-class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """These tests are not affected by the token format
- (see CommonAuthTokenMiddlewareTest).
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def test_will_expire_soon(self):
- tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
- seconds=10)
- self.assertTrue(auth_token.will_expire_soon(tenseconds))
- fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
- seconds=40)
- self.assertFalse(auth_token.will_expire_soon(fortyseconds))
-
- def test_token_is_v2_accepts_v2(self):
- token = self.examples.UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertTrue(auth_token._token_is_v2(token_response))
-
- def test_token_is_v2_rejects_v3(self):
- token = self.examples.v3_UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertFalse(auth_token._token_is_v2(token_response))
-
- def test_token_is_v3_rejects_v2(self):
- token = self.examples.UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertFalse(auth_token._token_is_v3(token_response))
-
- def test_token_is_v3_accepts_v3(self):
- token = self.examples.v3_UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertTrue(auth_token._token_is_v3(token_response))
-
- @testtools.skipUnless(memcached_available(), 'memcached not available')
- def test_encrypt_cache_data(self):
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'encrypt',
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- token = b'my_token'
- some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
- expires = client_utils.strtime(some_time_later)
- data = ('this_data', expires)
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- token_cache._cache_store(token, data)
- self.assertEqual(token_cache._cache_get(token), data[0])
-
- @testtools.skipUnless(memcached_available(), 'memcached not available')
- def test_sign_cache_data(self):
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- token = b'my_token'
- some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
- expires = client_utils.strtime(some_time_later)
- data = ('this_data', expires)
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- token_cache._cache_store(token, data)
- self.assertEqual(token_cache._cache_get(token), data[0])
-
- @testtools.skipUnless(memcached_available(), 'memcached not available')
- def test_no_memcache_protection(self):
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- token = 'my_token'
- some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
- expires = client_utils.strtime(some_time_later)
- data = ('this_data', expires)
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- token_cache._cache_store(token, data)
- self.assertEqual(token_cache._cache_get(token), data[0])
-
- def test_assert_valid_memcache_protection_config(self):
- # test missing memcache_secret_key
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'Encrypt'
- }
- self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
- conf=conf)
- # test invalue memcache_security_strategy
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'whatever'
- }
- self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
- conf=conf)
- # test missing memcache_secret_key
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'mac'
- }
- self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
- conf=conf)
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'Encrypt',
- 'memcache_secret_key': ''
- }
- self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
- conf=conf)
- conf = {
- 'memcached_servers': MEMCACHED_SERVERS,
- 'memcache_security_strategy': 'mAc',
- 'memcache_secret_key': ''
- }
- self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
- conf=conf)
-
- def test_config_revocation_cache_timeout(self):
- conf = {
- 'revocation_cache_time': 24,
- 'auth_uri': 'https://keystone.example.com:1234',
- }
- middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(middleware.token_revocation_list_cache_timeout,
- datetime.timedelta(seconds=24))
-
- def test_conf_values_type_convert(self):
- conf = {
- 'revocation_cache_time': '24',
- 'identity_uri': 'https://keystone.example.com:1234',
- 'include_service_catalog': '0',
- 'nonexsit_option': '0',
- }
-
- middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(datetime.timedelta(seconds=24),
- middleware.token_revocation_list_cache_timeout)
- self.assertEqual(False, middleware.include_service_catalog)
- self.assertEqual('https://keystone.example.com:1234',
- middleware.identity_uri)
- self.assertEqual('0', middleware.conf['nonexsit_option'])
-
- def test_conf_values_type_convert_with_wrong_value(self):
- conf = {
- 'include_service_catalog': '123',
- }
- self.assertRaises(auth_token.ConfigurationError,
- auth_token.AuthProtocol, self.fake_app, conf)
-
-
-class CommonAuthTokenMiddlewareTest(object):
- """These tests are run once using v2 tokens and again using v3 tokens."""
-
- def test_init_does_not_call_http(self):
- conf = {
- 'revocation_cache_time': 1
- }
- self.set_middleware(conf=conf)
- self.assertLastPath(None)
-
- def test_init_by_ipv6Addr_auth_host(self):
- del self.conf['identity_uri']
- conf = {
- 'auth_host': '2001:2013:1:f101::1',
- 'auth_port': 1234,
- 'auth_protocol': 'http',
- 'auth_uri': None,
- }
- self.set_middleware(conf=conf)
- expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234'
- self.assertEqual(expected_auth_uri, self.middleware.auth_uri)
-
- def assert_valid_request_200(self, token, with_catalog=True):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- if with_catalog:
- self.assertTrue(req.headers.get('X-Service-Catalog'))
- else:
- self.assertNotIn('X-Service-Catalog', req.headers)
- self.assertEqual(body, [FakeApp.SUCCESS])
- self.assertIn('keystone.token_info', req.environ)
- return req
-
- def test_valid_uuid_request(self):
- for _ in range(2): # Do it twice because first result was cached.
- token = self.token_dict['uuid_token_default']
- self.assert_valid_request_200(token)
- self.assert_valid_last_url(token)
-
- def test_valid_uuid_request_with_auth_fragments(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = 1234
- self.conf['auth_admin_prefix'] = '/testadmin'
- self.set_middleware()
- self.assert_valid_request_200(self.token_dict['uuid_token_default'])
- self.assert_valid_last_url(self.token_dict['uuid_token_default'])
-
- def _test_cache_revoked(self, token, revoked_form=None):
- # When the token is cached and revoked, 401 is returned.
- self.middleware.check_revocations_for_cached = True
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
-
- # Token should be cached as ok after this.
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
-
- # Put it in revocation list.
- self.middleware.token_revocation_list = self.get_revocation_list_json(
- token_ids=[revoked_form or token])
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
-
- def test_cached_revoked_uuid(self):
- # When the UUID token is cached and revoked, 401 is returned.
- self._test_cache_revoked(self.token_dict['uuid_token_default'])
-
- def test_valid_signed_request(self):
- for _ in range(2): # Do it twice because first result was cached.
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_valid_signed_compressed_request(self):
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_revoked_token_receives_401(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
-
- def test_revoked_token_receives_401_sha256(self):
- self.conf['hash_algorithms'] = ['sha256', 'md5']
- self.set_middleware()
- self.middleware.token_revocation_list = (
- self.get_revocation_list_json(mode='sha256'))
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
-
- def test_cached_revoked_pki(self):
- # When the PKI token is cached and revoked, 401 is returned.
- token = self.token_dict['signed_token_scoped']
- revoked_form = cms.cms_hash_token(token)
- self._test_cache_revoked(token, revoked_form)
-
- def test_cached_revoked_pkiz(self):
- # When the PKI token is cached and revoked, 401 is returned.
- token = self.token_dict['signed_token_scoped_pkiz']
- revoked_form = cms.cms_hash_token(token)
- self._test_cache_revoked(token, revoked_form)
-
- def test_revoked_token_receives_401_md5_secondary(self):
- # When hash_algorithms has 'md5' as the secondary hash and the
- # revocation list contains the md5 hash for a token, that token is
- # considered revoked so returns 401.
- self.conf['hash_algorithms'] = ['sha256', 'md5']
- self.set_middleware()
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
-
- def _test_revoked_hashed_token(self, token_key):
- # If hash_algorithms is set as ['sha256', 'md5'],
- # and check_revocations_for_cached is True,
- # and a token is in the cache because it was successfully validated
- # using the md5 hash, then
- # if the token is in the revocation list by md5 hash, it'll be
- # rejected and auth_token returns 401.
- self.conf['hash_algorithms'] = ['sha256', 'md5']
- self.conf['check_revocations_for_cached'] = True
- self.set_middleware()
-
- token = self.token_dict[token_key]
-
- # Put the token in the revocation list.
- token_hashed = cms.cms_hash_token(token)
- self.middleware.token_revocation_list = self.get_revocation_list_json(
- token_ids=[token_hashed])
-
- # request is using the hashed token, is valid so goes in
- # cache using the given hash.
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token_hashed
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
-
- # This time use the PKI(Z) token
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
-
- # Should find the token in the cache and revocation list.
- self.assertEqual(401, self.response_status)
-
- def test_revoked_hashed_pki_token(self):
- self._test_revoked_hashed_token('signed_token_scoped')
-
- def test_revoked_hashed_pkiz_token(self):
- self._test_revoked_hashed_token('signed_token_scoped_pkiz')
-
- def get_revocation_list_json(self, token_ids=None, mode=None):
- if token_ids is None:
- key = 'revoked_token_hash' + (('_' + mode) if mode else '')
- token_ids = [self.token_dict[key]]
- revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
- for x in token_ids]}
- return jsonutils.dumps(revocation_list)
-
- def test_is_signed_token_revoked_returns_false(self):
- # explicitly setting an empty revocation list here to document intent
- self.middleware.token_revocation_list = jsonutils.dumps(
- {"revoked": [], "extra": "success"})
- result = self.middleware.is_signed_token_revoked(
- [self.token_dict['revoked_token_hash']])
- self.assertFalse(result)
-
- def test_is_signed_token_revoked_returns_true(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- result = self.middleware.is_signed_token_revoked(
- [self.token_dict['revoked_token_hash']])
- self.assertTrue(result)
-
- def test_is_signed_token_revoked_returns_true_sha256(self):
- self.conf['hash_algorithms'] = ['sha256', 'md5']
- self.set_middleware()
- self.middleware.token_revocation_list = (
- self.get_revocation_list_json(mode='sha256'))
- result = self.middleware.is_signed_token_revoked(
- [self.token_dict['revoked_token_hash_sha256']])
- self.assertTrue(result)
-
- def test_verify_signed_token_raises_exception_for_revoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_signed_token,
- self.token_dict['revoked_token'],
- [self.token_dict['revoked_token_hash']])
-
- def test_verify_signed_token_raises_exception_for_revoked_token_s256(self):
- self.conf['hash_algorithms'] = ['sha256', 'md5']
- self.set_middleware()
- self.middleware.token_revocation_list = (
- self.get_revocation_list_json(mode='sha256'))
- self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_signed_token,
- self.token_dict['revoked_token'],
- [self.token_dict['revoked_token_hash_sha256'],
- self.token_dict['revoked_token_hash']])
-
- def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self):
- self.middleware.token_revocation_list = (
- self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
- self.assertRaises(auth_token.InvalidUserToken,
- self.middleware.verify_pkiz_token,
- self.token_dict['revoked_token_pkiz'],
- [self.token_dict['revoked_token_pkiz_hash']])
-
- def assertIsValidJSON(self, text):
- json.loads(text)
-
- def test_verify_signed_token_succeeds_for_unrevoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- text = self.middleware.verify_signed_token(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsValidJSON(text)
-
- def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- text = self.middleware.verify_pkiz_token(
- self.token_dict['signed_token_scoped_pkiz'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsValidJSON(text)
-
- def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self):
- self.conf['hash_algorithms'] = ['sha256', 'md5']
- self.set_middleware()
- self.middleware.token_revocation_list = (
- self.get_revocation_list_json(mode='sha256'))
- text = self.middleware.verify_signed_token(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash_sha256'],
- self.token_dict['signed_token_scoped_hash']])
- self.assertIsValidJSON(text)
-
- def test_verify_signing_dir_create_while_missing(self):
- tmp_name = uuid.uuid4().hex
- test_parent_signing_dir = "/tmp/%s" % tmp_name
- self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
- self.middleware.signing_cert_file_name = (
- "%s/test.pem" % self.middleware.signing_dirname)
- self.middleware.verify_signing_dir()
- # NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
- self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
- self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
- self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
- os.getuid())
- self.assertEqual(
- stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
- stat.S_IRWXU)
- shutil.rmtree(test_parent_signing_dir)
-
- def test_get_token_revocation_list_fetched_time_returns_min(self):
- self.middleware.token_revocation_list_fetched_time = None
- self.middleware.revoked_file_name = ''
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
- datetime.datetime.min)
-
- def test_get_token_revocation_list_fetched_time_returns_mtime(self):
- self.middleware.token_revocation_list_fetched_time = None
- mtime = os.path.getmtime(self.middleware.revoked_file_name)
- fetched_time = datetime.datetime.utcfromtimestamp(mtime)
- self.assertEqual(fetched_time,
- self.middleware.token_revocation_list_fetched_time)
-
- @testtools.skipUnless(TimezoneFixture.supported(),
- 'TimezoneFixture not supported')
- def test_get_token_revocation_list_fetched_time_returns_utc(self):
- with TimezoneFixture('UTC-1'):
- self.middleware.token_revocation_list = jsonutils.dumps(
- self.examples.REVOCATION_LIST)
- self.middleware.token_revocation_list_fetched_time = None
- fetched_time = self.middleware.token_revocation_list_fetched_time
- self.assertTrue(timeutils.is_soon(fetched_time, 1))
-
- def test_get_token_revocation_list_fetched_time_returns_value(self):
- expected = self.middleware._token_revocation_list_fetched_time
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
- expected)
-
- def test_get_revocation_list_returns_fetched_list(self):
- # auth_token uses v2 to fetch this, so don't allow the v3
- # tests to override the fake http connection
- self.middleware.token_revocation_list_fetched_time = None
- os.remove(self.middleware.revoked_file_name)
- self.assertEqual(self.middleware.token_revocation_list,
- self.examples.REVOCATION_LIST)
-
- def test_get_revocation_list_returns_current_list_from_memory(self):
- self.assertEqual(self.middleware.token_revocation_list,
- self.middleware._token_revocation_list)
-
- def test_get_revocation_list_returns_current_list_from_disk(self):
- in_memory_list = self.middleware.token_revocation_list
- self.middleware._token_revocation_list = None
- self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
-
- def test_invalid_revocation_list_raises_service_error(self):
- self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI, text='{}')
-
- self.assertRaises(auth_token.ServiceError,
- self.middleware.fetch_revocation_list)
-
- def test_fetch_revocation_list(self):
- # auth_token uses v2 to fetch this, so don't allow the v3
- # tests to override the fake http connection
- fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
- self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
-
- def test_request_invalid_uuid_token(self):
- # remember because we are testing the middleware we stub the connection
- # to the keystone server, but this is not what gets returned
- invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
- self.requests_mock.get(invalid_uri, text="", status_code=404)
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = 'invalid-token'
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
-
- def test_request_invalid_signed_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- self.response_headers['WWW-Authenticate'])
-
- def test_request_invalid_signed_pkiz_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_PKIZ_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- self.response_headers['WWW-Authenticate'])
-
- def test_request_no_token(self):
- req = webob.Request.blank('/')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
-
- def test_request_no_token_log_message(self):
- class FakeLog(object):
- def __init__(self):
- self.msg = None
- self.debugmsg = None
-
- def warning(self, msg=None, *args, **kwargs):
- self.msg = msg
-
- def debug(self, msg=None, *args, **kwargs):
- self.debugmsg = msg
-
- self.middleware.LOG = FakeLog()
- self.middleware.delay_auth_decision = False
- self.assertRaises(auth_token.InvalidUserToken,
- self.middleware._get_user_token_from_header, {})
- self.assertIsNotNone(self.middleware.LOG.msg)
- self.assertIsNotNone(self.middleware.LOG.debugmsg)
-
- def test_request_no_token_http(self):
- req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
- self.set_middleware()
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
- self.assertEqual(body, [''])
-
- def test_request_blank_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ''
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
-
- def _get_cached_token(self, token, mode='md5'):
- token_id = cms.cms_hash_token(token, mode=mode)
- return self.middleware._token_cache._cache_get(token_id)
-
- def test_memcache(self):
- req = webob.Request.blank('/')
- token = self.token_dict['signed_token_scoped']
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertIsNotNone(self._get_cached_token(token))
-
- def test_expired(self):
- req = webob.Request.blank('/')
- token = self.token_dict['signed_token_scoped_expired']
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
-
- def test_memcache_set_invalid_uuid(self):
- invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
- self.requests_mock.get(invalid_uri, status_code=404)
-
- req = webob.Request.blank('/')
- token = 'invalid-token'
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertRaises(auth_token.InvalidUserToken,
- self._get_cached_token, token)
-
- def _test_memcache_set_invalid_signed(self, hash_algorithms=None,
- exp_mode='md5'):
- req = webob.Request.blank('/')
- token = self.token_dict['signed_token_scoped_expired']
- req.headers['X-Auth-Token'] = token
- if hash_algorithms:
- self.conf['hash_algorithms'] = hash_algorithms
- self.set_middleware()
- self.middleware(req.environ, self.start_fake_response)
- self.assertRaises(auth_token.InvalidUserToken,
- self._get_cached_token, token, mode=exp_mode)
-
- def test_memcache_set_invalid_signed(self):
- self._test_memcache_set_invalid_signed()
-
- def test_memcache_set_invalid_signed_sha256_md5(self):
- hash_algorithms = ['sha256', 'md5']
- self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms,
- exp_mode='sha256')
-
- def test_memcache_set_invalid_signed_sha256(self):
- hash_algorithms = ['sha256']
- self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms,
- exp_mode='sha256')
-
- def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
- token_cache_time = 10
- conf = {
- 'token_cache_time': token_cache_time,
- 'signing_dir': client_fixtures.CERTDIR,
- }
- conf.update(extra_conf)
- self.set_middleware(conf=conf)
- req = webob.Request.blank('/')
- token = self.token_dict['signed_token_scoped']
- req.headers['X-Auth-Token'] = token
- req.environ.update(extra_environ)
-
- now = datetime.datetime.utcnow()
- self.useFixture(TimeFixture(now))
-
- self.middleware(req.environ, self.start_fake_response)
- self.assertIsNotNone(self._get_cached_token(token))
-
- timeutils.advance_time_seconds(token_cache_time)
- self.assertIsNone(self._get_cached_token(token))
-
- def test_swift_memcache_set_expired(self):
- extra_conf = {'cache': 'swift.cache'}
- extra_environ = {'swift.cache': memorycache.Client()}
- self.test_memcache_set_expired(extra_conf, extra_environ)
-
- def test_http_error_not_cached_token(self):
- """Test to don't cache token as invalid on network errors.
-
- We use UUID tokens since they are the easiest one to reach
- get_http_connection.
- """
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ERROR_TOKEN
- self.middleware.http_request_max_retries = 0
- self.middleware(req.environ, self.start_fake_response)
- self.assertIsNone(self._get_cached_token(ERROR_TOKEN))
- self.assert_valid_last_url(ERROR_TOKEN)
-
- def test_http_request_max_retries(self):
- times_retry = 10
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ERROR_TOKEN
-
- conf = {'http_request_max_retries': times_retry}
- self.set_middleware(conf=conf)
-
- with mock.patch('time.sleep') as mock_obj:
- self.middleware(req.environ, self.start_fake_response)
-
- self.assertEqual(mock_obj.call_count, times_retry)
-
- def test_nocatalog(self):
- conf = {
- 'include_service_catalog': False
- }
- self.set_middleware(conf=conf)
- self.assert_valid_request_200(self.token_dict['uuid_token_default'],
- with_catalog=False)
-
- def assert_kerberos_bind(self, token, bind_level,
- use_kerberos=True, success=True):
- conf = {
- 'enforce_token_bind': bind_level,
- 'auth_version': self.auth_version,
- }
- self.set_middleware(conf=conf)
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
-
- if use_kerberos:
- if use_kerberos is True:
- req.environ['REMOTE_USER'] = self.examples.KERBEROS_BIND
- else:
- req.environ['REMOTE_USER'] = use_kerberos
-
- req.environ['AUTH_TYPE'] = 'Negotiate'
-
- body = self.middleware(req.environ, self.start_fake_response)
-
- if success:
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, [FakeApp.SUCCESS])
- self.assertIn('keystone.token_info', req.environ)
- self.assert_valid_last_url(token)
- else:
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'"
- )
-
- def test_uuid_bind_token_disabled_with_kerb_user(self):
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='disabled',
- use_kerberos=use_kerberos,
- success=True)
-
- def test_uuid_bind_token_disabled_with_incorrect_ticket(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos='ronald@MCDONALDS.COM',
- success=False)
-
- def test_uuid_bind_token_permissive_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='permissive',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_permissive_without_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='permissive',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_permissive_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='permissive',
- use_kerberos=use_kerberos,
- success=True)
-
- def test_uuid_bind_token_permissive_with_incorrect_ticket(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos='ronald@MCDONALDS.COM',
- success=False)
-
- def test_uuid_bind_token_strict_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='strict',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_strict_with_kerbout_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='strict',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_strict_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='strict',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_required_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='required',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_required_without_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='required',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_required_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='required',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_required_without_bind(self):
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(self.token_dict['uuid_token_default'],
- bind_level='required',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_named_kerberos_without_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='kerberos',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_without_bind(self):
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(self.token_dict['uuid_token_default'],
- bind_level='kerberos',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_with_incorrect_ticket(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos='ronald@MCDONALDS.COM',
- success=False)
-
- def test_uuid_bind_token_with_unknown_named_FOO(self):
- token = self.token_dict['uuid_token_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='FOO',
- use_kerberos=use_kerberos,
- success=False)
-
-
-class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def __init__(self, *args, **kwargs):
- super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v2.0'
- self.fake_app = None
- self.ca_path = '/v2.0/certificates/ca'
- self.signing_path = '/v2.0/certificates/signing'
-
- def setUp(self):
- super(V2CertDownloadMiddlewareTest, self).setUp(
- auth_version=self.auth_version,
- fake_app=self.fake_app)
- self.base_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.base_dir)
- self.cert_dir = os.path.join(self.base_dir, 'certs')
- os.makedirs(self.cert_dir, stat.S_IRWXU)
- conf = {
- 'signing_dir': self.cert_dir,
- 'auth_version': self.auth_version,
- }
- self.set_middleware(conf=conf)
-
- # Usually we supply a signed_dir with pre-installed certificates,
- # so invocation of /usr/bin/openssl succeeds. This time we give it
- # an empty directory, so it fails.
- def test_request_no_token_dummy(self):
- cms._ensure_subprocess()
-
- self.requests_mock.get("%s%s" % (BASE_URI, self.ca_path),
- status_code=404)
- url = "%s%s" % (BASE_URI, self.signing_path)
- self.requests_mock.get(url, status_code=404)
- self.assertRaises(exceptions.CertificateConfigError,
- self.middleware.verify_signed_token,
- self.examples.SIGNED_TOKEN_SCOPED,
- [self.examples.SIGNED_TOKEN_SCOPED_HASH])
-
- def test_fetch_signing_cert(self):
- data = 'FAKE CERT'
- url = '%s%s' % (BASE_URI, self.signing_path)
- self.requests_mock.get(url, text=data)
- self.middleware.fetch_signing_cert()
-
- with open(self.middleware.signing_cert_file_name, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertLastPath("/testadmin%s" % self.signing_path)
-
- def test_fetch_signing_ca(self):
- data = 'FAKE CA'
- self.requests_mock.get("%s%s" % (BASE_URI, self.ca_path), text=data)
- self.middleware.fetch_ca_cert()
-
- with open(self.middleware.signing_ca_file_name, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertLastPath("/testadmin%s" % self.ca_path)
-
- def test_prefix_trailing_slash(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = 1234
- self.conf['auth_admin_prefix'] = '/newadmin/'
-
- self.requests_mock.get("%s/newadmin%s" % (BASE_HOST, self.ca_path),
- text='FAKECA')
- url = "%s/newadmin%s" % (BASE_HOST, self.signing_path)
- self.requests_mock.get(url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware.fetch_ca_cert()
-
- self.assertLastPath('/newadmin%s' % self.ca_path)
-
- self.middleware.fetch_signing_cert()
-
- self.assertLastPath('/newadmin%s' % self.signing_path)
-
- def test_without_prefix(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = 1234
- self.conf['auth_admin_prefix'] = ''
-
- self.requests_mock.get("%s%s" % (BASE_HOST, self.ca_path),
- text='FAKECA')
- self.requests_mock.get("%s%s" % (BASE_HOST, self.signing_path),
- text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware.fetch_ca_cert()
-
- self.assertLastPath(self.ca_path)
-
- self.middleware.fetch_signing_cert()
-
- self.assertLastPath(self.signing_path)
-
-
-class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
-
- def __init__(self, *args, **kwargs):
- super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v3.0'
- self.fake_app = v3FakeApp
- self.ca_path = '/v3/OS-SIMPLE-CERT/ca'
- self.signing_path = '/v3/OS-SIMPLE-CERT/certificates'
-
-
-def network_error_response(method, uri, headers):
- raise auth_token.NetworkError("Network connection error.")
-
-
-class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- CommonAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """v2 token specific tests.
-
- There are some differences between how the auth-token middleware handles
- v2 and v3 tokens over and above the token formats, namely:
-
- - A v3 keystone server will auto scope a token to a user's default project
- if no scope is specified. A v2 server assumes that the auth-token
- middleware will do that.
- - A v2 keystone server may issue a token without a catalog, even with a
- tenant
-
- The tests below were originally part of the generic AuthTokenMiddlewareTest
- class, but now, since they really are v2 specific, they are included here.
-
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v2AuthTokenMiddlewareTest, self).setUp()
-
- self.token_dict = {
- 'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT,
- 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED,
- 'uuid_token_bind': self.examples.UUID_TOKEN_BIND,
- 'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
- 'revoked_token': self.examples.REVOKED_TOKEN,
- 'revoked_token_pkiz': self.examples.REVOKED_TOKEN_PKIZ,
- 'revoked_token_pkiz_hash':
- self.examples.REVOKED_TOKEN_PKIZ_HASH,
- 'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH,
- 'revoked_token_hash_sha256':
- self.examples.REVOKED_TOKEN_HASH_SHA256,
- }
-
- self.requests_mock.get("%s/" % BASE_URI,
- text=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.requests_mock.get("%s/v2.0/tokens/revoked" % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST)
-
- for token in (self.examples.UUID_TOKEN_DEFAULT,
- self.examples.UUID_TOKEN_UNSCOPED,
- self.examples.UUID_TOKEN_BIND,
- self.examples.UUID_TOKEN_UNKNOWN_BIND,
- self.examples.UUID_TOKEN_NO_SERVICE_CATALOG,
- self.examples.SIGNED_TOKEN_SCOPED_KEY,
- self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
- text=text)
-
- self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
- text=network_error_response)
-
- self.set_middleware()
-
- def assert_unscoped_default_tenant_auto_scopes(self, token):
- """Unscoped v2 requests with a default tenant should "auto-scope."
-
- The implied scope is the user's tenant ID.
-
- """
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, [FakeApp.SUCCESS])
- self.assertIn('keystone.token_info', req.environ)
-
- def assert_valid_last_url(self, token_id):
- self.assertLastPath("/testadmin/v2.0/tokens/%s" % token_id)
-
- def test_default_tenant_uuid_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.UUID_TOKEN_DEFAULT)
-
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.SIGNED_TOKEN_SCOPED)
-
- def assert_unscoped_token_receives_401(self, token):
- """Unscoped requests with no default tenant ID should be rejected."""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
-
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.SIGNED_TOKEN_UNSCOPED)
-
- def test_request_prevent_service_catalog_injection(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
- req.headers['X-Auth-Token'] = (
- self.examples.UUID_TOKEN_NO_SERVICE_CATALOG)
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertFalse(req.headers.get('X-Service-Catalog'))
- self.assertEqual(body, [FakeApp.SUCCESS])
-
-
-class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def test_valid_uuid_request_forced_to_2_0(self):
- """Test forcing auth_token to use lower api version.
-
- By installing the v3 http hander, auth_token will be get
- a version list that looks like a v3 server - from which it
- would normally chose v3.0 as the auth version. However, here
- we specify v2.0 in the configuration - which should force
- auth_token to use that version instead.
-
- """
- conf = {
- 'signing_dir': client_fixtures.CERTDIR,
- 'auth_version': 'v2.0'
- }
-
- self.requests_mock.get('%s/' % BASE_URI,
- text=VERSION_LIST_v3,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- token = self.examples.UUID_TOKEN_DEFAULT
- url = '%s/v2.0/tokens/%s' % (BASE_URI, token)
- response_body = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get(url, text=response_body)
-
- self.set_middleware(conf=conf)
-
- # This tests will only work is auth_token has chosen to use the
- # lower, v2, api version
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertLastPath("/testadmin/v2.0/tokens/%s" %
- self.examples.UUID_TOKEN_DEFAULT)
-
-
-class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- CommonAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """Test auth_token middleware with v3 tokens.
-
- Re-execute the AuthTokenMiddlewareTest class tests, but with the
- auth_token middleware configured to expect v3 tokens back from
- a keystone server.
-
- This is done by configuring the AuthTokenMiddlewareTest class via
- its Setup(), passing in v3 style data that will then be used by
- the tests themselves. This approach has been used to ensure we
- really are running the same tests for both v2 and v3 tokens.
-
- There a few additional specific test for v3 only:
-
- - We allow an unscoped token to be validated (as unscoped), where
- as for v2 tokens, the auth_token middleware is expected to try and
- auto-scope it (and fail if there is no default tenant)
- - Domain scoped tokens
-
- Since we don't specify an auth version for auth_token to use, by
- definition we are thefore implicitely testing that it will use
- the highest available auth version, i.e. v3.0
-
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v3AuthTokenMiddlewareTest, self).setUp(
- auth_version='v3.0',
- fake_app=v3FakeApp)
-
- self.token_dict = {
- 'uuid_token_default': self.examples.v3_UUID_TOKEN_DEFAULT,
- 'uuid_token_unscoped': self.examples.v3_UUID_TOKEN_UNSCOPED,
- 'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND,
- 'uuid_token_unknown_bind':
- self.examples.v3_UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz':
- self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
- 'revoked_token': self.examples.REVOKED_v3_TOKEN,
- 'revoked_token_pkiz': self.examples.REVOKED_v3_TOKEN_PKIZ,
- 'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH,
- 'revoked_token_hash_sha256':
- self.examples.REVOKED_v3_TOKEN_HASH_SHA256,
- 'revoked_token_pkiz_hash':
- self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
- }
-
- self.requests_mock.get(BASE_URI, text=VERSION_LIST_v3, status_code=300)
-
- # TODO(jamielennox): auth_token middleware uses a v2 admin token
- # regardless of the auth_version that is set.
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
- self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST)
-
- self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
- text=self.token_response)
-
- self.set_middleware()
-
- def token_response(self, request, context):
- auth_id = request.headers.get('X-Auth-Token')
- token_id = request.headers.get('X-Subject-Token')
- self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
-
- response = ""
-
- if token_id == ERROR_TOKEN:
- raise auth_token.NetworkError("Network connection error.")
-
- try:
- response = self.examples.JSON_TOKEN_RESPONSES[token_id]
- except KeyError:
- context.status_code = 404
-
- return response
-
- def assert_valid_last_url(self, token_id):
- self.assertLastPath('/testadmin/v3/auth/tokens')
-
- def test_valid_unscoped_uuid_request(self):
- # Remove items that won't be in an unscoped token
- delta_expected_env = {
- 'HTTP_X_PROJECT_ID': None,
- 'HTTP_X_PROJECT_NAME': None,
- 'HTTP_X_PROJECT_DOMAIN_ID': None,
- 'HTTP_X_PROJECT_DOMAIN_NAME': None,
- 'HTTP_X_TENANT_ID': None,
- 'HTTP_X_TENANT_NAME': None,
- 'HTTP_X_ROLES': '',
- 'HTTP_X_TENANT': None,
- 'HTTP_X_ROLE': '',
- }
- self.set_middleware(expected_env=delta_expected_env)
- self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED,
- with_catalog=False)
- self.assertLastPath('/testadmin/v3/auth/tokens')
-
- def test_domain_scoped_uuid_request(self):
- # Modify items compared to default token for a domain scope
- delta_expected_env = {
- 'HTTP_X_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_DOMAIN_NAME': 'domain_name1',
- 'HTTP_X_PROJECT_ID': None,
- 'HTTP_X_PROJECT_NAME': None,
- 'HTTP_X_PROJECT_DOMAIN_ID': None,
- 'HTTP_X_PROJECT_DOMAIN_NAME': None,
- 'HTTP_X_TENANT_ID': None,
- 'HTTP_X_TENANT_NAME': None,
- 'HTTP_X_TENANT': None
- }
- self.set_middleware(expected_env=delta_expected_env)
- self.assert_valid_request_200(
- self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
- self.assertLastPath('/testadmin/v3/auth/tokens')
-
- def test_gives_v2_catalog(self):
- self.set_middleware()
- req = self.assert_valid_request_200(
- self.examples.SIGNED_v3_TOKEN_SCOPED)
-
- catalog = jsonutils.loads(req.headers['X-Service-Catalog'])
-
- for service in catalog:
- for endpoint in service['endpoints']:
- # no point checking everything, just that it's in v2 format
- self.assertIn('adminURL', endpoint)
- self.assertIn('publicURL', endpoint)
- self.assertIn('adminURL', endpoint)
-
-
-class TokenEncodingTest(testtools.TestCase):
- def setUp(self):
- super(TokenEncodingTest, self).setUp()
- self.useFixture(client_fixtures.Deprecations())
-
- def test_unquoted_token(self):
- self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
-
- def test_quoted_token(self):
- self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))
-
-
-class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
- def setUp(self):
- super(TokenExpirationTest, self).setUp()
- self.now = timeutils.utcnow()
- self.delta = datetime.timedelta(hours=1)
- self.one_hour_ago = client_utils.isotime(self.now - self.delta,
- subsecond=True)
- self.one_hour_earlier = client_utils.isotime(self.now + self.delta,
- subsecond=True)
-
- def create_v2_token_fixture(self, expires=None):
- v2_fixture = {
- 'access': {
- 'token': {
- 'id': 'blah',
- 'expires': expires or self.one_hour_earlier,
- 'tenant': {
- 'id': 'tenant_id1',
- 'name': 'tenant_name1',
- },
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- 'serviceCatalog': {}
- },
- }
-
- return v2_fixture
-
- def create_v3_token_fixture(self, expires=None):
-
- v3_fixture = {
- 'token': {
- 'expires_at': expires or self.one_hour_earlier,
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'domain': {
- 'id': 'domain_id1',
- 'name': 'domain_name1'
- }
- },
- 'project': {
- 'id': 'tenant_id1',
- 'name': 'tenant_name1',
- 'domain': {
- 'id': 'domain_id1',
- 'name': 'domain_name1'
- }
- },
- 'roles': [
- {'name': 'role1', 'id': 'Role1'},
- {'name': 'role2', 'id': 'Role2'},
- ],
- 'catalog': {}
- }
- }
-
- return v3_fixture
-
- def test_no_data(self):
- data = {}
- self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
- data)
-
- def test_bad_data(self):
- data = {'my_happy_token_dict': 'woo'}
- self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
- data)
-
- def test_v2_token_not_expired(self):
- data = self.create_v2_token_fixture()
- expected_expires = data['access']['token']['expires']
- actual_expires = auth_token.confirm_token_not_expired(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v2_token_expired(self):
- data = self.create_v2_token_fixture(expires=self.one_hour_ago)
- self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
- data)
-
- def test_v2_token_with_timezone_offset_not_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v2_token_fixture(
- expires='2000-01-01T00:05:10.000123-05:00')
- expected_expires = '2000-01-01T05:05:10.000123Z'
- actual_expires = auth_token.confirm_token_not_expired(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v2_token_with_timezone_offset_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v2_token_fixture(
- expires='2000-01-01T00:05:10.000123+05:00')
- data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
- self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
- data)
-
- def test_v3_token_not_expired(self):
- data = self.create_v3_token_fixture()
- expected_expires = data['token']['expires_at']
- actual_expires = auth_token.confirm_token_not_expired(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v3_token_expired(self):
- data = self.create_v3_token_fixture(expires=self.one_hour_ago)
- self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
- data)
-
- def test_v3_token_with_timezone_offset_not_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v3_token_fixture(
- expires='2000-01-01T00:05:10.000123-05:00')
- expected_expires = '2000-01-01T05:05:10.000123Z'
-
- actual_expires = auth_token.confirm_token_not_expired(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v3_token_with_timezone_offset_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v3_token_fixture(
- expires='2000-01-01T00:05:10.000123+05:00')
- self.assertRaises(auth_token.InvalidUserToken,
- auth_token.confirm_token_not_expired,
- data)
-
- def test_cached_token_not_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- some_time_later = client_utils.strtime(at=(self.now + self.delta))
- expires = some_time_later
- self.middleware._token_cache.store(token, data, expires)
- self.assertEqual(self.middleware._token_cache._cache_get(token), data)
-
- def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
- """Ensure we cannot retrieve a token from the cache.
-
- Getting a token from the cache should return None when the token data
- in the cache stores the expires time as a \*nix style timestamp.
-
- """
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- some_time_later = self.now + self.delta
- # Store a unix timestamp in the cache.
- expires = calendar.timegm(some_time_later.timetuple())
- token_cache.store(token, data, expires)
- self.assertIsNone(token_cache._cache_get(token))
-
- def test_cached_token_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- some_time_earlier = client_utils.strtime(at=(self.now - self.delta))
- expires = some_time_earlier
- self.middleware._token_cache.store(token, data, expires)
- self.assertThat(lambda: self.middleware._token_cache._cache_get(token),
- matchers.raises(auth_token.InvalidUserToken))
-
- def test_cached_token_with_timezone_offset_not_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- timezone_offset = datetime.timedelta(hours=2)
- some_time_later = self.now - timezone_offset + self.delta
- expires = client_utils.strtime(some_time_later) + '-02:00'
- self.middleware._token_cache.store(token, data, expires)
- self.assertEqual(self.middleware._token_cache._cache_get(token), data)
-
- def test_cached_token_with_timezone_offset_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- timezone_offset = datetime.timedelta(hours=2)
- some_time_earlier = self.now - timezone_offset - self.delta
- expires = client_utils.strtime(some_time_earlier) + '-02:00'
- self.middleware._token_cache.store(token, data, expires)
- self.assertThat(lambda: self.middleware._token_cache._cache_get(token),
- matchers.raises(auth_token.InvalidUserToken))
-
-
-class CatalogConversionTests(BaseAuthTokenMiddlewareTest):
-
- PUBLIC_URL = 'http://server:5000/v2.0'
- ADMIN_URL = 'http://admin:35357/v2.0'
- INTERNAL_URL = 'http://internal:5000/v2.0'
-
- REGION_ONE = 'RegionOne'
- REGION_TWO = 'RegionTwo'
- REGION_THREE = 'RegionThree'
-
- def test_basic_convert(self):
- token = fixture.V3Token()
- s = token.add_service(type='identity')
- s.add_standard_endpoints(public=self.PUBLIC_URL,
- admin=self.ADMIN_URL,
- internal=self.INTERNAL_URL,
- region=self.REGION_ONE)
-
- auth_ref = access.AccessInfo.factory(body=token)
- catalog_data = auth_ref.service_catalog.get_data()
- catalog = auth_token._v3_to_v2_catalog(catalog_data)
-
- self.assertEqual(1, len(catalog))
- service = catalog[0]
- self.assertEqual(1, len(service['endpoints']))
- endpoints = service['endpoints'][0]
-
- self.assertEqual('identity', service['type'])
- self.assertEqual(4, len(endpoints))
- self.assertEqual(self.PUBLIC_URL, endpoints['publicURL'])
- self.assertEqual(self.ADMIN_URL, endpoints['adminURL'])
- self.assertEqual(self.INTERNAL_URL, endpoints['internalURL'])
- self.assertEqual(self.REGION_ONE, endpoints['region'])
-
- def test_multi_region(self):
- token = fixture.V3Token()
- s = token.add_service(type='identity')
-
- s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE)
- s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO)
- s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE)
-
- auth_ref = access.AccessInfo.factory(body=token)
- catalog_data = auth_ref.service_catalog.get_data()
- catalog = auth_token._v3_to_v2_catalog(catalog_data)
-
- self.assertEqual(1, len(catalog))
- service = catalog[0]
-
- # the 3 regions will come through as 3 separate endpoints
- expected = [{'internalURL': self.INTERNAL_URL,
- 'region': self.REGION_ONE},
- {'publicURL': self.PUBLIC_URL,
- 'region': self.REGION_TWO},
- {'adminURL': self.ADMIN_URL,
- 'region': self.REGION_THREE}]
-
- self.assertEqual('identity', service['type'])
- self.assertEqual(3, len(service['endpoints']))
- for e in expected:
- self.assertIn(e, expected)
-
-
-def load_tests(loader, tests, pattern):
- return testresources.OptimisingTestSuite(tests)
diff --git a/keystoneclient/tests/unit/test_https.py b/keystoneclient/tests/unit/test_https.py
index bf93226..607ab9e 100644
--- a/keystoneclient/tests/unit/test_https.py
+++ b/keystoneclient/tests/unit/test_https.py
@@ -19,11 +19,11 @@ from keystoneclient.tests.unit import utils
FAKE_RESPONSE = utils.TestResponse({
"status_code": 200,
- "text": '{"hi": "there"}',
+ "text": b'{"hi": "there"}',
})
REQUEST_URL = 'https://127.0.0.1:5000/hi'
-RESPONSE_BODY = '{"hi": "there"}'
+RESPONSE_BODY = b'{"hi": "there"}'
def get_client():
@@ -42,7 +42,7 @@ def get_authed_client():
class ClientTest(utils.TestCase):
- @mock.patch.object(requests, 'request')
+ @mock.patch.object(requests.Session, 'request')
def test_get(self, MOCK_REQUEST):
MOCK_REQUEST.return_value = FAKE_RESPONSE
cl = get_authed_client()
@@ -53,8 +53,8 @@ class ClientTest(utils.TestCase):
# this may become too tightly couple later
mock_args, mock_kwargs = MOCK_REQUEST.call_args
- self.assertEqual(mock_args[0], 'GET')
- self.assertEqual(mock_args[1], REQUEST_URL)
+ self.assertEqual(mock_kwargs['method'], 'GET')
+ self.assertEqual(mock_kwargs['url'], REQUEST_URL)
self.assertEqual(mock_kwargs['headers']['X-Auth-Token'], 'token')
self.assertEqual(mock_kwargs['cert'], ('cert.pem', 'key.pem'))
self.assertEqual(mock_kwargs['verify'], 'ca.pem')
@@ -62,7 +62,7 @@ class ClientTest(utils.TestCase):
# Automatic JSON parsing
self.assertEqual(body, {"hi": "there"})
- @mock.patch.object(requests, 'request')
+ @mock.patch.object(requests.Session, 'request')
def test_post(self, MOCK_REQUEST):
MOCK_REQUEST.return_value = FAKE_RESPONSE
cl = get_authed_client()
@@ -73,14 +73,14 @@ class ClientTest(utils.TestCase):
# this may become too tightly couple later
mock_args, mock_kwargs = MOCK_REQUEST.call_args
- self.assertEqual(mock_args[0], 'POST')
- self.assertEqual(mock_args[1], REQUEST_URL)
+ self.assertEqual(mock_kwargs['method'], 'POST')
+ self.assertEqual(mock_kwargs['url'], REQUEST_URL)
self.assertEqual(mock_kwargs['data'], '[1, 2, 3]')
self.assertEqual(mock_kwargs['headers']['X-Auth-Token'], 'token')
self.assertEqual(mock_kwargs['cert'], ('cert.pem', 'key.pem'))
self.assertEqual(mock_kwargs['verify'], 'ca.pem')
- @mock.patch.object(requests, 'request')
+ @mock.patch.object(requests.Session, 'request')
def test_post_auth(self, MOCK_REQUEST):
MOCK_REQUEST.return_value = FAKE_RESPONSE
cl = httpclient.HTTPClient(
@@ -95,8 +95,8 @@ class ClientTest(utils.TestCase):
# this may become too tightly couple later
mock_args, mock_kwargs = MOCK_REQUEST.call_args
- self.assertEqual(mock_args[0], 'POST')
- self.assertEqual(mock_args[1], REQUEST_URL)
+ self.assertEqual(mock_kwargs['method'], 'POST')
+ self.assertEqual(mock_kwargs['url'], REQUEST_URL)
self.assertEqual(mock_kwargs['data'], '[1, 2, 3]')
self.assertEqual(mock_kwargs['headers']['X-Auth-Token'], 'token')
self.assertEqual(mock_kwargs['cert'], ('cert.pem', 'key.pem'))
diff --git a/keystoneclient/tests/unit/test_memcache_crypt.py b/keystoneclient/tests/unit/test_memcache_crypt.py
deleted file mode 100644
index 2546121..0000000
--- a/keystoneclient/tests/unit/test_memcache_crypt.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-import testtools
-
-from keystoneclient.middleware import memcache_crypt
-from keystoneclient.tests.unit import client_fixtures
-
-
-class MemcacheCryptPositiveTests(testtools.TestCase):
- def setUp(self):
- super(MemcacheCryptPositiveTests, self).setUp()
- self.useFixture(client_fixtures.Deprecations())
-
- def _setup_keys(self, strategy):
- return memcache_crypt.derive_keys(b'token', b'secret', strategy)
-
- def test_constant_time_compare(self):
- # make sure it works as a compare, the "constant time" aspect
- # isn't appropriate to test in unittests
- ctc = memcache_crypt.constant_time_compare
- self.assertTrue(ctc('abcd', 'abcd'))
- self.assertTrue(ctc('', ''))
- self.assertFalse(ctc('abcd', 'efgh'))
- self.assertFalse(ctc('abc', 'abcd'))
- self.assertFalse(ctc('abc', 'abc\x00'))
- self.assertFalse(ctc('', 'abc'))
-
- # For Python 3, we want to test these functions with both str and bytes
- # as input.
- if six.PY3:
- self.assertTrue(ctc(b'abcd', b'abcd'))
- self.assertTrue(ctc(b'', b''))
- self.assertFalse(ctc(b'abcd', b'efgh'))
- self.assertFalse(ctc(b'abc', b'abcd'))
- self.assertFalse(ctc(b'abc', b'abc\x00'))
- self.assertFalse(ctc(b'', b'abc'))
-
- def test_derive_keys(self):
- keys = self._setup_keys(b'strategy')
- self.assertEqual(len(keys['ENCRYPTION']),
- len(keys['CACHE_KEY']))
- self.assertEqual(len(keys['CACHE_KEY']),
- len(keys['MAC']))
- self.assertNotEqual(keys['ENCRYPTION'],
- keys['MAC'])
- self.assertIn('strategy', keys.keys())
-
- def test_key_strategy_diff(self):
- k1 = self._setup_keys(b'MAC')
- k2 = self._setup_keys(b'ENCRYPT')
- self.assertNotEqual(k1, k2)
-
- def test_sign_data(self):
- keys = self._setup_keys(b'MAC')
- sig = memcache_crypt.sign_data(keys['MAC'], b'data')
- self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64)
-
- def test_encryption(self):
- keys = self._setup_keys(b'ENCRYPT')
- # what you put in is what you get out
- for data in [b'data', b'1234567890123456', b'\x00\xFF' * 13
- ] + [six.int2byte(x % 256) * x for x in range(768)]:
- crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data)
- decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt)
- self.assertEqual(data, decrypt)
- self.assertRaises(memcache_crypt.DecryptError,
- memcache_crypt.decrypt_data,
- keys['ENCRYPTION'], crypt[:-1])
-
- def test_protect_wrappers(self):
- data = b'My Pretty Little Data'
- for strategy in [b'MAC', b'ENCRYPT']:
- keys = self._setup_keys(strategy)
- protected = memcache_crypt.protect_data(keys, data)
- self.assertNotEqual(protected, data)
- if strategy == b'ENCRYPT':
- self.assertNotIn(data, protected)
- unprotected = memcache_crypt.unprotect_data(keys, protected)
- self.assertEqual(data, unprotected)
- self.assertRaises(memcache_crypt.InvalidMacError,
- memcache_crypt.unprotect_data,
- keys, protected[:-1])
- self.assertIsNone(memcache_crypt.unprotect_data(keys, None))
-
- def test_no_pycrypt(self):
- aes = memcache_crypt.AES
- memcache_crypt.AES = None
- self.assertRaises(memcache_crypt.CryptoUnavailableError,
- memcache_crypt.encrypt_data, 'token', 'secret',
- 'data')
- memcache_crypt.AES = aes
diff --git a/keystoneclient/tests/unit/test_s3_token_middleware.py b/keystoneclient/tests/unit/test_s3_token_middleware.py
deleted file mode 100644
index 1f8aa1c..0000000
--- a/keystoneclient/tests/unit/test_s3_token_middleware.py
+++ /dev/null
@@ -1,264 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-from oslo_serialization import jsonutils
-import requests
-import six
-import testtools
-import webob
-
-from keystoneclient.middleware import s3_token
-from keystoneclient.tests.unit import client_fixtures
-from keystoneclient.tests.unit import utils
-
-
-GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
- 'tenant': {'id': 'TENANT_ID'}}}}
-
-
-class FakeApp(object):
- """This represents a WSGI app protected by the auth_token middleware."""
- def __call__(self, env, start_response):
- resp = webob.Response()
- resp.environ = env
- return resp(env, start_response)
-
-
-class S3TokenMiddlewareTestBase(utils.TestCase):
-
- TEST_PROTOCOL = 'https'
- TEST_HOST = 'fakehost'
- TEST_PORT = 35357
- TEST_URL = '%s://%s:%d/v2.0/s3tokens' % (TEST_PROTOCOL,
- TEST_HOST,
- TEST_PORT)
-
- def setUp(self):
- super(S3TokenMiddlewareTestBase, self).setUp()
-
- self.conf = {
- 'auth_host': self.TEST_HOST,
- 'auth_port': self.TEST_PORT,
- 'auth_protocol': self.TEST_PROTOCOL,
- }
-
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
-
-
-class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
-
- def setUp(self):
- super(S3TokenMiddlewareTestGood, self).setUp()
- self.middleware = s3_token.S3Token(FakeApp(), self.conf)
-
- self.requests_mock.post(self.TEST_URL,
- status_code=201,
- json=GOOD_RESPONSE)
-
- # Ignore the request and pass to the next middleware in the
- # pipeline if no path has been specified.
- def test_no_path_request(self):
- req = webob.Request.blank('/')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
- # Ignore the request and pass to the next middleware in the
- # pipeline if no Authorization header has been specified
- def test_without_authorization(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
- def test_without_auth_storage_token(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'badboy'
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
- def test_authorized(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
-
- def test_authorized_http(self):
- TEST_URL = 'http://%s:%d/v2.0/s3tokens' % (self.TEST_HOST,
- self.TEST_PORT)
-
- self.requests_mock.post(TEST_URL, status_code=201, json=GOOD_RESPONSE)
-
- self.middleware = (
- s3_token.filter_factory({'auth_protocol': 'http',
- 'auth_host': self.TEST_HOST,
- 'auth_port': self.TEST_PORT})(FakeApp()))
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
-
- def test_authorization_nova_toconnect(self):
- req = webob.Request.blank('/v1/AUTH_swiftint/c/o')
- req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
- path = req.environ['PATH_INFO']
- self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID'))
-
- @mock.patch.object(requests, 'post')
- def test_insecure(self, MOCK_REQUEST):
- self.middleware = (
- s3_token.filter_factory({'insecure': 'True'})(FakeApp()))
-
- text_return_value = jsonutils.dumps(GOOD_RESPONSE)
- if six.PY3:
- text_return_value = text_return_value.encode()
- MOCK_REQUEST.return_value = utils.TestResponse({
- 'status_code': 201,
- 'text': text_return_value})
-
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
-
- self.assertTrue(MOCK_REQUEST.called)
- mock_args, mock_kwargs = MOCK_REQUEST.call_args
- self.assertIs(mock_kwargs['verify'], False)
-
- def test_insecure_option(self):
- # insecure is passed as a string.
-
- # Some non-secure values.
- true_values = ['true', 'True', '1', 'yes']
- for val in true_values:
- config = {'insecure': val, 'certfile': 'false_ind'}
- middleware = s3_token.filter_factory(config)(FakeApp())
- self.assertIs(False, middleware.verify)
-
- # Some "secure" values, including unexpected value.
- false_values = ['false', 'False', '0', 'no', 'someweirdvalue']
- for val in false_values:
- config = {'insecure': val, 'certfile': 'false_ind'}
- middleware = s3_token.filter_factory(config)(FakeApp())
- self.assertEqual('false_ind', middleware.verify)
-
- # Default is secure.
- config = {'certfile': 'false_ind'}
- middleware = s3_token.filter_factory(config)(FakeApp())
- self.assertIs('false_ind', middleware.verify)
-
-
-class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
- def setUp(self):
- super(S3TokenMiddlewareTestBad, self).setUp()
- self.middleware = s3_token.S3Token(FakeApp(), self.conf)
-
- def test_unauthorized_token(self):
- ret = {"error":
- {"message": "EC2 access key not found.",
- "code": 401,
- "title": "Unauthorized"}}
- self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- s3_denied_req = self.middleware.deny_request('AccessDenied')
- self.assertEqual(resp.body, s3_denied_req.body)
- self.assertEqual(resp.status_int, s3_denied_req.status_int)
-
- def test_bogus_authorization(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'badboy'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- self.assertEqual(resp.status_int, 400)
- s3_invalid_req = self.middleware.deny_request('InvalidURI')
- self.assertEqual(resp.body, s3_invalid_req.body)
- self.assertEqual(resp.status_int, s3_invalid_req.status_int)
-
- def test_fail_to_connect_to_keystone(self):
- with mock.patch.object(self.middleware, '_json_request') as o:
- s3_invalid_req = self.middleware.deny_request('InvalidURI')
- o.side_effect = s3_token.ServiceError(s3_invalid_req)
-
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- self.assertEqual(resp.body, s3_invalid_req.body)
- self.assertEqual(resp.status_int, s3_invalid_req.status_int)
-
- def test_bad_reply(self):
- self.requests_mock.post(self.TEST_URL,
- status_code=201,
- text="<badreply>")
-
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- s3_invalid_req = self.middleware.deny_request('InvalidURI')
- self.assertEqual(resp.body, s3_invalid_req.body)
- self.assertEqual(resp.status_int, s3_invalid_req.status_int)
-
-
-class S3TokenMiddlewareTestUtil(testtools.TestCase):
- def setUp(self):
- super(S3TokenMiddlewareTestUtil, self).setUp()
- self.useFixture(client_fixtures.Deprecations())
-
- def test_split_path_failed(self):
- self.assertRaises(ValueError, s3_token.split_path, '')
- self.assertRaises(ValueError, s3_token.split_path, '/')
- self.assertRaises(ValueError, s3_token.split_path, '//')
- self.assertRaises(ValueError, s3_token.split_path, '//a')
- self.assertRaises(ValueError, s3_token.split_path, '/a/c')
- self.assertRaises(ValueError, s3_token.split_path, '//c')
- self.assertRaises(ValueError, s3_token.split_path, '/a/c/')
- self.assertRaises(ValueError, s3_token.split_path, '/a//')
- self.assertRaises(ValueError, s3_token.split_path, '/a', 2)
- self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3)
- self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3, True)
- self.assertRaises(ValueError, s3_token.split_path, '/a/c/o/r', 3, 3)
- self.assertRaises(ValueError, s3_token.split_path, '/a', 5, 4)
-
- def test_split_path_success(self):
- self.assertEqual(s3_token.split_path('/a'), ['a'])
- self.assertEqual(s3_token.split_path('/a/'), ['a'])
- self.assertEqual(s3_token.split_path('/a/c', 2), ['a', 'c'])
- self.assertEqual(s3_token.split_path('/a/c/o', 3), ['a', 'c', 'o'])
- self.assertEqual(s3_token.split_path('/a/c/o/r', 3, 3, True),
- ['a', 'c', 'o/r'])
- self.assertEqual(s3_token.split_path('/a/c', 2, 3, True),
- ['a', 'c', None])
- self.assertEqual(s3_token.split_path('/a/c/', 2), ['a', 'c'])
- self.assertEqual(s3_token.split_path('/a/c/', 2, 3), ['a', 'c', ''])
-
- def test_split_path_invalid_path(self):
- try:
- s3_token.split_path('o\nn e', 2)
- except ValueError as err:
- self.assertEqual(str(err), 'Invalid path: o%0An%20e')
- try:
- s3_token.split_path('o\nn e', 2, 3, True)
- except ValueError as err:
- self.assertEqual(str(err), 'Invalid path: o%0An%20e')
diff --git a/keystoneclient/tests/unit/v3/test_federation.py b/keystoneclient/tests/unit/v3/test_federation.py
index 5782aa6..7b7f19d 100644
--- a/keystoneclient/tests/unit/v3/test_federation.py
+++ b/keystoneclient/tests/unit/v3/test_federation.py
@@ -413,7 +413,7 @@ class FederatedTokenTests(utils.TestCase):
def test_get_user_domain_id(self):
"""Ensure a federated user's domain ID does not exist."""
- self.assertIsNone(self.federated_token.user_domain_id)
+ self.assertEqual('Federated', self.federated_token.user_domain_id)
class ServiceProviderTests(utils.TestCase, utils.CrudTests):
diff --git a/requirements.txt b/requirements.txt
index 39680bb..4672307 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,9 +6,9 @@ pbr<2.0,>=1.3
argparse
Babel>=1.3
+keystoneauth1 >= 0.3.0
iso8601>=0.1.9
debtcollector>=0.3.0 # Apache-2.0
-netaddr>=0.7.12
oslo.config>=1.11.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
diff --git a/test-requirements.txt b/test-requirements.txt
index 9ccaa3a..6ac4dcf 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -13,7 +13,6 @@ mock>=1.2
oauthlib>=0.6
oslosphinx>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
-pycrypto>=2.6
requests-mock>=0.6.0 # Apache-2.0
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
tempest-lib>=0.6.1