diff options
Diffstat (limited to 'tests')
22 files changed, 1191 insertions, 372 deletions
diff --git a/tests/oauth1/rfc5849/test_client.py b/tests/oauth1/rfc5849/test_client.py index dcb4c3d..777efc2 100644 --- a/tests/oauth1/rfc5849/test_client.py +++ b/tests/oauth1/rfc5849/test_client.py @@ -2,7 +2,8 @@ from __future__ import absolute_import, unicode_literals from oauthlib.common import Request -from oauthlib.oauth1 import (SIGNATURE_PLAINTEXT, SIGNATURE_RSA, +from oauthlib.oauth1 import (SIGNATURE_PLAINTEXT, SIGNATURE_HMAC_SHA1, + SIGNATURE_HMAC_SHA256, SIGNATURE_RSA, SIGNATURE_TYPE_BODY, SIGNATURE_TYPE_QUERY) from oauthlib.oauth1.rfc5849 import Client, bytes_type @@ -62,13 +63,48 @@ class ClientConstructorTests(TestCase): self.assertIsInstance(k, bytes_type) self.assertIsInstance(v, bytes_type) + def test_hmac_sha1(self): + client = Client('client_key') + # instance is using the correct signer method + self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_HMAC_SHA1], + client.SIGNATURE_METHODS[client.signature_method]) + + def test_hmac_sha256(self): + client = Client('client_key', signature_method=SIGNATURE_HMAC_SHA256) + # instance is using the correct signer method + self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_HMAC_SHA256], + client.SIGNATURE_METHODS[client.signature_method]) + def test_rsa(self): client = Client('client_key', signature_method=SIGNATURE_RSA) - self.assertIsNone(client.rsa_key) # don't need an RSA key to instantiate + # instance is using the correct signer method + self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_RSA], + client.SIGNATURE_METHODS[client.signature_method]) + # don't need an RSA key to instantiate + self.assertIsNone(client.rsa_key) class SignatureMethodTest(TestCase): + def test_hmac_sha1_method(self): + client = Client('client_key', timestamp='1234567890', nonce='abc') + u, h, b = client.sign('http://example.com') + correct = ('OAuth oauth_nonce="abc", oauth_timestamp="1234567890", ' + 'oauth_version="1.0", oauth_signature_method="HMAC-SHA1", ' + 'oauth_consumer_key="client_key", ' + 'oauth_signature="hH5BWYVqo7QI4EmPBUUe9owRUUQ%3D"') + self.assertEqual(h['Authorization'], correct) + + def test_hmac_sha256_method(self): + client = Client('client_key', signature_method=SIGNATURE_HMAC_SHA256, + timestamp='1234567890', nonce='abc') + u, h, b = client.sign('http://example.com') + correct = ('OAuth oauth_nonce="abc", oauth_timestamp="1234567890", ' + 'oauth_version="1.0", oauth_signature_method="HMAC-SHA256", ' + 'oauth_consumer_key="client_key", ' + 'oauth_signature="JzgJWBxX664OiMW3WE4MEjtYwOjI%2FpaUWHqtdHe68Es%3D"') + self.assertEqual(h['Authorization'], correct) + def test_rsa_method(self): private_key = ( "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDk1/bxy" diff --git a/tests/oauth2/rfc6749/clients/test_base.py b/tests/oauth2/rfc6749/clients/test_base.py index c788bc1..d48a944 100644 --- a/tests/oauth2/rfc6749/clients/test_base.py +++ b/tests/oauth2/rfc6749/clients/test_base.py @@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals import datetime from oauthlib import common -from oauthlib.oauth2 import Client, InsecureTransportError +from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError from oauthlib.oauth2.rfc6749 import utils from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY @@ -51,10 +51,26 @@ class ClientTest(TestCase): self.assertFormBodyEqual(body, self.body) self.assertEqual(headers, self.bearer_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + client = Client(self.client_id, access_token=self.access_token, token_type="Bearer") + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers) + # Missing access token client = Client(self.client_id) self.assertRaises(ValueError, client.add_token, self.uri) + # Expired token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, headers=self.headers) + # The default token placement, bearer in auth header client = Client(self.client_id, access_token=self.access_token) uri, headers, body = client.add_token(self.uri, body=self.body, @@ -150,8 +166,26 @@ class ClientTest(TestCase): self.assertEqual(uri, self.uri) self.assertEqual(body, self.body) self.assertEqual(headers, self.mac_00_header) + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + issue_time=datetime.datetime.now()) - # Add the Authorization header (draft 00) + # Add the Authorization header (draft 01) client = Client(self.client_id, token_type="MAC", access_token=self.access_token, mac_key=self.mac_key, mac_algorithm="hmac-sha-1") @@ -160,7 +194,24 @@ class ClientTest(TestCase): self.assertEqual(uri, self.uri) self.assertEqual(body, self.body) self.assertEqual(headers, self.mac_01_header) - + # Non-HTTPS + insecure_uri = 'http://example.com/path?query=world' + self.assertRaises(InsecureTransportError, client.add_token, insecure_uri, + body=self.body, + headers=self.headers, + draft=1) + # Expired Token + expired = 523549800 + expired_token = { + 'expires_at': expired, + } + client = Client(self.client_id, token=expired_token, token_type="MAC", + access_token=self.access_token, mac_key=self.mac_key, + mac_algorithm="hmac-sha-1") + self.assertRaises(TokenExpiredError, client.add_token, self.uri, + body=self.body, + headers=self.headers, + draft=1) def test_revocation_request(self): client = Client(self.client_id) @@ -208,6 +259,21 @@ class ClientTest(TestCase): # NotImplementedError self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url) + def test_prepare_token_request(self): + redirect_url = 'https://example.com/callback/' + scopes = 'read' + token_url = 'https://example.com/token/' + state = 'fake_state' + + client = Client(self.client_id, scope=scopes, state=state) + + # Non-HTTPS + self.assertRaises(InsecureTransportError, + client.prepare_token_request, 'http://example.com/token/') + + # NotImplementedError + self.assertRaises(NotImplementedError, client.prepare_token_request, token_url) + def test_prepare_refresh_token_request(self): client = Client(self.client_id) diff --git a/tests/oauth2/rfc6749/clients/test_service_application.py b/tests/oauth2/rfc6749/clients/test_service_application.py index 2dc633a..dc337cf 100644 --- a/tests/oauth2/rfc6749/clients/test_service_application.py +++ b/tests/oauth2/rfc6749/clients/test_service_application.py @@ -89,8 +89,8 @@ mfvGGg3xNjTMO7IdrwIDAQAB audience=self.audience, body=self.body) r = Request('https://a.b', body=body) - self.assertEqual(r.isnot, 'empty') - self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) @@ -98,6 +98,72 @@ mfvGGg3xNjTMO7IdrwIDAQAB # audience verification is handled during decode now self.assertEqual(claim['sub'], self.subject) self.assertEqual(claim['iat'], int(t.return_value)) + self.assertNotIn('nbf', claim) + self.assertNotIn('jti', claim) + + # Missing issuer parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=None, subject=self.subject, audience=self.audience, body=self.body) + + # Missing subject parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=None, audience=self.audience, body=self.body) + + # Missing audience parameter + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=None, body=self.body) + + # Optional kwargs + not_before = time() - 3600 + jwt_id = '8zd15df4s35f43sd' + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + not_before=not_before, + jwt_id=jwt_id) + + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + self.assertEqual(claim['nbf'], not_before) + self.assertEqual(claim['jti'], jwt_id) + + @patch('time.time') + def test_request_body_no_initial_private_key(self, t): + t.return_value = time() + self.token['expires_at'] = self.token['expires_in'] + t.return_value + + client = ServiceApplicationClient( + self.client_id, private_key=None) + + # Basic with private key provided + body = client.prepare_request_body(issuer=self.issuer, + subject=self.subject, + audience=self.audience, + body=self.body, + private_key=self.private_key) + r = Request('https://a.b', body=body) + self.assertEqual(r.isnot, 'empty') + self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type) + + claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256']) + + self.assertEqual(claim['iss'], self.issuer) + # audience verification is handled during decode now + self.assertEqual(claim['sub'], self.subject) + self.assertEqual(claim['iat'], int(t.return_value)) + + # No private key provided + self.assertRaises(ValueError, client.prepare_request_body, + issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body) @patch('time.time') def test_parse_token_response(self, t): diff --git a/tests/oauth2/rfc6749/clients/test_web_application.py b/tests/oauth2/rfc6749/clients/test_web_application.py index fa6643e..4ecc3b3 100644 --- a/tests/oauth2/rfc6749/clients/test_web_application.py +++ b/tests/oauth2/rfc6749/clients/test_web_application.py @@ -38,7 +38,7 @@ class WebApplicationClientTest(TestCase): code = "zzzzaaaa" body = "not=empty" - body_code = "not=empty&grant_type=authorization_code&code=%s&client_id=%s" % (code, client_id) + body_code = "not=empty&grant_type=authorization_code&code=%s" % code body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" body_kwargs = body_code + "&some=providers&require=extra+arguments" diff --git a/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py new file mode 100644 index 0000000..7ec8190 --- /dev/null +++ b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from json import loads + +from mock import MagicMock + +from oauthlib.common import urlencode +from oauthlib.oauth2 import RequestValidator, IntrospectEndpoint + +from ....unittest import TestCase + + +class IntrospectEndpointTest(TestCase): + + def setUp(self): + self.validator = MagicMock(wraps=RequestValidator()) + self.validator.client_authentication_required.return_value = True + self.validator.authenticate_client.return_value = True + self.validator.validate_bearer_token.return_value = True + self.validator.introspect_token.return_value = {} + self.endpoint = IntrospectEndpoint(self.validator) + + self.uri = 'should_not_matter' + self.headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + self.resp_h = { + 'Cache-Control': 'no-store', + 'Content-Type': 'application/json', + 'Pragma': 'no-cache' + } + self.resp_b = { + "active": True + } + + def test_introspect_token(self): + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_nohint(self): + # don't specify token_type_hint + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_false(self): + self.validator.introspect_token.return_value = None + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": False}) + self.assertEqual(s, 200) + + def test_introspect_token_claims(self): + self.validator.introspect_token.return_value = {"foo": "bar"} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_claims_spoof_active(self): + self.validator.introspect_token.return_value = {"foo": "bar", "active": False} + body = urlencode([('token', 'foo')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), {"active": True, "foo": "bar"}) + self.assertEqual(s, 200) + + def test_introspect_token_client_authentication_failed(self): + self.validator.authenticate_client.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + def test_introspect_token_public_client_authentication(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = True + for token_type in ('access_token', 'refresh_token', 'invalid'): + body = urlencode([('token', 'foo'), + ('token_type_hint', token_type)]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, self.resp_h) + self.assertEqual(loads(b), self.resp_b) + self.assertEqual(s, 200) + + def test_introspect_token_public_client_authentication_failed(self): + self.validator.client_authentication_required.return_value = False + self.validator.authenticate_client_id.return_value = False + body = urlencode([('token', 'foo'), + ('token_type_hint', 'access_token')]) + h, b, s = self.endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_client') + self.assertEqual(s, 401) + + + def test_introspect_unsupported_token(self): + endpoint = IntrospectEndpoint(self.validator, + supported_token_types=['access_token']) + body = urlencode([('token', 'foo'), + ('token_type_hint', 'refresh_token')]) + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body=body) + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'unsupported_token_type') + self.assertEqual(s, 400) + + h, b, s = endpoint.create_introspect_response(self.uri, + headers=self.headers, body='') + self.assertEqual(h, {}) + self.assertEqual(loads(b)['error'], 'invalid_request') + self.assertEqual(s, 400) diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py index 87781b3..8490c03 100644 --- a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py +++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py @@ -87,7 +87,7 @@ class TestScopeHandling(TestCase): self.assertIn('Location', h) code = get_query_credentials(h['Location'])['code'][0] _, body, _ = getattr(self, backend_server_type).create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code) self.assertEqual(json.loads(body)['scope'], decoded_scope) # implicit grant diff --git a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py deleted file mode 100644 index f10d36c..0000000 --- a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py +++ /dev/null @@ -1,291 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals - -import json - -import mock - -from oauthlib.common import Request -from oauthlib.oauth2.rfc6749.grant_types import (OIDCNoPrompt, - OpenIDConnectAuthCode, - OpenIDConnectHybrid, - OpenIDConnectImplicit) -from oauthlib.oauth2.rfc6749.tokens import BearerToken - -from ....unittest import TestCase -from .test_authorization_code import AuthorizationCodeGrantTest -from .test_implicit import ImplicitGrantTest - - -class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDAuthCodeInterferenceTest, self).setUp() - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - -class OpenIDImplicitInterferenceTest(ImplicitGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDImplicitInterferenceTest, self).setUp() - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - -class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): - """Test that OpenID don't interfere with normal OAuth 2 flows.""" - - def setUp(self): - super(OpenIDHybridInterferenceTest, self).setUp() - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - - -def get_id_token_mock(token, token_handler, request): - return "MOCKED_TOKEN" - - -class OpenIDAuthCodeTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.code = '1234' - self.request.response_type = 'code' - self.request.grant_type = 'authorization_code' - self.request.redirect_uri = 'https://a.b/cb' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.authenticate_client.side_effect = self.set_client - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator) - - self.url_query = 'https://a.b/cb?code=abc&state=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc' - - def set_client(self, request): - request.client = mock.MagicMock() - request.client.client_id = 'mocked' - return True - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_mode = 'query' - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'fragment' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - def set_scopes(self, client_id, code, client, request): - request.scopes = self.request.scopes - request.state = self.request.state - request.user = 'bob' - return True - - def test_create_token_response(self): - self.request.response_type = None - self.mock_validator.validate_code.side_effect = self.set_scopes - - bearer = BearerToken(self.mock_validator) - - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertIn('id_token', token) - self.assertIn('openid', token['scope']) - - self.mock_validator.reset_mock() - - self.request.scopes = ('hello', 'world') - h, token, s = self.auth.create_token_response(self.request, bearer) - token = json.loads(token) - self.assertEqual(self.mock_validator.save_token.call_count, 1) - self.assertIn('access_token', token) - self.assertIn('refresh_token', token) - self.assertIn('expires_in', token) - self.assertIn('scope', token) - self.assertNotIn('id_token', token) - self.assertNotIn('openid', token['scope']) - - -class OpenIDImplicitTest(TestCase): - - def setUp(self): - self.request = Request('http://a.b/path') - self.request.scopes = ('hello', 'openid') - self.request.expires_in = 1800 - self.request.client_id = 'abcdef' - self.request.response_type = 'id_token token' - self.request.redirect_uri = 'https://a.b/cb' - self.request.nonce = 'zxc' - self.request.state = 'abc' - - self.mock_validator = mock.MagicMock() - self.mock_validator.get_id_token.side_effect = get_id_token_mock - self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator) - - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - - @mock.patch('oauthlib.common.generate_token') - def test_authorization(self, generate_token): - scope, info = self.auth.validate_authorization_request(self.request) - - generate_token.return_value = 'abc' - bearer = BearerToken(self.mock_validator) - - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.response_type = 'id_token' - token = 'MOCKED_TOKEN' - url = 'https://a.b/cb#state=abc&id_token=%s' % token - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], url, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.nonce = None - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - @mock.patch('oauthlib.common.generate_token') - def test_no_prompt_authorization(self, generate_token): - generate_token.return_value = 'abc' - scope, info = self.auth.validate_authorization_request(self.request) - self.request.prompt = 'none' - self.assertRaises(OIDCNoPrompt, - self.auth.validate_authorization_request, - self.request) - - # prompt == none requires id token hint - bearer = BearerToken(self.mock_validator) - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=invalid_request', h['Location']) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - self.request.id_token_hint = 'me@email.com' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) - self.assertEqual(b, None) - self.assertEqual(s, 302) - - # Test alernative response modes - self.request.response_mode = 'query' - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertURLEqual(h['Location'], self.url_query) - - # Ensure silent authentication and authorization is done - self.mock_validator.validate_silent_login.return_value = False - self.mock_validator.validate_silent_authorization.return_value = True - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - self.mock_validator.validate_silent_login.return_value = True - self.mock_validator.validate_silent_authorization.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=consent_required', h['Location']) - - # ID token hint must match logged in user - self.mock_validator.validate_silent_authorization.return_value = True - self.mock_validator.validate_user_match.return_value = False - h, b, s = self.auth.create_authorization_response(self.request, bearer) - self.assertIn('error=login_required', h['Location']) - - -class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeTokenTest, self).setUp() - self.request.response_type = 'code token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' - -class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTest, self).setUp() - self.request.response_type = 'code id_token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token - -class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): - - def setUp(self): - super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() - self.request.response_type = 'code id_token token' - self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator) - token = 'MOCKED_TOKEN' - self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token - self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token diff --git a/tests/oauth2/rfc6749/test_parameters.py b/tests/oauth2/rfc6749/test_parameters.py index 1cac879..2a11d33 100644 --- a/tests/oauth2/rfc6749/test_parameters.py +++ b/tests/oauth2/rfc6749/test_parameters.py @@ -116,13 +116,6 @@ class ParameterTests(TestCase): ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' ' "example_parameter": "example_value" }') - json_expires = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",' - ' "token_type": "example",' - ' "expires": 3600,' - ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",' - ' "example_parameter": "example_value",' - ' "scope":"abc def"}') - json_dict = { 'access_token': '2YotnFZFEjr1zCsicMWpAA', 'token_type': 'example', @@ -268,7 +261,3 @@ class ParameterTests(TestCase): finally: signals.scope_changed.disconnect(record_scope_change) del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] - - def test_token_response_with_expires(self): - """Verify fallback for alternate spelling of expires_in. """ - self.assertEqual(parse_token_response(self.json_expires), self.json_dict) diff --git a/tests/oauth2/rfc6749/test_server.py b/tests/oauth2/rfc6749/test_server.py index 305b795..bc7a2b7 100644 --- a/tests/oauth2/rfc6749/test_server.py +++ b/tests/oauth2/rfc6749/test_server.py @@ -3,21 +3,17 @@ from __future__ import absolute_import, unicode_literals import json -import jwt import mock from oauthlib import common from oauthlib.oauth2.rfc6749 import errors, tokens from oauthlib.oauth2.rfc6749.endpoints import Server -from oauthlib.oauth2.rfc6749.endpoints.authorization import \ - AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint from oauthlib.oauth2.rfc6749.grant_types import (AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant, - OpenIDConnectAuthCode, - OpenIDConnectImplicit, ResourceOwnerPasswordCredentialsGrant) from ...unittest import TestCase @@ -29,40 +25,34 @@ class AuthorizationEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) auth_code.save_authorization_code = mock.MagicMock() implicit = ImplicitGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) implicit.save_token = mock.MagicMock() - openid_connect_auth = OpenIDConnectAuthCode(self.mock_validator) - openid_connect_implicit = OpenIDConnectImplicit(self.mock_validator) - response_types = { - 'code': auth_code, - 'token': implicit, - - 'id_token': openid_connect_implicit, - 'id_token token': openid_connect_implicit, - 'code token': openid_connect_auth, - 'code id_token': openid_connect_auth, - 'code token id_token': openid_connect_auth, - 'none': auth_code + 'code': auth_code, + 'token': implicit, + 'none': auth_code } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) self.endpoint = AuthorizationEndpoint( - default_response_type='code', - default_token_type=token, - response_types=response_types) + default_response_type='code', + default_token_type=token, + response_types=response_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') @@ -71,7 +61,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) @@ -79,7 +69,7 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) self.assertEqual(body, None) @@ -99,9 +89,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.InvalidRequestError()) + side_effect=errors.InvalidRequestError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') @@ -109,9 +99,9 @@ class AuthorizationEndpointTest(TestCase): uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' self.mock_validator.validate_request = mock.MagicMock( - side_effect=errors.UnsupportedResponseTypeError()) + side_effect=errors.UnsupportedResponseTypeError()) headers, body, status_code = self.endpoint.create_authorization_response( - uri, scopes=['all', 'of', 'them']) + uri, scopes=['all', 'of', 'them']) self.assertIn('Location', headers) self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') @@ -129,27 +119,32 @@ class TokenEndpointTest(TestCase): self.mock_validator.authenticate_client.side_effect = set_user self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) auth_code = AuthorizationCodeGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) password = ResourceOwnerPasswordCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) client = ClientCredentialsGrant( - request_validator=self.mock_validator) + request_validator=self.mock_validator) supported_types = { - 'authorization_code': auth_code, - 'password': password, - 'client_credentials': client, + 'authorization_code': auth_code, + 'password': password, + 'client_credentials': client, } self.expires_in = 1800 - token = tokens.BearerToken(self.mock_validator, - expires_in=self.expires_in) - self.endpoint = TokenEndpoint('authorization_code', - default_token_type=token, grant_types=supported_types) + token = tokens.BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -176,7 +171,7 @@ class TokenEndpointTest(TestCase): def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -190,7 +185,7 @@ class TokenEndpointTest(TestCase): def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) token = { 'token_type': 'Bearer', 'expires_in': self.expires_in, @@ -279,9 +274,9 @@ twIDAQAB @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') def test_authorization_grant(self): - body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -293,9 +288,9 @@ twIDAQAB } self.assertEqual(body, token) - body = 'grant_type=authorization_code&code=abc&state=xyz' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=xyz' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -310,7 +305,7 @@ twIDAQAB def test_password_grant(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -325,7 +320,7 @@ twIDAQAB def test_scopes_and_user_id_stored_in_access_token(self): body = 'grant_type=password&username=a&password=hello&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) access_token = json.loads(body)['access_token'] @@ -338,7 +333,7 @@ twIDAQAB def test_client_grant(self): body = 'grant_type=client_credentials&scope=all+of+them' headers, body, status_code = self.endpoint.create_token_response( - '', body=body) + '', body=body) body = json.loads(body) token = { 'token_type': 'Bearer', @@ -349,12 +344,12 @@ twIDAQAB self.assertEqual(body, token) def test_missing_type(self): - _, body, _ = self.endpoint.create_token_response('', body='') + _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc') token = {'error': 'unsupported_grant_type'} self.assertEqual(json.loads(body), token) def test_invalid_type(self): - body = 'grant_type=invalid' + body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc' _, body, _ = self.endpoint.create_token_response('', body=body) token = {'error': 'unsupported_grant_type'} self.assertEqual(json.loads(body), token) @@ -366,8 +361,10 @@ class ResourceEndpointTest(TestCase): self.mock_validator = mock.MagicMock() self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) token = tokens.BearerToken(request_validator=self.mock_validator) - self.endpoint = ResourceEndpoint(default_token='Bearer', - token_types={'Bearer': token}) + self.endpoint = ResourceEndpoint( + default_token='Bearer', + token_types={'Bearer': token} + ) def test_defaults(self): uri = 'http://a.b/path?some=query' diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index e2e558d..061754f 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -1,6 +1,11 @@ from __future__ import absolute_import, unicode_literals -from oauthlib.oauth2.rfc6749.tokens import * +from oauthlib.oauth2.rfc6749.tokens import ( + prepare_mac_header, + prepare_bearer_headers, + prepare_bearer_body, + prepare_bearer_uri, +) from ...unittest import TestCase @@ -59,9 +64,22 @@ class TokenTest(TestCase): bearer_headers = { 'Authorization': 'Bearer vF9dft4qmT' } + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9d ft4qmT'}, + ] + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} bearer_body = 'access_token=vF9dft4qmT' bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT' + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True + def test_prepare_mac_header(self): """Verify mac signatures correctness diff --git a/tests/openid/__init__.py b/tests/openid/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/__init__.py diff --git a/tests/openid/connect/__init__.py b/tests/openid/connect/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/connect/__init__.py diff --git a/tests/openid/connect/core/__init__.py b/tests/openid/connect/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/openid/connect/core/__init__.py diff --git a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py b/tests/openid/connect/core/endpoints/test_claims_handling.py index 9795c80..37a7cdd 100644 --- a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py +++ b/tests/openid/connect/core/endpoints/test_claims_handling.py @@ -10,10 +10,12 @@ from __future__ import absolute_import, unicode_literals import mock -from oauthlib.oauth2 import InvalidRequestError, RequestValidator, Server +from oauthlib.oauth2 import RequestValidator + +from oauthlib.oauth2.rfc6749.endpoints.pre_configured import Server from ....unittest import TestCase -from .test_utils import get_fragment_credentials, get_query_credentials +from .test_utils import get_query_credentials class TestClaimsHandling(TestCase): @@ -81,7 +83,7 @@ class TestClaimsHandling(TestCase): } } - claims_urlquoted='%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' + claims_urlquoted = '%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D' uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s' h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope') @@ -90,8 +92,10 @@ class TestClaimsHandling(TestCase): code = get_query_credentials(h['Location'])['code'][0] token_uri = 'http://example.com/path' - _, body, _ = self.server.create_token_response(token_uri, - body='grant_type=authorization_code&code=%s' % code) + _, body, _ = self.server.create_token_response( + token_uri, + body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code + ) self.assertDictEqual(self.claims_saved_with_bearer_token, claims) diff --git a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py index 89431b6..89431b6 100644 --- a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py +++ b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py diff --git a/tests/openid/connect/core/grant_types/test_authorization_code.py b/tests/openid/connect/core/grant_types/test_authorization_code.py new file mode 100644 index 0000000..1bad120 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_authorization_code.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.common import Request +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +def get_id_token_mock(token, token_handler, request): + return "MOCKED_TOKEN" + + +class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDAuthCodeInterferenceTest, self).setUp() + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + +class OpenIDAuthCodeTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.code = '1234' + self.request.response_type = 'code' + self.request.grant_type = 'authorization_code' + self.request.redirect_uri = 'https://a.b/cb' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = self.set_client + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator) + + self.url_query = 'https://a.b/cb?code=abc&state=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc' + + def set_client(self, request): + request.client = mock.MagicMock() + request.client.client_id = 'mocked' + return True + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_mode = 'query' + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'fragment' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + def set_scopes(self, client_id, code, client, request): + request.scopes = self.request.scopes + request.state = self.request.state + request.user = 'bob' + return True + + def test_create_token_response(self): + self.request.response_type = None + self.mock_validator.validate_code.side_effect = self.set_scopes + + bearer = BearerToken(self.mock_validator) + + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertIn('id_token', token) + self.assertIn('openid', token['scope']) + + self.mock_validator.reset_mock() + + self.request.scopes = ('hello', 'world') + h, token, s = self.auth.create_token_response(self.request, bearer) + token = json.loads(token) + self.assertEqual(self.mock_validator.save_token.call_count, 1) + self.assertIn('access_token', token) + self.assertIn('refresh_token', token) + self.assertIn('expires_in', token) + self.assertIn('scope', token) + self.assertNotIn('id_token', token) + self.assertNotIn('openid', token['scope']) diff --git a/tests/openid/connect/core/grant_types/test_dispatchers.py b/tests/openid/connect/core/grant_types/test_dispatchers.py new file mode 100644 index 0000000..f90ec46 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_dispatchers.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +import mock + +from oauthlib.common import Request + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.dispatchers import ( + ImplicitTokenGrantDispatcher, + AuthorizationTokenGrantDispatcher +) + +from oauthlib.oauth2.rfc6749.grant_types import ( + AuthorizationCodeGrant as OAuth2AuthorizationCodeGrant, + ImplicitGrant as OAuth2ImplicitGrant, +) + + +from ....unittest import TestCase + + +class ImplicitTokenGrantDispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + request_validator = mock.MagicMock() + implicit_grant = OAuth2ImplicitGrant(request_validator) + openid_connect_implicit = ImplicitGrant(request_validator) + + self.dispatcher = ImplicitTokenGrantDispatcher( + default_implicit_grant=implicit_grant, + oidc_implicit_grant=openid_connect_implicit + ) + + def test_create_authorization_response_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_openid(self): + self.request.scopes = ('hello', 'openid') + self.request.response_type = 'id_token' + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_create_authorization_response_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + def test_validate_authorization_request_oauth(self): + self.request.scopes = ('hello', 'world') + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, ImplicitGrant)) + + +class DispatcherTest(TestCase): + def setUp(self): + self.request = Request('http://a.b/path') + self.request.decoded_body = ( + ("client_id", "me"), + ("code", "code"), + ("redirect_url", "https://a.b/cb"), + ) + + self.request_validator = mock.MagicMock() + self.auth_grant = OAuth2AuthorizationCodeGrant(self.request_validator) + self.openid_connect_auth = OAuth2AuthorizationCodeGrant(self.request_validator) + + +class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp() + self.request.decoded_body = ( + ("client_id", "me"), + ("code", ""), + ("redirect_url", "https://a.b/cb"), + ) + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_openid_without_code(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called) + + +class AuthTokenGrantDispatcherOAuthTest(DispatcherTest): + + def setUp(self): + super(AuthTokenGrantDispatcherOAuthTest, self).setUp() + self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world') + self.dispatcher = AuthorizationTokenGrantDispatcher( + self.request_validator, + default_token_grant=self.auth_grant, + oidc_token_grant=self.openid_connect_auth + ) + + def test_create_token_response_oauth(self): + handler = self.dispatcher._handler_for_request(self.request) + self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant)) + self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called) diff --git a/tests/openid/connect/core/grant_types/test_hybrid.py b/tests/openid/connect/core/grant_types/test_hybrid.py new file mode 100644 index 0000000..531ae7f --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_hybrid.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest + + +class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDHybridInterferenceTest, self).setUp() + self.auth = HybridGrant(request_validator=self.mock_validator) diff --git a/tests/openid/connect/core/grant_types/test_implicit.py b/tests/openid/connect/core/grant_types/test_implicit.py new file mode 100644 index 0000000..56247d9 --- /dev/null +++ b/tests/openid/connect/core/grant_types/test_implicit.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.common import Request + +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant +from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt + +from ....unittest import TestCase +from .test_authorization_code import get_id_token_mock, OpenIDAuthCodeTest + +from ....oauth2.rfc6749.grant_types.test_implicit import ImplicitGrantTest + + +class OpenIDImplicitInterferenceTest(ImplicitGrantTest): + """Test that OpenID don't interfere with normal OAuth 2 flows.""" + + def setUp(self): + super(OpenIDImplicitInterferenceTest, self).setUp() + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + +class OpenIDImplicitTest(TestCase): + + def setUp(self): + self.request = Request('http://a.b/path') + self.request.scopes = ('hello', 'openid') + self.request.expires_in = 1800 + self.request.client_id = 'abcdef' + self.request.response_type = 'id_token token' + self.request.redirect_uri = 'https://a.b/cb' + self.request.nonce = 'zxc' + self.request.state = 'abc' + + self.mock_validator = mock.MagicMock() + self.mock_validator.get_id_token.side_effect = get_id_token_mock + self.auth = ImplicitGrant(request_validator=self.mock_validator) + + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + + @mock.patch('oauthlib.common.generate_token') + def test_authorization(self, generate_token): + scope, info = self.auth.validate_authorization_request(self.request) + + generate_token.return_value = 'abc' + bearer = BearerToken(self.mock_validator) + + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.response_type = 'id_token' + token = 'MOCKED_TOKEN' + url = 'https://a.b/cb#state=abc&id_token=%s' % token + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], url, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.nonce = None + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + @mock.patch('oauthlib.common.generate_token') + def test_no_prompt_authorization(self, generate_token): + generate_token.return_value = 'abc' + scope, info = self.auth.validate_authorization_request(self.request) + self.request.prompt = 'none' + self.assertRaises(OIDCNoPrompt, + self.auth.validate_authorization_request, + self.request) + + # prompt == none requires id token hint + bearer = BearerToken(self.mock_validator) + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=invalid_request', h['Location']) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + self.request.id_token_hint = 'me@email.com' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True) + self.assertEqual(b, None) + self.assertEqual(s, 302) + + # Test alernative response modes + self.request.response_mode = 'query' + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertURLEqual(h['Location'], self.url_query) + + # Ensure silent authentication and authorization is done + self.mock_validator.validate_silent_login.return_value = False + self.mock_validator.validate_silent_authorization.return_value = True + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + self.mock_validator.validate_silent_login.return_value = True + self.mock_validator.validate_silent_authorization.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=consent_required', h['Location']) + + # ID token hint must match logged in user + self.mock_validator.validate_silent_authorization.return_value = True + self.mock_validator.validate_user_match.return_value = False + h, b, s = self.auth.create_authorization_response(self.request, bearer) + self.assertIn('error=login_required', h['Location']) + + +class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeTokenTest, self).setUp() + self.request.response_type = 'code token' + self.auth = HybridGrant(request_validator=self.mock_validator) + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc' + + +class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTest, self).setUp() + self.request.response_type = 'code id_token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token + + +class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest): + + def setUp(self): + super(OpenIDHybridCodeIdTokenTokenTest, self).setUp() + self.request.response_type = 'code id_token token' + self.auth = HybridGrant(request_validator=self.mock_validator) + token = 'MOCKED_TOKEN' + self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token + self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token diff --git a/tests/openid/connect/core/test_request_validator.py b/tests/openid/connect/core/test_request_validator.py new file mode 100644 index 0000000..14a7c23 --- /dev/null +++ b/tests/openid/connect/core/test_request_validator.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +from oauthlib.openid.connect.core.request_validator import RequestValidator + +from ....unittest import TestCase + + +class RequestValidatorTest(TestCase): + + def test_method_contracts(self): + v = RequestValidator() + self.assertRaises( + NotImplementedError, + v.get_authorization_code_scopes, + 'client_id', 'code', 'redirect_uri', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_jwt_bearer_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.get_id_token, + 'token', 'token_handler', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_jwt_bearer_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_id_token, + 'token', 'scopes', 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_authorization, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_silent_login, + 'request' + ) + self.assertRaises( + NotImplementedError, + v.validate_user_match, + 'id_token_hint', 'scopes', 'claims', 'request' + ) diff --git a/tests/openid/connect/core/test_server.py b/tests/openid/connect/core/test_server.py new file mode 100644 index 0000000..83290db --- /dev/null +++ b/tests/openid/connect/core/test_server.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import json + +import mock + +from oauthlib.oauth2.rfc6749 import errors +from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint +from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint +from oauthlib.oauth2.rfc6749.tokens import BearerToken + +from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant +from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant +from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant + +from ....unittest import TestCase + + +class AuthorizationEndpointTest(TestCase): + + def setUp(self): + self.mock_validator = mock.MagicMock() + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant(request_validator=self.mock_validator) + auth_code.save_authorization_code = mock.MagicMock() + implicit = ImplicitGrant( + request_validator=self.mock_validator) + implicit.save_token = mock.MagicMock() + hybrid = HybridGrant(self.mock_validator) + + response_types = { + 'code': auth_code, + 'token': implicit, + 'id_token': implicit, + 'id_token token': implicit, + 'code token': hybrid, + 'code id_token': hybrid, + 'code token id_token': hybrid, + 'none': auth_code + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = AuthorizationEndpoint( + default_response_type='code', + default_token_type=token, + response_types=response_types + ) + + # TODO: Add hybrid grant test + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz') + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_implicit_grant(self): + uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True) + + def test_none_grant(self): + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + # and without the state parameter + uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True) + self.assertEqual(body, None) + self.assertEqual(status_code, 302) + + def test_missing_type(self): + uri = 'http://i.b/l?client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.InvalidRequestError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.') + + def test_invalid_type(self): + uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them' + uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme' + self.mock_validator.validate_request = mock.MagicMock( + side_effect=errors.UnsupportedResponseTypeError()) + headers, body, status_code = self.endpoint.create_authorization_response( + uri, scopes=['all', 'of', 'them']) + self.assertIn('Location', headers) + self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type') + + +class TokenEndpointTest(TestCase): + + def setUp(self): + def set_user(request): + request.user = mock.MagicMock() + request.client = mock.MagicMock() + request.client.client_id = 'mocked_client_id' + return True + + self.mock_validator = mock.MagicMock() + self.mock_validator.authenticate_client.side_effect = set_user + self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock()) + auth_code = AuthorizationCodeGrant( + request_validator=self.mock_validator) + supported_types = { + 'authorization_code': auth_code, + } + self.expires_in = 1800 + token = BearerToken( + self.mock_validator, + expires_in=self.expires_in + ) + self.endpoint = TokenEndpoint( + 'authorization_code', + default_token_type=token, + grant_types=supported_types + ) + + @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc') + def test_authorization_grant(self): + body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'scope': 'all of them', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + body = 'grant_type=authorization_code&code=abc&state=xyz' + headers, body, status_code = self.endpoint.create_token_response( + '', body=body) + token = { + 'token_type': 'Bearer', + 'expires_in': self.expires_in, + 'access_token': 'abc', + 'refresh_token': 'abc', + 'state': 'xyz' + } + self.assertEqual(json.loads(body), token) + + def test_missing_type(self): + _, body, _ = self.endpoint.create_token_response('', body='') + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) + + def test_invalid_type(self): + body = 'grant_type=invalid' + _, body, _ = self.endpoint.create_token_response('', body=body) + token = {'error': 'unsupported_grant_type'} + self.assertEqual(json.loads(body), token) diff --git a/tests/openid/connect/core/test_tokens.py b/tests/openid/connect/core/test_tokens.py new file mode 100644 index 0000000..12c75f1 --- /dev/null +++ b/tests/openid/connect/core/test_tokens.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from oauthlib.openid.connect.core.tokens import JWTToken + +from ....unittest import TestCase + + +class JWTTokenTestCase(TestCase): + + def test_create_token_callable_expires_in(self): + """ + Test retrieval of the expires in value by calling the callable expires_in property + """ + + expires_in_mock = mock.MagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + expires_in_mock.assert_called_once_with(request_mock) + + def test_create_token_non_callable_expires_in(self): + """ + When a non callable expires in is set this should just be set to the request + """ + + expires_in_mock = mock.NonCallableMagicMock() + request_mock = mock.MagicMock() + + token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock()) + token.create_token(request=request_mock) + + self.assertFalse(expires_in_mock.called) + self.assertEqual(request_mock.expires_in, expires_in_mock) + + def test_create_token_calls_get_id_token(self): + """ + When create_token is called the call should be forwarded to the get_id_token on the token validator + """ + request_mock = mock.MagicMock() + + with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + + request_validator = RequestValidatorMock() + + token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator) + token.create_token(request=request_mock) + + request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock) + + def test_validate_request_token_from_headers(self): + """ + Bearer token get retrieved from headers. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.headers = { + 'Authorization': 'Bearer some-token-from-header' + } + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header', + request.scopes, + request) + + def test_validate_token_from_request(self): + """ + Token get retrieved from request object. + """ + + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \ + mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator', + autospec=True) as RequestValidatorMock: + request_validator_mock = RequestValidatorMock() + + token = JWTToken(request_validator=request_validator_mock) + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.scopes = mock.MagicMock() + request.access_token = 'some-token-from-request-object' + request.headers = {} + + token.validate_request(request=request) + + request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object', + request.scopes, + request) + + def test_estimate_type(self): + """ + Estimate type results for a jwt token + """ + + def test_token(token, expected_result): + with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock: + jwt_token = JWTToken() + + request = RequestMock('/uri') + # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch + # with autospec=True + request.headers = { + 'Authorization': 'Bearer {}'.format(token) + } + + result = jwt_token.estimate_type(request=request) + + self.assertEqual(result, expected_result) + + test_items = ( + ('eyfoo.foo.foo', 10), + ('eyfoo.foo.foo.foo.foo', 10), + ('eyfoobar', 0) + ) + + for token, expected_result in test_items: + test_token(token, expected_result) |