summaryrefslogtreecommitdiff
path: root/keystoneclient
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-10-07 17:17:57 +0000
committerGerrit Code Review <review@openstack.org>2013-10-07 17:17:57 +0000
commite9fb6c7c8f5e37a5b94141bcd5b94fcacf41c075 (patch)
tree45e3cb9963b273a3bdbc65ff1e2ce53650775a30 /keystoneclient
parent0341f933caf91a522dffe42a5092c1e4a7925adb (diff)
parenta2e7b17810ed34719dc101f93dc480e2f9fdce6e (diff)
downloadpython-keystoneclient-e9fb6c7c8f5e37a5b94141bcd5b94fcacf41c075.tar.gz
Merge "Refactor for testability of an upcoming change"
Diffstat (limited to 'keystoneclient')
-rw-r--r--keystoneclient/middleware/auth_token.py48
-rw-r--r--keystoneclient/tests/test_auth_token_middleware.py20
2 files changed, 45 insertions, 23 deletions
diff --git a/keystoneclient/middleware/auth_token.py b/keystoneclient/middleware/auth_token.py
index 740dff2..cd89cf1 100644
--- a/keystoneclient/middleware/auth_token.py
+++ b/keystoneclient/middleware/auth_token.py
@@ -309,6 +309,29 @@ def will_expire_soon(expiry):
return expiry < soon
+def _token_is_v2(token_info):
+ return ('access' in token_info)
+
+
+def _token_is_v3(token_info):
+ return ('token' in token_info)
+
+
+def confirm_token_not_expired(data):
+ if not data:
+ raise InvalidUserToken('Token authorization failed')
+ if _token_is_v2(data):
+ timestamp = data['access']['token']['expires']
+ elif _token_is_v3(data):
+ timestamp = data['token']['expires_at']
+ else:
+ raise InvalidUserToken('Token authorization failed')
+ expires = timeutils.parse_isotime(timestamp).strftime('%s')
+ if time.time() >= float(expires):
+ raise InvalidUserToken('Token authorization failed')
+ return expires
+
+
def safe_quote(s):
"""URL-encode strings that are not already URL-encoded."""
return urllib.quote(s) if s == urllib.unquote(s) else s
@@ -783,7 +806,7 @@ class AuthProtocol(object):
data = jsonutils.loads(verified)
else:
data = self.verify_uuid_token(user_token, retry)
- expires = self._confirm_token_not_expired(data)
+ expires = confirm_token_not_expired(data)
self._cache_put(token_id, data, expires)
return data
except NetworkError:
@@ -797,12 +820,6 @@ class AuthProtocol(object):
self.LOG.warn("Authorization failed for token %s", token_id)
raise InvalidUserToken('Token authorization failed')
- def _token_is_v2(self, token_info):
- return ('access' in token_info)
-
- def _token_is_v3(self, token_info):
- return ('token' in token_info)
-
def _build_user_headers(self, token_info):
"""Convert token object into headers.
@@ -846,7 +863,7 @@ class AuthProtocol(object):
project_domain_id = None
project_domain_name = None
- if self._token_is_v2(token_info):
+ if _token_is_v2(token_info):
user = token_info['access']['user']
token = token_info['access']['token']
roles = ','.join([role['name'] for role in user.get('roles', [])])
@@ -1019,21 +1036,6 @@ class AuthProtocol(object):
data_to_store,
timeout=self.token_cache_time)
- def _confirm_token_not_expired(self, data):
- if not data:
- raise InvalidUserToken('Token authorization failed')
- if self._token_is_v2(data):
- timestamp = data['access']['token']['expires']
- elif self._token_is_v3(data):
- timestamp = data['token']['expires_at']
- else:
- raise InvalidUserToken('Token authorization failed')
- expires = timeutils.parse_isotime(timestamp).strftime('%s')
- if time.time() >= float(expires):
- self.LOG.debug('Token expired a %s', timestamp)
- raise InvalidUserToken('Token authorization failed')
- return expires
-
def _cache_put(self, token_id, data, expires):
"""Put token data into the cache.
diff --git a/keystoneclient/tests/test_auth_token_middleware.py b/keystoneclient/tests/test_auth_token_middleware.py
index 4f4c594..25ff514 100644
--- a/keystoneclient/tests/test_auth_token_middleware.py
+++ b/keystoneclient/tests/test_auth_token_middleware.py
@@ -704,6 +704,26 @@ class CommonAuthTokenMiddlewareTest(object):
seconds=40)
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
+ def test_token_is_v2_accepts_v2(self):
+ token = client_fixtures.UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertTrue(auth_token._token_is_v2(token_response))
+
+ def test_token_is_v2_rejects_v3(self):
+ token = client_fixtures.v3_UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertFalse(auth_token._token_is_v2(token_response))
+
+ def test_token_is_v3_rejects_v2(self):
+ token = client_fixtures.UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertFalse(auth_token._token_is_v3(token_response))
+
+ def test_token_is_v3_accepts_v3(self):
+ token = client_fixtures.v3_UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertTrue(auth_token._token_is_v3(token_response))
+
def test_encrypt_cache_data(self):
httpretty.disable()
conf = {