author | Jenkins <jenkins@review.openstack.org> | 2013-10-07 17:17:57 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-10-07 17:17:57 +0000 |
commit | e9fb6c7c8f5e37a5b94141bcd5b94fcacf41c075 (patch) | |
tree | 45e3cb9963b273a3bdbc65ff1e2ce53650775a30 | |
parent | 0341f933caf91a522dffe42a5092c1e4a7925adb (diff) | |
parent | a2e7b17810ed34719dc101f93dc480e2f9fdce6e (diff) | |
Merge "Refactor for testability of an upcoming change"
-rw-r--r-- | keystoneclient/middleware/auth_token.py | 48 | ||||
-rw-r--r-- | keystoneclient/tests/test_auth_token_middleware.py | 20 |
2 files changed, 45 insertions, 23 deletions
diff --git a/keystoneclient/middleware/auth_token.py b/keystoneclient/middleware/auth_token.py
index 740dff2..cd89cf1 100644
--- a/keystoneclient/middleware/auth_token.py
+++ b/keystoneclient/middleware/auth_token.py
@@ -309,6 +309,29 @@ def will_expire_soon(expiry):
 return expiry < soon
+def _token_is_v2(token_info):
+ return ('access' in token_info)
+
+
+def _token_is_v3(token_info):
+ return ('token' in token_info)
+
+
+def confirm_token_not_expired(data):
+ if not data:
+ raise InvalidUserToken('Token authorization failed')
+ if _token_is_v2(data):
+ timestamp = data['access']['token']['expires']
+ elif _token_is_v3(data):
+ timestamp = data['token']['expires_at']
+ else:
+ raise InvalidUserToken('Token authorization failed')
+ expires = timeutils.parse_isotime(timestamp).strftime('%s')
+ if time.time() >= float(expires):
+ raise InvalidUserToken('Token authorization failed')
+ return expires
+
+
 def safe_quote(s):
 """URL-encode strings that are not already URL-encoded."""
 return urllib.quote(s) if s == urllib.unquote(s) else s
@@ -783,7 +806,7 @@ class AuthProtocol(object):
 data = jsonutils.loads(verified)
 else:
 data = self.verify_uuid_token(user_token, retry)
- expires = self._confirm_token_not_expired(data)
+ expires = confirm_token_not_expired(data)
 self._cache_put(token_id, data, expires)
 return data
 except NetworkError:
@@ -797,12 +820,6 @@ class AuthProtocol(object):
 self.LOG.warn("Authorization failed for token %s", token_id)
 raise InvalidUserToken('Token authorization failed')
- def _token_is_v2(self, token_info):
- return ('access' in token_info)
-
- def _token_is_v3(self, token_info):
- return ('token' in token_info)
-
 def _build_user_headers(self, token_info):
 """Convert token object into headers.
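Review bot: `timeutils.parse_isotime(timestamp).strftime('%s')` in the new `confirm_token_not_expired` is an unreliable way to compute the expiry epoch: `%s` is a platform extension, not a documented Python directive, and where it is supported it converts the broken-down time as host-local time and ignores the parsed UTC offset. On a host whose timezone is not UTC the result is shifted by the local offset, so the `time.time() >= float(expires)` check can accept a token after its real expiry (or reject a valid one), and the same value is passed to `_cache_put`. Assuming `parse_isotime` returns an aware datetime, `expires = str(calendar.timegm(timeutils.parse_isotime(timestamp).utctimetuple()))` keeps the string return value and is timezone-independent; it needs `import calendar` if the module does not already import it. The move also drops the `self.LOG.debug('Token expired a %s', timestamp)` call, so an expiry rejection is no longer distinguishable in logs from a malformed token, and the three new module-level functions would benefit from one-line docstrings.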
@@ -846,7 +863,7 @@ class AuthProtocol(object):
 project_domain_id = None
 project_domain_name = None
- if self._token_is_v2(token_info):
+ if _token_is_v2(token_info):
 user = token_info['access']['user']
 token = token_info['access']['token']
 roles = ','.join([role['name'] for role in user.get('roles', [])])
@@ -1019,21 +1036,6 @@ class AuthProtocol(object):
 data_to_store,
 timeout=self.token_cache_time)
- def _confirm_token_not_expired(self, data):
- if not data:
- raise InvalidUserToken('Token authorization failed')
- if self._token_is_v2(data):
- timestamp = data['access']['token']['expires']
- elif self._token_is_v3(data):
- timestamp = data['token']['expires_at']
- else:
- raise InvalidUserToken('Token authorization failed')
- expires = timeutils.parse_isotime(timestamp).strftime('%s')
- if time.time() >= float(expires):
- self.LOG.debug('Token expired a %s', timestamp)
- raise InvalidUserToken('Token authorization failed')
- return expires
-
 def _cache_put(self, token_id, data, expires):
 """Put token data into the cache.
diff --git a/keystoneclient/tests/test_auth_token_middleware.py b/keystoneclient/tests/test_auth_token_middleware.py
index 4f4c594..25ff514 100644
--- a/keystoneclient/tests/test_auth_token_middleware.py
+++ b/keystoneclient/tests/test_auth_token_middleware.py
@@ -704,6 +704,26 @@ class CommonAuthTokenMiddlewareTest(object):
 seconds=40)
 self.assertFalse(auth_token.will_expire_soon(fortyseconds))
+ def test_token_is_v2_accepts_v2(self):
+ token = client_fixtures.UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertTrue(auth_token._token_is_v2(token_response))
+
+ def test_token_is_v2_rejects_v3(self):
+ token = client_fixtures.v3_UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertFalse(auth_token._token_is_v2(token_response))
+
+ def test_token_is_v3_rejects_v2(self):
+ token = client_fixtures.UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertFalse(auth_token._token_is_v3(token_response))
+
+ def test_token_is_v3_accepts_v3(self):
+ token = client_fixtures.v3_UUID_TOKEN_DEFAULT
+ token_response = client_fixtures.TOKEN_RESPONSES[token]
+ self.assertTrue(auth_token._token_is_v3(token_response))
+
 def test_encrypt_cache_data(self):
 httpretty.disable()
 conf = { |
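Review bot: The added tests cover only `_token_is_v2` and `_token_is_v3`; nothing in this hunk exercises `confirm_token_not_expired`, the expiry check that the refactor exposes at module level. Cases worth pinning are an already-expired v2 and v3 token, empty data, and a dict with neither `access` nor `token`, each expected to raise `InvalidUserToken` (for example `self.assertRaises(auth_token.InvalidUserToken, auth_token.confirm_token_not_expired, {})`), plus a valid token returning its expiry. They may be planned for the upcoming change the commit message mentions, but having them alongside the move would show the rejection behaviour is unchanged. The test class starts outside the hunk.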