summaryrefslogtreecommitdiff
path: root/tests/openid/connect
diff options
context:
space:
mode:
Diffstat (limited to 'tests/openid/connect')
-rw-r--r--tests/openid/connect/__init__.py0
-rw-r--r--tests/openid/connect/core/__init__.py0
-rw-r--r--tests/openid/connect/core/endpoints/test_claims_handling.py109
-rw-r--r--tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py85
-rw-r--r--tests/openid/connect/core/grant_types/test_authorization_code.py153
-rw-r--r--tests/openid/connect/core/grant_types/test_dispatchers.py125
-rw-r--r--tests/openid/connect/core/grant_types/test_hybrid.py13
-rw-r--r--tests/openid/connect/core/grant_types/test_implicit.py148
-rw-r--r--tests/openid/connect/core/test_request_validator.py52
-rw-r--r--tests/openid/connect/core/test_server.py178
-rw-r--r--tests/openid/connect/core/test_tokens.py133
11 files changed, 996 insertions, 0 deletions
diff --git a/tests/openid/connect/__init__.py b/tests/openid/connect/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/openid/connect/__init__.py
diff --git a/tests/openid/connect/core/__init__.py b/tests/openid/connect/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/openid/connect/core/__init__.py
diff --git a/tests/openid/connect/core/endpoints/test_claims_handling.py b/tests/openid/connect/core/endpoints/test_claims_handling.py
new file mode 100644
index 0000000..37a7cdd
--- /dev/null
+++ b/tests/openid/connect/core/endpoints/test_claims_handling.py
@@ -0,0 +1,109 @@
+"""Ensure OpenID Connect Authorization Request 'claims' are preserved across authorization.
+
+The claims parameter is an optional query param for the Authorization Request endpoint
+ but if it is provided and is valid it needs to be deserialized (from urlencoded JSON)
+ and persisted with the authorization code itself, then in the subsequent Access Token
+ request the claims should be transferred (via the oauthlib request) to be persisted
+ with the Access Token when it is created.
+"""
+from __future__ import absolute_import, unicode_literals
+
+import mock
+
+from oauthlib.oauth2 import RequestValidator
+
+from oauthlib.oauth2.rfc6749.endpoints.pre_configured import Server
+
+from ....unittest import TestCase
+from .test_utils import get_query_credentials
+
+
+class TestClaimsHandling(TestCase):
+
+ DEFAULT_REDIRECT_URI = 'http://i.b./path'
+
+ def set_scopes(self, scopes):
+ def set_request_scopes(client_id, code, client, request):
+ request.scopes = scopes
+ return True
+ return set_request_scopes
+
+ def set_user(self, request):
+ request.user = 'foo'
+ request.client_id = 'bar'
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ def save_claims_with_code(self, client_id, code, request, *args, **kwargs):
+ # a real validator would save the claims with the code during save_authorization_code()
+ self.claims_from_auth_code_request = request.claims
+ self.scopes = request.scopes.split()
+
+ def retrieve_claims_saved_with_code(self, client_id, code, client, request, *args, **kwargs):
+ request.claims = self.claims_from_auth_code_request
+ request.scopes = self.scopes
+
+ return True
+
+ def save_claims_with_bearer_token(self, token, request, *args, **kwargs):
+ # a real validator would save the claims with the access token during save_bearer_token()
+ self.claims_saved_with_bearer_token = request.claims
+
+ def setUp(self):
+ self.validator = mock.MagicMock(spec=RequestValidator)
+ self.validator.get_default_redirect_uri.return_value = TestClaimsHandling.DEFAULT_REDIRECT_URI
+ self.validator.authenticate_client.side_effect = self.set_client
+
+ self.validator.save_authorization_code.side_effect = self.save_claims_with_code
+ self.validator.validate_code.side_effect = self.retrieve_claims_saved_with_code
+ self.validator.save_token.side_effect = self.save_claims_with_bearer_token
+
+ self.server = Server(self.validator)
+
+ def test_claims_stored_on_code_creation(self):
+
+ claims = {
+ "id_token": {
+ "claim_1": None,
+ "claim_2": {
+ "essential": True
+ }
+ },
+ "userinfo": {
+ "claim_3": {
+ "essential": True
+ },
+ "claim_4": None
+ }
+ }
+
+ claims_urlquoted = '%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D'
+ uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s'
+
+ h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope')
+
+ self.assertDictEqual(self.claims_from_auth_code_request, claims)
+
+ code = get_query_credentials(h['Location'])['code'][0]
+ token_uri = 'http://example.com/path'
+ _, body, _ = self.server.create_token_response(
+ token_uri,
+ body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code
+ )
+
+ self.assertDictEqual(self.claims_saved_with_bearer_token, claims)
+
+ def test_invalid_claims(self):
+ uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=this-is-not-json'
+
+ h, b, s = self.server.create_authorization_response(uri, scopes='openid test_scope')
+ error = get_query_credentials(h['Location'])['error'][0]
+ error_desc = get_query_credentials(h['Location'])['error_description'][0]
+ self.assertEqual(error, 'invalid_request')
+ self.assertEqual(error_desc, "Malformed claims parameter")
diff --git a/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py
new file mode 100644
index 0000000..89431b6
--- /dev/null
+++ b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import, unicode_literals
+
+import mock
+
+from oauthlib.oauth2 import InvalidRequestError
+from oauthlib.oauth2.rfc6749.endpoints.authorization import \
+ AuthorizationEndpoint
+from oauthlib.oauth2.rfc6749.grant_types import OpenIDConnectAuthCode
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from ....unittest import TestCase
+
+try:
+ from urllib.parse import urlencode
+except ImportError:
+ from urllib import urlencode
+
+
+
+
+class OpenIDConnectEndpointTest(TestCase):
+
+ def setUp(self):
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = self.set_client
+ grant = OpenIDConnectAuthCode(request_validator=self.mock_validator)
+ bearer = BearerToken(self.mock_validator)
+ self.endpoint = AuthorizationEndpoint(grant, bearer,
+ response_types={'code': grant})
+ params = {
+ 'prompt': 'consent',
+ 'display': 'touch',
+ 'nonce': 'abcd',
+ 'state': 'abc',
+ 'redirect_uri': 'https://a.b/cb',
+ 'response_type': 'code',
+ 'client_id': 'abcdef',
+ 'scope': 'hello openid'
+ }
+ self.url = 'http://a.b/path?' + urlencode(params)
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_authorization_endpoint_handles_prompt(self, generate_token):
+ generate_token.return_value = "MOCK_CODE"
+ # In the GET view:
+ scopes, creds = self.endpoint.validate_authorization_request(self.url)
+ # In the POST view:
+ creds['scopes'] = scopes
+ h, b, s = self.endpoint.create_authorization_response(self.url,
+ credentials=creds)
+ expected = 'https://a.b/cb?state=abc&code=MOCK_CODE'
+ self.assertURLEqual(h['Location'], expected)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ def test_prompt_none_exclusiveness(self):
+ """
+ Test that prompt=none can't be used with another prompt value.
+ """
+ params = {
+ 'prompt': 'none consent',
+ 'state': 'abc',
+ 'redirect_uri': 'https://a.b/cb',
+ 'response_type': 'code',
+ 'client_id': 'abcdef',
+ 'scope': 'hello openid'
+ }
+ url = 'http://a.b/path?' + urlencode(params)
+ with self.assertRaises(InvalidRequestError):
+ self.endpoint.validate_authorization_request(url)
+
+ def test_oidc_params_preservation(self):
+ """
+ Test that the nonce parameter is passed through.
+ """
+ scopes, creds = self.endpoint.validate_authorization_request(self.url)
+
+ self.assertEqual(creds['prompt'], {'consent'})
+ self.assertEqual(creds['nonce'], 'abcd')
+ self.assertEqual(creds['display'], 'touch')
diff --git a/tests/openid/connect/core/grant_types/test_authorization_code.py b/tests/openid/connect/core/grant_types/test_authorization_code.py
new file mode 100644
index 0000000..1bad120
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_authorization_code.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import json
+
+import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant
+from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt
+
+from ....unittest import TestCase
+from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest
+
+
+def get_id_token_mock(token, token_handler, request):
+ return "MOCKED_TOKEN"
+
+
+class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest):
+ """Test that OpenID don't interfere with normal OAuth 2 flows."""
+
+ def setUp(self):
+ super(OpenIDAuthCodeInterferenceTest, self).setUp()
+ self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
+
+
+class OpenIDAuthCodeTest(TestCase):
+
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'openid')
+ self.request.expires_in = 1800
+ self.request.client_id = 'abcdef'
+ self.request.code = '1234'
+ self.request.response_type = 'code'
+ self.request.grant_type = 'authorization_code'
+ self.request.redirect_uri = 'https://a.b/cb'
+ self.request.state = 'abc'
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = self.set_client
+ self.mock_validator.get_id_token.side_effect = get_id_token_mock
+ self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
+
+ self.url_query = 'https://a.b/cb?code=abc&state=abc'
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc'
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_authorization(self, generate_token):
+
+ scope, info = self.auth.validate_authorization_request(self.request)
+
+ generate_token.return_value = 'abc'
+ bearer = BearerToken(self.mock_validator)
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_query)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.response_mode = 'fragment'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_no_prompt_authorization(self, generate_token):
+ generate_token.return_value = 'abc'
+ scope, info = self.auth.validate_authorization_request(self.request)
+ self.request.prompt = 'none'
+ self.assertRaises(OIDCNoPrompt,
+ self.auth.validate_authorization_request,
+ self.request)
+
+ # prompt == none requires id token hint
+ bearer = BearerToken(self.mock_validator)
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=invalid_request', h['Location'])
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.response_mode = 'query'
+ self.request.id_token_hint = 'me@email.com'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_query)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ # Test alernative response modes
+ self.request.response_mode = 'fragment'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+
+ # Ensure silent authentication and authorization is done
+ self.mock_validator.validate_silent_login.return_value = False
+ self.mock_validator.validate_silent_authorization.return_value = True
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+ self.mock_validator.validate_silent_login.return_value = True
+ self.mock_validator.validate_silent_authorization.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=consent_required', h['Location'])
+
+ # ID token hint must match logged in user
+ self.mock_validator.validate_silent_authorization.return_value = True
+ self.mock_validator.validate_user_match.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+ def set_scopes(self, client_id, code, client, request):
+ request.scopes = self.request.scopes
+ request.state = self.request.state
+ request.user = 'bob'
+ return True
+
+ def test_create_token_response(self):
+ self.request.response_type = None
+ self.mock_validator.validate_code.side_effect = self.set_scopes
+
+ bearer = BearerToken(self.mock_validator)
+
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+ self.assertIn('id_token', token)
+ self.assertIn('openid', token['scope'])
+
+ self.mock_validator.reset_mock()
+
+ self.request.scopes = ('hello', 'world')
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+ self.assertNotIn('id_token', token)
+ self.assertNotIn('openid', token['scope'])
diff --git a/tests/openid/connect/core/grant_types/test_dispatchers.py b/tests/openid/connect/core/grant_types/test_dispatchers.py
new file mode 100644
index 0000000..f90ec46
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_dispatchers.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+import mock
+
+from oauthlib.common import Request
+
+from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant
+from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant
+from oauthlib.openid.connect.core.grant_types.dispatchers import (
+ ImplicitTokenGrantDispatcher,
+ AuthorizationTokenGrantDispatcher
+)
+
+from oauthlib.oauth2.rfc6749.grant_types import (
+ AuthorizationCodeGrant as OAuth2AuthorizationCodeGrant,
+ ImplicitGrant as OAuth2ImplicitGrant,
+)
+
+
+from ....unittest import TestCase
+
+
+class ImplicitTokenGrantDispatcherTest(TestCase):
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ request_validator = mock.MagicMock()
+ implicit_grant = OAuth2ImplicitGrant(request_validator)
+ openid_connect_implicit = ImplicitGrant(request_validator)
+
+ self.dispatcher = ImplicitTokenGrantDispatcher(
+ default_implicit_grant=implicit_grant,
+ oidc_implicit_grant=openid_connect_implicit
+ )
+
+ def test_create_authorization_response_openid(self):
+ self.request.scopes = ('hello', 'openid')
+ self.request.response_type = 'id_token'
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+ def test_validate_authorization_request_openid(self):
+ self.request.scopes = ('hello', 'openid')
+ self.request.response_type = 'id_token'
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+ def test_create_authorization_response_oauth(self):
+ self.request.scopes = ('hello', 'world')
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+ def test_validate_authorization_request_oauth(self):
+ self.request.scopes = ('hello', 'world')
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+
+class DispatcherTest(TestCase):
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.decoded_body = (
+ ("client_id", "me"),
+ ("code", "code"),
+ ("redirect_url", "https://a.b/cb"),
+ )
+
+ self.request_validator = mock.MagicMock()
+ self.auth_grant = OAuth2AuthorizationCodeGrant(self.request_validator)
+ self.openid_connect_auth = OAuth2AuthorizationCodeGrant(self.request_validator)
+
+
+class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest):
+
+ def setUp(self):
+ super(AuthTokenGrantDispatcherOpenIdTest, self).setUp()
+ self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid')
+ self.dispatcher = AuthorizationTokenGrantDispatcher(
+ self.request_validator,
+ default_token_grant=self.auth_grant,
+ oidc_token_grant=self.openid_connect_auth
+ )
+
+ def test_create_token_response_openid(self):
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, AuthorizationCodeGrant))
+ self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called)
+
+
+class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest):
+
+ def setUp(self):
+ super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp()
+ self.request.decoded_body = (
+ ("client_id", "me"),
+ ("code", ""),
+ ("redirect_url", "https://a.b/cb"),
+ )
+ self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid')
+ self.dispatcher = AuthorizationTokenGrantDispatcher(
+ self.request_validator,
+ default_token_grant=self.auth_grant,
+ oidc_token_grant=self.openid_connect_auth
+ )
+
+ def test_create_token_response_openid_without_code(self):
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant))
+ self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called)
+
+
+class AuthTokenGrantDispatcherOAuthTest(DispatcherTest):
+
+ def setUp(self):
+ super(AuthTokenGrantDispatcherOAuthTest, self).setUp()
+ self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world')
+ self.dispatcher = AuthorizationTokenGrantDispatcher(
+ self.request_validator,
+ default_token_grant=self.auth_grant,
+ oidc_token_grant=self.openid_connect_auth
+ )
+
+ def test_create_token_response_oauth(self):
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant))
+ self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called)
diff --git a/tests/openid/connect/core/grant_types/test_hybrid.py b/tests/openid/connect/core/grant_types/test_hybrid.py
new file mode 100644
index 0000000..531ae7f
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_hybrid.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant
+
+from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest
+
+
+class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest):
+ """Test that OpenID don't interfere with normal OAuth 2 flows."""
+
+ def setUp(self):
+ super(OpenIDHybridInterferenceTest, self).setUp()
+ self.auth = HybridGrant(request_validator=self.mock_validator)
diff --git a/tests/openid/connect/core/grant_types/test_implicit.py b/tests/openid/connect/core/grant_types/test_implicit.py
new file mode 100644
index 0000000..56247d9
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_implicit.py
@@ -0,0 +1,148 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import mock
+
+from oauthlib.common import Request
+
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant
+from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant
+from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt
+
+from ....unittest import TestCase
+from .test_authorization_code import get_id_token_mock, OpenIDAuthCodeTest
+
+from ....oauth2.rfc6749.grant_types.test_implicit import ImplicitGrantTest
+
+
+class OpenIDImplicitInterferenceTest(ImplicitGrantTest):
+ """Test that OpenID don't interfere with normal OAuth 2 flows."""
+
+ def setUp(self):
+ super(OpenIDImplicitInterferenceTest, self).setUp()
+ self.auth = ImplicitGrant(request_validator=self.mock_validator)
+
+
+class OpenIDImplicitTest(TestCase):
+
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'openid')
+ self.request.expires_in = 1800
+ self.request.client_id = 'abcdef'
+ self.request.response_type = 'id_token token'
+ self.request.redirect_uri = 'https://a.b/cb'
+ self.request.nonce = 'zxc'
+ self.request.state = 'abc'
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.get_id_token.side_effect = get_id_token_mock
+ self.auth = ImplicitGrant(request_validator=self.mock_validator)
+
+ token = 'MOCKED_TOKEN'
+ self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
+ self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_authorization(self, generate_token):
+ scope, info = self.auth.validate_authorization_request(self.request)
+
+ generate_token.return_value = 'abc'
+ bearer = BearerToken(self.mock_validator)
+
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.response_type = 'id_token'
+ token = 'MOCKED_TOKEN'
+ url = 'https://a.b/cb#state=abc&id_token=%s' % token
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], url, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.nonce = None
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=invalid_request', h['Location'])
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_no_prompt_authorization(self, generate_token):
+ generate_token.return_value = 'abc'
+ scope, info = self.auth.validate_authorization_request(self.request)
+ self.request.prompt = 'none'
+ self.assertRaises(OIDCNoPrompt,
+ self.auth.validate_authorization_request,
+ self.request)
+
+ # prompt == none requires id token hint
+ bearer = BearerToken(self.mock_validator)
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=invalid_request', h['Location'])
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.id_token_hint = 'me@email.com'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ # Test alernative response modes
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_query)
+
+ # Ensure silent authentication and authorization is done
+ self.mock_validator.validate_silent_login.return_value = False
+ self.mock_validator.validate_silent_authorization.return_value = True
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+ self.mock_validator.validate_silent_login.return_value = True
+ self.mock_validator.validate_silent_authorization.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=consent_required', h['Location'])
+
+ # ID token hint must match logged in user
+ self.mock_validator.validate_silent_authorization.return_value = True
+ self.mock_validator.validate_user_match.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+
+class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest):
+
+ def setUp(self):
+ super(OpenIDHybridCodeTokenTest, self).setUp()
+ self.request.response_type = 'code token'
+ self.auth = HybridGrant(request_validator=self.mock_validator)
+ self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc'
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc'
+
+
+class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest):
+
+ def setUp(self):
+ super(OpenIDHybridCodeIdTokenTest, self).setUp()
+ self.request.response_type = 'code id_token'
+ self.auth = HybridGrant(request_validator=self.mock_validator)
+ token = 'MOCKED_TOKEN'
+ self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token
+
+
+class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest):
+
+ def setUp(self):
+ super(OpenIDHybridCodeIdTokenTokenTest, self).setUp()
+ self.request.response_type = 'code id_token token'
+ self.auth = HybridGrant(request_validator=self.mock_validator)
+ token = 'MOCKED_TOKEN'
+ self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
diff --git a/tests/openid/connect/core/test_request_validator.py b/tests/openid/connect/core/test_request_validator.py
new file mode 100644
index 0000000..14a7c23
--- /dev/null
+++ b/tests/openid/connect/core/test_request_validator.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+from oauthlib.openid.connect.core.request_validator import RequestValidator
+
+from ....unittest import TestCase
+
+
+class RequestValidatorTest(TestCase):
+
+ def test_method_contracts(self):
+ v = RequestValidator()
+ self.assertRaises(
+ NotImplementedError,
+ v.get_authorization_code_scopes,
+ 'client_id', 'code', 'redirect_uri', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.get_jwt_bearer_token,
+ 'token', 'token_handler', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.get_id_token,
+ 'token', 'token_handler', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_jwt_bearer_token,
+ 'token', 'scopes', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_id_token,
+ 'token', 'scopes', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_silent_authorization,
+ 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_silent_login,
+ 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_user_match,
+ 'id_token_hint', 'scopes', 'claims', 'request'
+ )
diff --git a/tests/openid/connect/core/test_server.py b/tests/openid/connect/core/test_server.py
new file mode 100644
index 0000000..83290db
--- /dev/null
+++ b/tests/openid/connect/core/test_server.py
@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import json
+
+import mock
+
+from oauthlib.oauth2.rfc6749 import errors
+from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint
+from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant
+from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant
+from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant
+
+from ....unittest import TestCase
+
+
+class AuthorizationEndpointTest(TestCase):
+
+ def setUp(self):
+ self.mock_validator = mock.MagicMock()
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ auth_code = AuthorizationCodeGrant(request_validator=self.mock_validator)
+ auth_code.save_authorization_code = mock.MagicMock()
+ implicit = ImplicitGrant(
+ request_validator=self.mock_validator)
+ implicit.save_token = mock.MagicMock()
+ hybrid = HybridGrant(self.mock_validator)
+
+ response_types = {
+ 'code': auth_code,
+ 'token': implicit,
+ 'id_token': implicit,
+ 'id_token token': implicit,
+ 'code token': hybrid,
+ 'code id_token': hybrid,
+ 'code token id_token': hybrid,
+ 'none': auth_code
+ }
+ self.expires_in = 1800
+ token = BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = AuthorizationEndpoint(
+ default_response_type='code',
+ default_token_type=token,
+ response_types=response_types
+ )
+
+ # TODO: Add hybrid grant test
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz')
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_implicit_grant(self):
+ uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True)
+
+ def test_none_grant(self):
+ uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True)
+ self.assertEqual(body, None)
+ self.assertEqual(status_code, 302)
+
+ # and without the state parameter
+ uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True)
+ self.assertEqual(body, None)
+ self.assertEqual(status_code, 302)
+
+ def test_missing_type(self):
+ uri = 'http://i.b/l?client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ self.mock_validator.validate_request = mock.MagicMock(
+ side_effect=errors.InvalidRequestError())
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.')
+
+ def test_invalid_type(self):
+ uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ self.mock_validator.validate_request = mock.MagicMock(
+ side_effect=errors.UnsupportedResponseTypeError())
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type')
+
+
+class TokenEndpointTest(TestCase):
+
+ def setUp(self):
+ def set_user(request):
+ request.user = mock.MagicMock()
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked_client_id'
+ return True
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = set_user
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ auth_code = AuthorizationCodeGrant(
+ request_validator=self.mock_validator)
+ supported_types = {
+ 'authorization_code': auth_code,
+ }
+ self.expires_in = 1800
+ token = BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = TokenEndpoint(
+ 'authorization_code',
+ default_token_type=token,
+ grant_types=supported_types
+ )
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc',
+ 'scope': 'all of them',
+ 'state': 'xyz'
+ }
+ self.assertEqual(json.loads(body), token)
+
+ body = 'grant_type=authorization_code&code=abc&state=xyz'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc',
+ 'state': 'xyz'
+ }
+ self.assertEqual(json.loads(body), token)
+
+ def test_missing_type(self):
+ _, body, _ = self.endpoint.create_token_response('', body='')
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
+
+ def test_invalid_type(self):
+ body = 'grant_type=invalid'
+ _, body, _ = self.endpoint.create_token_response('', body=body)
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
diff --git a/tests/openid/connect/core/test_tokens.py b/tests/openid/connect/core/test_tokens.py
new file mode 100644
index 0000000..12c75f1
--- /dev/null
+++ b/tests/openid/connect/core/test_tokens.py
@@ -0,0 +1,133 @@
+from __future__ import absolute_import, unicode_literals
+
+import mock
+
+from oauthlib.openid.connect.core.tokens import JWTToken
+
+from ....unittest import TestCase
+
+
+class JWTTokenTestCase(TestCase):
+
+ def test_create_token_callable_expires_in(self):
+ """
+ Test retrieval of the expires in value by calling the callable expires_in property
+ """
+
+ expires_in_mock = mock.MagicMock()
+ request_mock = mock.MagicMock()
+
+ token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock())
+ token.create_token(request=request_mock)
+
+ expires_in_mock.assert_called_once_with(request_mock)
+
+ def test_create_token_non_callable_expires_in(self):
+ """
+ When a non callable expires in is set this should just be set to the request
+ """
+
+ expires_in_mock = mock.NonCallableMagicMock()
+ request_mock = mock.MagicMock()
+
+ token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock())
+ token.create_token(request=request_mock)
+
+ self.assertFalse(expires_in_mock.called)
+ self.assertEqual(request_mock.expires_in, expires_in_mock)
+
+ def test_create_token_calls_get_id_token(self):
+ """
+ When create_token is called the call should be forwarded to the get_id_token on the token validator
+ """
+ request_mock = mock.MagicMock()
+
+ with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator',
+ autospec=True) as RequestValidatorMock:
+
+ request_validator = RequestValidatorMock()
+
+ token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator)
+ token.create_token(request=request_mock)
+
+ request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock)
+
+ def test_validate_request_token_from_headers(self):
+ """
+ Bearer token get retrieved from headers.
+ """
+
+ with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \
+ mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator',
+ autospec=True) as RequestValidatorMock:
+ request_validator_mock = RequestValidatorMock()
+
+ token = JWTToken(request_validator=request_validator_mock)
+
+ request = RequestMock('/uri')
+ # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch
+ # with autospec=True
+ request.scopes = mock.MagicMock()
+ request.headers = {
+ 'Authorization': 'Bearer some-token-from-header'
+ }
+
+ token.validate_request(request=request)
+
+ request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header',
+ request.scopes,
+ request)
+
+ def test_validate_token_from_request(self):
+ """
+ Token get retrieved from request object.
+ """
+
+ with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \
+ mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator',
+ autospec=True) as RequestValidatorMock:
+ request_validator_mock = RequestValidatorMock()
+
+ token = JWTToken(request_validator=request_validator_mock)
+
+ request = RequestMock('/uri')
+ # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch
+ # with autospec=True
+ request.scopes = mock.MagicMock()
+ request.access_token = 'some-token-from-request-object'
+ request.headers = {}
+
+ token.validate_request(request=request)
+
+ request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object',
+ request.scopes,
+ request)
+
+ def test_estimate_type(self):
+ """
+ Estimate type results for a jwt token
+ """
+
+ def test_token(token, expected_result):
+ with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock:
+ jwt_token = JWTToken()
+
+ request = RequestMock('/uri')
+ # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch
+ # with autospec=True
+ request.headers = {
+ 'Authorization': 'Bearer {}'.format(token)
+ }
+
+ result = jwt_token.estimate_type(request=request)
+
+ self.assertEqual(result, expected_result)
+
+ test_items = (
+ ('eyfoo.foo.foo', 10),
+ ('eyfoo.foo.foo.foo.foo', 10),
+ ('eyfoobar', 0)
+ )
+
+ for token, expected_result in test_items:
+ test_token(token, expected_result)