summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJonathan Huot <jonathan.huot@thomsonreuters.com>2018-08-02 10:29:22 +0200
committerJonathan Huot <jonathan.huot@thomsonreuters.com>2018-08-02 10:29:22 +0200
commit1d07167210297cd9691e5397f09477fea5df5279 (patch)
treef7e9b97dc966161b11c21945b58922f50441f187 /tests
parentc9ead44e9c3bef100a6434ffbe56a002d54f0475 (diff)
parentfbacd77b602e4c60f8da2413c150fa7f20b2f83c (diff)
downloadoauthlib-431-customerrors.tar.gz
Merge branch 'master' into 431-customerrors431-customerrors
Diffstat (limited to 'tests')
-rw-r--r--tests/oauth1/rfc5849/test_client.py40
-rw-r--r--tests/oauth2/rfc6749/clients/test_base.py72
-rw-r--r--tests/oauth2/rfc6749/clients/test_service_application.py70
-rw-r--r--tests/oauth2/rfc6749/clients/test_web_application.py2
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py132
-rw-r--r--tests/oauth2/rfc6749/endpoints/test_scope_handling.py2
-rw-r--r--tests/oauth2/rfc6749/grant_types/test_openid_connect.py291
-rw-r--r--tests/oauth2/rfc6749/test_parameters.py11
-rw-r--r--tests/oauth2/rfc6749/test_server.py107
-rw-r--r--tests/oauth2/rfc6749/test_tokens.py20
-rw-r--r--tests/openid/__init__.py0
-rw-r--r--tests/openid/connect/__init__.py0
-rw-r--r--tests/openid/connect/core/__init__.py0
-rw-r--r--tests/openid/connect/core/endpoints/test_claims_handling.py (renamed from tests/oauth2/rfc6749/endpoints/test_claims_handling.py)14
-rw-r--r--tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py (renamed from tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py)0
-rw-r--r--tests/openid/connect/core/grant_types/test_authorization_code.py153
-rw-r--r--tests/openid/connect/core/grant_types/test_dispatchers.py125
-rw-r--r--tests/openid/connect/core/grant_types/test_hybrid.py13
-rw-r--r--tests/openid/connect/core/grant_types/test_implicit.py148
-rw-r--r--tests/openid/connect/core/test_request_validator.py52
-rw-r--r--tests/openid/connect/core/test_server.py178
-rw-r--r--tests/openid/connect/core/test_tokens.py133
22 files changed, 1191 insertions, 372 deletions
diff --git a/tests/oauth1/rfc5849/test_client.py b/tests/oauth1/rfc5849/test_client.py
index dcb4c3d..777efc2 100644
--- a/tests/oauth1/rfc5849/test_client.py
+++ b/tests/oauth1/rfc5849/test_client.py
@@ -2,7 +2,8 @@
from __future__ import absolute_import, unicode_literals
from oauthlib.common import Request
-from oauthlib.oauth1 import (SIGNATURE_PLAINTEXT, SIGNATURE_RSA,
+from oauthlib.oauth1 import (SIGNATURE_PLAINTEXT, SIGNATURE_HMAC_SHA1,
+ SIGNATURE_HMAC_SHA256, SIGNATURE_RSA,
SIGNATURE_TYPE_BODY, SIGNATURE_TYPE_QUERY)
from oauthlib.oauth1.rfc5849 import Client, bytes_type
@@ -62,13 +63,48 @@ class ClientConstructorTests(TestCase):
self.assertIsInstance(k, bytes_type)
self.assertIsInstance(v, bytes_type)
+ def test_hmac_sha1(self):
+ client = Client('client_key')
+ # instance is using the correct signer method
+ self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_HMAC_SHA1],
+ client.SIGNATURE_METHODS[client.signature_method])
+
+ def test_hmac_sha256(self):
+ client = Client('client_key', signature_method=SIGNATURE_HMAC_SHA256)
+ # instance is using the correct signer method
+ self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_HMAC_SHA256],
+ client.SIGNATURE_METHODS[client.signature_method])
+
def test_rsa(self):
client = Client('client_key', signature_method=SIGNATURE_RSA)
- self.assertIsNone(client.rsa_key) # don't need an RSA key to instantiate
+ # instance is using the correct signer method
+ self.assertEqual(Client.SIGNATURE_METHODS[SIGNATURE_RSA],
+ client.SIGNATURE_METHODS[client.signature_method])
+ # don't need an RSA key to instantiate
+ self.assertIsNone(client.rsa_key)
class SignatureMethodTest(TestCase):
+ def test_hmac_sha1_method(self):
+ client = Client('client_key', timestamp='1234567890', nonce='abc')
+ u, h, b = client.sign('http://example.com')
+ correct = ('OAuth oauth_nonce="abc", oauth_timestamp="1234567890", '
+ 'oauth_version="1.0", oauth_signature_method="HMAC-SHA1", '
+ 'oauth_consumer_key="client_key", '
+ 'oauth_signature="hH5BWYVqo7QI4EmPBUUe9owRUUQ%3D"')
+ self.assertEqual(h['Authorization'], correct)
+
+ def test_hmac_sha256_method(self):
+ client = Client('client_key', signature_method=SIGNATURE_HMAC_SHA256,
+ timestamp='1234567890', nonce='abc')
+ u, h, b = client.sign('http://example.com')
+ correct = ('OAuth oauth_nonce="abc", oauth_timestamp="1234567890", '
+ 'oauth_version="1.0", oauth_signature_method="HMAC-SHA256", '
+ 'oauth_consumer_key="client_key", '
+ 'oauth_signature="JzgJWBxX664OiMW3WE4MEjtYwOjI%2FpaUWHqtdHe68Es%3D"')
+ self.assertEqual(h['Authorization'], correct)
+
def test_rsa_method(self):
private_key = (
"-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDk1/bxy"
diff --git a/tests/oauth2/rfc6749/clients/test_base.py b/tests/oauth2/rfc6749/clients/test_base.py
index c788bc1..d48a944 100644
--- a/tests/oauth2/rfc6749/clients/test_base.py
+++ b/tests/oauth2/rfc6749/clients/test_base.py
@@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals
import datetime
from oauthlib import common
-from oauthlib.oauth2 import Client, InsecureTransportError
+from oauthlib.oauth2 import Client, InsecureTransportError, TokenExpiredError
from oauthlib.oauth2.rfc6749 import utils
from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY
@@ -51,10 +51,26 @@ class ClientTest(TestCase):
self.assertFormBodyEqual(body, self.body)
self.assertEqual(headers, self.bearer_header)
+ # Non-HTTPS
+ insecure_uri = 'http://example.com/path?query=world'
+ client = Client(self.client_id, access_token=self.access_token, token_type="Bearer")
+ self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
+ body=self.body,
+ headers=self.headers)
+
# Missing access token
client = Client(self.client_id)
self.assertRaises(ValueError, client.add_token, self.uri)
+ # Expired token
+ expired = 523549800
+ expired_token = {
+ 'expires_at': expired,
+ }
+ client = Client(self.client_id, token=expired_token, access_token=self.access_token, token_type="Bearer")
+ self.assertRaises(TokenExpiredError, client.add_token, self.uri,
+ body=self.body, headers=self.headers)
+
# The default token placement, bearer in auth header
client = Client(self.client_id, access_token=self.access_token)
uri, headers, body = client.add_token(self.uri, body=self.body,
@@ -150,8 +166,26 @@ class ClientTest(TestCase):
self.assertEqual(uri, self.uri)
self.assertEqual(body, self.body)
self.assertEqual(headers, self.mac_00_header)
+ # Non-HTTPS
+ insecure_uri = 'http://example.com/path?query=world'
+ self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
+ body=self.body,
+ headers=self.headers,
+ issue_time=datetime.datetime.now())
+ # Expired Token
+ expired = 523549800
+ expired_token = {
+ 'expires_at': expired,
+ }
+ client = Client(self.client_id, token=expired_token, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-1")
+ self.assertRaises(TokenExpiredError, client.add_token, self.uri,
+ body=self.body,
+ headers=self.headers,
+ issue_time=datetime.datetime.now())
- # Add the Authorization header (draft 00)
+ # Add the Authorization header (draft 01)
client = Client(self.client_id, token_type="MAC",
access_token=self.access_token, mac_key=self.mac_key,
mac_algorithm="hmac-sha-1")
@@ -160,7 +194,24 @@ class ClientTest(TestCase):
self.assertEqual(uri, self.uri)
self.assertEqual(body, self.body)
self.assertEqual(headers, self.mac_01_header)
-
+ # Non-HTTPS
+ insecure_uri = 'http://example.com/path?query=world'
+ self.assertRaises(InsecureTransportError, client.add_token, insecure_uri,
+ body=self.body,
+ headers=self.headers,
+ draft=1)
+ # Expired Token
+ expired = 523549800
+ expired_token = {
+ 'expires_at': expired,
+ }
+ client = Client(self.client_id, token=expired_token, token_type="MAC",
+ access_token=self.access_token, mac_key=self.mac_key,
+ mac_algorithm="hmac-sha-1")
+ self.assertRaises(TokenExpiredError, client.add_token, self.uri,
+ body=self.body,
+ headers=self.headers,
+ draft=1)
def test_revocation_request(self):
client = Client(self.client_id)
@@ -208,6 +259,21 @@ class ClientTest(TestCase):
# NotImplementedError
self.assertRaises(NotImplementedError, client.prepare_authorization_request, auth_url)
+ def test_prepare_token_request(self):
+ redirect_url = 'https://example.com/callback/'
+ scopes = 'read'
+ token_url = 'https://example.com/token/'
+ state = 'fake_state'
+
+ client = Client(self.client_id, scope=scopes, state=state)
+
+ # Non-HTTPS
+ self.assertRaises(InsecureTransportError,
+ client.prepare_token_request, 'http://example.com/token/')
+
+ # NotImplementedError
+ self.assertRaises(NotImplementedError, client.prepare_token_request, token_url)
+
def test_prepare_refresh_token_request(self):
client = Client(self.client_id)
diff --git a/tests/oauth2/rfc6749/clients/test_service_application.py b/tests/oauth2/rfc6749/clients/test_service_application.py
index 2dc633a..dc337cf 100644
--- a/tests/oauth2/rfc6749/clients/test_service_application.py
+++ b/tests/oauth2/rfc6749/clients/test_service_application.py
@@ -89,8 +89,8 @@ mfvGGg3xNjTMO7IdrwIDAQAB
audience=self.audience,
body=self.body)
r = Request('https://a.b', body=body)
- self.assertEqual(r.isnot, 'empty')
- self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
+ self.assertEqual(r.isnot, 'empty')
+ self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
@@ -98,6 +98,72 @@ mfvGGg3xNjTMO7IdrwIDAQAB
# audience verification is handled during decode now
self.assertEqual(claim['sub'], self.subject)
self.assertEqual(claim['iat'], int(t.return_value))
+ self.assertNotIn('nbf', claim)
+ self.assertNotIn('jti', claim)
+
+ # Missing issuer parameter
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=None, subject=self.subject, audience=self.audience, body=self.body)
+
+ # Missing subject parameter
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=self.issuer, subject=None, audience=self.audience, body=self.body)
+
+ # Missing audience parameter
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=self.issuer, subject=self.subject, audience=None, body=self.body)
+
+ # Optional kwargs
+ not_before = time() - 3600
+ jwt_id = '8zd15df4s35f43sd'
+ body = client.prepare_request_body(issuer=self.issuer,
+ subject=self.subject,
+ audience=self.audience,
+ body=self.body,
+ not_before=not_before,
+ jwt_id=jwt_id)
+
+ r = Request('https://a.b', body=body)
+ self.assertEqual(r.isnot, 'empty')
+ self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
+
+ claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
+
+ self.assertEqual(claim['iss'], self.issuer)
+ # audience verification is handled during decode now
+ self.assertEqual(claim['sub'], self.subject)
+ self.assertEqual(claim['iat'], int(t.return_value))
+ self.assertEqual(claim['nbf'], not_before)
+ self.assertEqual(claim['jti'], jwt_id)
+
+ @patch('time.time')
+ def test_request_body_no_initial_private_key(self, t):
+ t.return_value = time()
+ self.token['expires_at'] = self.token['expires_in'] + t.return_value
+
+ client = ServiceApplicationClient(
+ self.client_id, private_key=None)
+
+ # Basic with private key provided
+ body = client.prepare_request_body(issuer=self.issuer,
+ subject=self.subject,
+ audience=self.audience,
+ body=self.body,
+ private_key=self.private_key)
+ r = Request('https://a.b', body=body)
+ self.assertEqual(r.isnot, 'empty')
+ self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
+
+ claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
+
+ self.assertEqual(claim['iss'], self.issuer)
+ # audience verification is handled during decode now
+ self.assertEqual(claim['sub'], self.subject)
+ self.assertEqual(claim['iat'], int(t.return_value))
+
+ # No private key provided
+ self.assertRaises(ValueError, client.prepare_request_body,
+ issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body)
@patch('time.time')
def test_parse_token_response(self, t):
diff --git a/tests/oauth2/rfc6749/clients/test_web_application.py b/tests/oauth2/rfc6749/clients/test_web_application.py
index fa6643e..4ecc3b3 100644
--- a/tests/oauth2/rfc6749/clients/test_web_application.py
+++ b/tests/oauth2/rfc6749/clients/test_web_application.py
@@ -38,7 +38,7 @@ class WebApplicationClientTest(TestCase):
code = "zzzzaaaa"
body = "not=empty"
- body_code = "not=empty&grant_type=authorization_code&code=%s&client_id=%s" % (code, client_id)
+ body_code = "not=empty&grant_type=authorization_code&code=%s" % code
body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback"
body_kwargs = body_code + "&some=providers&require=extra+arguments"
diff --git a/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py
new file mode 100644
index 0000000..7ec8190
--- /dev/null
+++ b/tests/oauth2/rfc6749/endpoints/test_introspect_endpoint.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+from json import loads
+
+from mock import MagicMock
+
+from oauthlib.common import urlencode
+from oauthlib.oauth2 import RequestValidator, IntrospectEndpoint
+
+from ....unittest import TestCase
+
+
+class IntrospectEndpointTest(TestCase):
+
+ def setUp(self):
+ self.validator = MagicMock(wraps=RequestValidator())
+ self.validator.client_authentication_required.return_value = True
+ self.validator.authenticate_client.return_value = True
+ self.validator.validate_bearer_token.return_value = True
+ self.validator.introspect_token.return_value = {}
+ self.endpoint = IntrospectEndpoint(self.validator)
+
+ self.uri = 'should_not_matter'
+ self.headers = {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ }
+ self.resp_h = {
+ 'Cache-Control': 'no-store',
+ 'Content-Type': 'application/json',
+ 'Pragma': 'no-cache'
+ }
+ self.resp_b = {
+ "active": True
+ }
+
+ def test_introspect_token(self):
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type)])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), self.resp_b)
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_nohint(self):
+ # don't specify token_type_hint
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), self.resp_b)
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_false(self):
+ self.validator.introspect_token.return_value = None
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), {"active": False})
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_claims(self):
+ self.validator.introspect_token.return_value = {"foo": "bar"}
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), {"active": True, "foo": "bar"})
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_claims_spoof_active(self):
+ self.validator.introspect_token.return_value = {"foo": "bar", "active": False}
+ body = urlencode([('token', 'foo')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), {"active": True, "foo": "bar"})
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_client_authentication_failed(self):
+ self.validator.authenticate_client.return_value = False
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(loads(b)['error'], 'invalid_client')
+ self.assertEqual(s, 401)
+
+ def test_introspect_token_public_client_authentication(self):
+ self.validator.client_authentication_required.return_value = False
+ self.validator.authenticate_client_id.return_value = True
+ for token_type in ('access_token', 'refresh_token', 'invalid'):
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', token_type)])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, self.resp_h)
+ self.assertEqual(loads(b), self.resp_b)
+ self.assertEqual(s, 200)
+
+ def test_introspect_token_public_client_authentication_failed(self):
+ self.validator.client_authentication_required.return_value = False
+ self.validator.authenticate_client_id.return_value = False
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'access_token')])
+ h, b, s = self.endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(loads(b)['error'], 'invalid_client')
+ self.assertEqual(s, 401)
+
+
+ def test_introspect_unsupported_token(self):
+ endpoint = IntrospectEndpoint(self.validator,
+ supported_token_types=['access_token'])
+ body = urlencode([('token', 'foo'),
+ ('token_type_hint', 'refresh_token')])
+ h, b, s = endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body=body)
+ self.assertEqual(h, {})
+ self.assertEqual(loads(b)['error'], 'unsupported_token_type')
+ self.assertEqual(s, 400)
+
+ h, b, s = endpoint.create_introspect_response(self.uri,
+ headers=self.headers, body='')
+ self.assertEqual(h, {})
+ self.assertEqual(loads(b)['error'], 'invalid_request')
+ self.assertEqual(s, 400)
diff --git a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
index 87781b3..8490c03 100644
--- a/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
+++ b/tests/oauth2/rfc6749/endpoints/test_scope_handling.py
@@ -87,7 +87,7 @@ class TestScopeHandling(TestCase):
self.assertIn('Location', h)
code = get_query_credentials(h['Location'])['code'][0]
_, body, _ = getattr(self, backend_server_type).create_token_response(token_uri,
- body='grant_type=authorization_code&code=%s' % code)
+ body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code)
self.assertEqual(json.loads(body)['scope'], decoded_scope)
# implicit grant
diff --git a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py b/tests/oauth2/rfc6749/grant_types/test_openid_connect.py
deleted file mode 100644
index f10d36c..0000000
--- a/tests/oauth2/rfc6749/grant_types/test_openid_connect.py
+++ /dev/null
@@ -1,291 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import absolute_import, unicode_literals
-
-import json
-
-import mock
-
-from oauthlib.common import Request
-from oauthlib.oauth2.rfc6749.grant_types import (OIDCNoPrompt,
- OpenIDConnectAuthCode,
- OpenIDConnectHybrid,
- OpenIDConnectImplicit)
-from oauthlib.oauth2.rfc6749.tokens import BearerToken
-
-from ....unittest import TestCase
-from .test_authorization_code import AuthorizationCodeGrantTest
-from .test_implicit import ImplicitGrantTest
-
-
-class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest):
- """Test that OpenID don't interfere with normal OAuth 2 flows."""
-
- def setUp(self):
- super(OpenIDAuthCodeInterferenceTest, self).setUp()
- self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator)
-
-class OpenIDImplicitInterferenceTest(ImplicitGrantTest):
- """Test that OpenID don't interfere with normal OAuth 2 flows."""
-
- def setUp(self):
- super(OpenIDImplicitInterferenceTest, self).setUp()
- self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator)
-
-
-class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest):
- """Test that OpenID don't interfere with normal OAuth 2 flows."""
-
- def setUp(self):
- super(OpenIDHybridInterferenceTest, self).setUp()
- self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator)
-
-
-def get_id_token_mock(token, token_handler, request):
- return "MOCKED_TOKEN"
-
-
-class OpenIDAuthCodeTest(TestCase):
-
- def setUp(self):
- self.request = Request('http://a.b/path')
- self.request.scopes = ('hello', 'openid')
- self.request.expires_in = 1800
- self.request.client_id = 'abcdef'
- self.request.code = '1234'
- self.request.response_type = 'code'
- self.request.grant_type = 'authorization_code'
- self.request.redirect_uri = 'https://a.b/cb'
- self.request.state = 'abc'
-
- self.mock_validator = mock.MagicMock()
- self.mock_validator.authenticate_client.side_effect = self.set_client
- self.mock_validator.get_id_token.side_effect = get_id_token_mock
- self.auth = OpenIDConnectAuthCode(request_validator=self.mock_validator)
-
- self.url_query = 'https://a.b/cb?code=abc&state=abc'
- self.url_fragment = 'https://a.b/cb#code=abc&state=abc'
-
- def set_client(self, request):
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked'
- return True
-
- @mock.patch('oauthlib.common.generate_token')
- def test_authorization(self, generate_token):
-
- scope, info = self.auth.validate_authorization_request(self.request)
-
- generate_token.return_value = 'abc'
- bearer = BearerToken(self.mock_validator)
- self.request.response_mode = 'query'
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_query)
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- self.request.response_mode = 'fragment'
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- @mock.patch('oauthlib.common.generate_token')
- def test_no_prompt_authorization(self, generate_token):
- generate_token.return_value = 'abc'
- scope, info = self.auth.validate_authorization_request(self.request)
- self.request.prompt = 'none'
- self.assertRaises(OIDCNoPrompt,
- self.auth.validate_authorization_request,
- self.request)
-
- # prompt == none requires id token hint
- bearer = BearerToken(self.mock_validator)
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=invalid_request', h['Location'])
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- self.request.response_mode = 'query'
- self.request.id_token_hint = 'me@email.com'
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_query)
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- # Test alernative response modes
- self.request.response_mode = 'fragment'
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
-
- # Ensure silent authentication and authorization is done
- self.mock_validator.validate_silent_login.return_value = False
- self.mock_validator.validate_silent_authorization.return_value = True
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=login_required', h['Location'])
-
- self.mock_validator.validate_silent_login.return_value = True
- self.mock_validator.validate_silent_authorization.return_value = False
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=consent_required', h['Location'])
-
- # ID token hint must match logged in user
- self.mock_validator.validate_silent_authorization.return_value = True
- self.mock_validator.validate_user_match.return_value = False
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=login_required', h['Location'])
-
- def set_scopes(self, client_id, code, client, request):
- request.scopes = self.request.scopes
- request.state = self.request.state
- request.user = 'bob'
- return True
-
- def test_create_token_response(self):
- self.request.response_type = None
- self.mock_validator.validate_code.side_effect = self.set_scopes
-
- bearer = BearerToken(self.mock_validator)
-
- h, token, s = self.auth.create_token_response(self.request, bearer)
- token = json.loads(token)
- self.assertEqual(self.mock_validator.save_token.call_count, 1)
- self.assertIn('access_token', token)
- self.assertIn('refresh_token', token)
- self.assertIn('expires_in', token)
- self.assertIn('scope', token)
- self.assertIn('id_token', token)
- self.assertIn('openid', token['scope'])
-
- self.mock_validator.reset_mock()
-
- self.request.scopes = ('hello', 'world')
- h, token, s = self.auth.create_token_response(self.request, bearer)
- token = json.loads(token)
- self.assertEqual(self.mock_validator.save_token.call_count, 1)
- self.assertIn('access_token', token)
- self.assertIn('refresh_token', token)
- self.assertIn('expires_in', token)
- self.assertIn('scope', token)
- self.assertNotIn('id_token', token)
- self.assertNotIn('openid', token['scope'])
-
-
-class OpenIDImplicitTest(TestCase):
-
- def setUp(self):
- self.request = Request('http://a.b/path')
- self.request.scopes = ('hello', 'openid')
- self.request.expires_in = 1800
- self.request.client_id = 'abcdef'
- self.request.response_type = 'id_token token'
- self.request.redirect_uri = 'https://a.b/cb'
- self.request.nonce = 'zxc'
- self.request.state = 'abc'
-
- self.mock_validator = mock.MagicMock()
- self.mock_validator.get_id_token.side_effect = get_id_token_mock
- self.auth = OpenIDConnectImplicit(request_validator=self.mock_validator)
-
- token = 'MOCKED_TOKEN'
- self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
- self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
-
- @mock.patch('oauthlib.common.generate_token')
- def test_authorization(self, generate_token):
- scope, info = self.auth.validate_authorization_request(self.request)
-
- generate_token.return_value = 'abc'
- bearer = BearerToken(self.mock_validator)
-
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- self.request.response_type = 'id_token'
- token = 'MOCKED_TOKEN'
- url = 'https://a.b/cb#state=abc&id_token=%s' % token
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], url, parse_fragment=True)
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- self.request.nonce = None
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=invalid_request', h['Location'])
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- @mock.patch('oauthlib.common.generate_token')
- def test_no_prompt_authorization(self, generate_token):
- generate_token.return_value = 'abc'
- scope, info = self.auth.validate_authorization_request(self.request)
- self.request.prompt = 'none'
- self.assertRaises(OIDCNoPrompt,
- self.auth.validate_authorization_request,
- self.request)
-
- # prompt == none requires id token hint
- bearer = BearerToken(self.mock_validator)
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=invalid_request', h['Location'])
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- self.request.id_token_hint = 'me@email.com'
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
- self.assertEqual(b, None)
- self.assertEqual(s, 302)
-
- # Test alernative response modes
- self.request.response_mode = 'query'
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertURLEqual(h['Location'], self.url_query)
-
- # Ensure silent authentication and authorization is done
- self.mock_validator.validate_silent_login.return_value = False
- self.mock_validator.validate_silent_authorization.return_value = True
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=login_required', h['Location'])
-
- self.mock_validator.validate_silent_login.return_value = True
- self.mock_validator.validate_silent_authorization.return_value = False
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=consent_required', h['Location'])
-
- # ID token hint must match logged in user
- self.mock_validator.validate_silent_authorization.return_value = True
- self.mock_validator.validate_user_match.return_value = False
- h, b, s = self.auth.create_authorization_response(self.request, bearer)
- self.assertIn('error=login_required', h['Location'])
-
-
-class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest):
-
- def setUp(self):
- super(OpenIDHybridCodeTokenTest, self).setUp()
- self.request.response_type = 'code token'
- self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator)
- self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc'
- self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc'
-
-class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest):
-
- def setUp(self):
- super(OpenIDHybridCodeIdTokenTest, self).setUp()
- self.request.response_type = 'code id_token'
- self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator)
- token = 'MOCKED_TOKEN'
- self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token
- self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token
-
-class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest):
-
- def setUp(self):
- super(OpenIDHybridCodeIdTokenTokenTest, self).setUp()
- self.request.response_type = 'code id_token token'
- self.auth = OpenIDConnectHybrid(request_validator=self.mock_validator)
- token = 'MOCKED_TOKEN'
- self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
- self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
diff --git a/tests/oauth2/rfc6749/test_parameters.py b/tests/oauth2/rfc6749/test_parameters.py
index 1cac879..2a11d33 100644
--- a/tests/oauth2/rfc6749/test_parameters.py
+++ b/tests/oauth2/rfc6749/test_parameters.py
@@ -116,13 +116,6 @@ class ParameterTests(TestCase):
' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
' "example_parameter": "example_value" }')
- json_expires = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
- ' "token_type": "example",'
- ' "expires": 3600,'
- ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
- ' "example_parameter": "example_value",'
- ' "scope":"abc def"}')
-
json_dict = {
'access_token': '2YotnFZFEjr1zCsicMWpAA',
'token_type': 'example',
@@ -268,7 +261,3 @@ class ParameterTests(TestCase):
finally:
signals.scope_changed.disconnect(record_scope_change)
del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
-
- def test_token_response_with_expires(self):
- """Verify fallback for alternate spelling of expires_in. """
- self.assertEqual(parse_token_response(self.json_expires), self.json_dict)
diff --git a/tests/oauth2/rfc6749/test_server.py b/tests/oauth2/rfc6749/test_server.py
index 305b795..bc7a2b7 100644
--- a/tests/oauth2/rfc6749/test_server.py
+++ b/tests/oauth2/rfc6749/test_server.py
@@ -3,21 +3,17 @@ from __future__ import absolute_import, unicode_literals
import json
-import jwt
import mock
from oauthlib import common
from oauthlib.oauth2.rfc6749 import errors, tokens
from oauthlib.oauth2.rfc6749.endpoints import Server
-from oauthlib.oauth2.rfc6749.endpoints.authorization import \
- AuthorizationEndpoint
+from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint
from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint
from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
from oauthlib.oauth2.rfc6749.grant_types import (AuthorizationCodeGrant,
ClientCredentialsGrant,
ImplicitGrant,
- OpenIDConnectAuthCode,
- OpenIDConnectImplicit,
ResourceOwnerPasswordCredentialsGrant)
from ...unittest import TestCase
@@ -29,40 +25,34 @@ class AuthorizationEndpointTest(TestCase):
self.mock_validator = mock.MagicMock()
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
auth_code = AuthorizationCodeGrant(
- request_validator=self.mock_validator)
+ request_validator=self.mock_validator)
auth_code.save_authorization_code = mock.MagicMock()
implicit = ImplicitGrant(
- request_validator=self.mock_validator)
+ request_validator=self.mock_validator)
implicit.save_token = mock.MagicMock()
- openid_connect_auth = OpenIDConnectAuthCode(self.mock_validator)
- openid_connect_implicit = OpenIDConnectImplicit(self.mock_validator)
-
response_types = {
- 'code': auth_code,
- 'token': implicit,
-
- 'id_token': openid_connect_implicit,
- 'id_token token': openid_connect_implicit,
- 'code token': openid_connect_auth,
- 'code id_token': openid_connect_auth,
- 'code token id_token': openid_connect_auth,
- 'none': auth_code
+ 'code': auth_code,
+ 'token': implicit,
+ 'none': auth_code
}
self.expires_in = 1800
- token = tokens.BearerToken(self.mock_validator,
- expires_in=self.expires_in)
+ token = tokens.BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
self.endpoint = AuthorizationEndpoint(
- default_response_type='code',
- default_token_type=token,
- response_types=response_types)
+ default_response_type='code',
+ default_token_type=token,
+ response_types=response_types
+ )
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_authorization_grant(self):
uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
+ uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz')
@@ -71,7 +61,7 @@ class AuthorizationEndpointTest(TestCase):
uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
+ uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True)
@@ -79,7 +69,7 @@ class AuthorizationEndpointTest(TestCase):
uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
+ uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True)
self.assertEqual(body, None)
@@ -99,9 +89,9 @@ class AuthorizationEndpointTest(TestCase):
uri = 'http://i.b/l?client_id=me&scope=all+of+them'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
self.mock_validator.validate_request = mock.MagicMock(
- side_effect=errors.InvalidRequestError())
+ side_effect=errors.InvalidRequestError())
headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
+ uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.')
@@ -109,9 +99,9 @@ class AuthorizationEndpointTest(TestCase):
uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
self.mock_validator.validate_request = mock.MagicMock(
- side_effect=errors.UnsupportedResponseTypeError())
+ side_effect=errors.UnsupportedResponseTypeError())
headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
+ uri, scopes=['all', 'of', 'them'])
self.assertIn('Location', headers)
self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type')
@@ -129,27 +119,32 @@ class TokenEndpointTest(TestCase):
self.mock_validator.authenticate_client.side_effect = set_user
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
auth_code = AuthorizationCodeGrant(
- request_validator=self.mock_validator)
+ request_validator=self.mock_validator)
password = ResourceOwnerPasswordCredentialsGrant(
- request_validator=self.mock_validator)
+ request_validator=self.mock_validator)
client = ClientCredentialsGrant(
- request_validator=self.mock_validator)
+ request_validator=self.mock_validator)
supported_types = {
- 'authorization_code': auth_code,
- 'password': password,
- 'client_credentials': client,
+ 'authorization_code': auth_code,
+ 'password': password,
+ 'client_credentials': client,
}
self.expires_in = 1800
- token = tokens.BearerToken(self.mock_validator,
- expires_in=self.expires_in)
- self.endpoint = TokenEndpoint('authorization_code',
- default_token_type=token, grant_types=supported_types)
+ token = tokens.BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = TokenEndpoint(
+ 'authorization_code',
+ default_token_type=token,
+ grant_types=supported_types
+ )
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_authorization_grant(self):
body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
@@ -176,7 +171,7 @@ class TokenEndpointTest(TestCase):
def test_password_grant(self):
body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
@@ -190,7 +185,7 @@ class TokenEndpointTest(TestCase):
def test_client_grant(self):
body = 'grant_type=client_credentials&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
token = {
'token_type': 'Bearer',
'expires_in': self.expires_in,
@@ -279,9 +274,9 @@ twIDAQAB
@mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
def test_authorization_grant(self):
- body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz'
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
@@ -293,9 +288,9 @@ twIDAQAB
}
self.assertEqual(body, token)
- body = 'grant_type=authorization_code&code=abc&state=xyz'
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=xyz'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
@@ -310,7 +305,7 @@ twIDAQAB
def test_password_grant(self):
body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
@@ -325,7 +320,7 @@ twIDAQAB
def test_scopes_and_user_id_stored_in_access_token(self):
body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
access_token = json.loads(body)['access_token']
@@ -338,7 +333,7 @@ twIDAQAB
def test_client_grant(self):
body = 'grant_type=client_credentials&scope=all+of+them'
headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
+ '', body=body)
body = json.loads(body)
token = {
'token_type': 'Bearer',
@@ -349,12 +344,12 @@ twIDAQAB
self.assertEqual(body, token)
def test_missing_type(self):
- _, body, _ = self.endpoint.create_token_response('', body='')
+ _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc')
token = {'error': 'unsupported_grant_type'}
self.assertEqual(json.loads(body), token)
def test_invalid_type(self):
- body = 'grant_type=invalid'
+ body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc'
_, body, _ = self.endpoint.create_token_response('', body=body)
token = {'error': 'unsupported_grant_type'}
self.assertEqual(json.loads(body), token)
@@ -366,8 +361,10 @@ class ResourceEndpointTest(TestCase):
self.mock_validator = mock.MagicMock()
self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
token = tokens.BearerToken(request_validator=self.mock_validator)
- self.endpoint = ResourceEndpoint(default_token='Bearer',
- token_types={'Bearer': token})
+ self.endpoint = ResourceEndpoint(
+ default_token='Bearer',
+ token_types={'Bearer': token}
+ )
def test_defaults(self):
uri = 'http://a.b/path?some=query'
diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py
index e2e558d..061754f 100644
--- a/tests/oauth2/rfc6749/test_tokens.py
+++ b/tests/oauth2/rfc6749/test_tokens.py
@@ -1,6 +1,11 @@
from __future__ import absolute_import, unicode_literals
-from oauthlib.oauth2.rfc6749.tokens import *
+from oauthlib.oauth2.rfc6749.tokens import (
+ prepare_mac_header,
+ prepare_bearer_headers,
+ prepare_bearer_body,
+ prepare_bearer_uri,
+)
from ...unittest import TestCase
@@ -59,9 +64,22 @@ class TokenTest(TestCase):
bearer_headers = {
'Authorization': 'Bearer vF9dft4qmT'
}
+ fake_bearer_headers = [
+ {'Authorization': 'Beaver vF9dft4qmT'},
+ {'Authorization': 'BeavervF9dft4qmT'},
+ {'Authorization': 'Beaver vF9dft4qmT'},
+ {'Authorization': 'BearerF9dft4qmT'},
+ {'Authorization': 'Bearer vF9d ft4qmT'},
+ ]
+ valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'}
bearer_body = 'access_token=vF9dft4qmT'
bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT'
+ def _mocked_validate_bearer_token(self, token, scopes, request):
+ if not token:
+ return False
+ return True
+
def test_prepare_mac_header(self):
"""Verify mac signatures correctness
diff --git a/tests/openid/__init__.py b/tests/openid/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/openid/__init__.py
diff --git a/tests/openid/connect/__init__.py b/tests/openid/connect/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/openid/connect/__init__.py
diff --git a/tests/openid/connect/core/__init__.py b/tests/openid/connect/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/openid/connect/core/__init__.py
diff --git a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py b/tests/openid/connect/core/endpoints/test_claims_handling.py
index 9795c80..37a7cdd 100644
--- a/tests/oauth2/rfc6749/endpoints/test_claims_handling.py
+++ b/tests/openid/connect/core/endpoints/test_claims_handling.py
@@ -10,10 +10,12 @@ from __future__ import absolute_import, unicode_literals
import mock
-from oauthlib.oauth2 import InvalidRequestError, RequestValidator, Server
+from oauthlib.oauth2 import RequestValidator
+
+from oauthlib.oauth2.rfc6749.endpoints.pre_configured import Server
from ....unittest import TestCase
-from .test_utils import get_fragment_credentials, get_query_credentials
+from .test_utils import get_query_credentials
class TestClaimsHandling(TestCase):
@@ -81,7 +83,7 @@ class TestClaimsHandling(TestCase):
}
}
- claims_urlquoted='%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D'
+ claims_urlquoted = '%7B%22id_token%22%3A%20%7B%22claim_2%22%3A%20%7B%22essential%22%3A%20true%7D%2C%20%22claim_1%22%3A%20null%7D%2C%20%22userinfo%22%3A%20%7B%22claim_4%22%3A%20null%2C%20%22claim_3%22%3A%20%7B%22essential%22%3A%20true%7D%7D%7D'
uri = 'http://example.com/path?client_id=abc&scope=openid+test_scope&response_type=code&claims=%s'
h, b, s = self.server.create_authorization_response(uri % claims_urlquoted, scopes='openid test_scope')
@@ -90,8 +92,10 @@ class TestClaimsHandling(TestCase):
code = get_query_credentials(h['Location'])['code'][0]
token_uri = 'http://example.com/path'
- _, body, _ = self.server.create_token_response(token_uri,
- body='grant_type=authorization_code&code=%s' % code)
+ _, body, _ = self.server.create_token_response(
+ token_uri,
+ body='client_id=me&redirect_uri=http://back.to/me&grant_type=authorization_code&code=%s' % code
+ )
self.assertDictEqual(self.claims_saved_with_bearer_token, claims)
diff --git a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py
index 89431b6..89431b6 100644
--- a/tests/oauth2/rfc6749/endpoints/test_openid_connect_params_handling.py
+++ b/tests/openid/connect/core/endpoints/test_openid_connect_params_handling.py
diff --git a/tests/openid/connect/core/grant_types/test_authorization_code.py b/tests/openid/connect/core/grant_types/test_authorization_code.py
new file mode 100644
index 0000000..1bad120
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_authorization_code.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import json
+
+import mock
+
+from oauthlib.common import Request
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant
+from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt
+
+from ....unittest import TestCase
+from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest
+
+
+def get_id_token_mock(token, token_handler, request):
+ return "MOCKED_TOKEN"
+
+
+class OpenIDAuthCodeInterferenceTest(AuthorizationCodeGrantTest):
+ """Test that OpenID don't interfere with normal OAuth 2 flows."""
+
+ def setUp(self):
+ super(OpenIDAuthCodeInterferenceTest, self).setUp()
+ self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
+
+
+class OpenIDAuthCodeTest(TestCase):
+
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'openid')
+ self.request.expires_in = 1800
+ self.request.client_id = 'abcdef'
+ self.request.code = '1234'
+ self.request.response_type = 'code'
+ self.request.grant_type = 'authorization_code'
+ self.request.redirect_uri = 'https://a.b/cb'
+ self.request.state = 'abc'
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = self.set_client
+ self.mock_validator.get_id_token.side_effect = get_id_token_mock
+ self.auth = AuthorizationCodeGrant(request_validator=self.mock_validator)
+
+ self.url_query = 'https://a.b/cb?code=abc&state=abc'
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc'
+
+ def set_client(self, request):
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked'
+ return True
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_authorization(self, generate_token):
+
+ scope, info = self.auth.validate_authorization_request(self.request)
+
+ generate_token.return_value = 'abc'
+ bearer = BearerToken(self.mock_validator)
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_query)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.response_mode = 'fragment'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_no_prompt_authorization(self, generate_token):
+ generate_token.return_value = 'abc'
+ scope, info = self.auth.validate_authorization_request(self.request)
+ self.request.prompt = 'none'
+ self.assertRaises(OIDCNoPrompt,
+ self.auth.validate_authorization_request,
+ self.request)
+
+ # prompt == none requires id token hint
+ bearer = BearerToken(self.mock_validator)
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=invalid_request', h['Location'])
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.response_mode = 'query'
+ self.request.id_token_hint = 'me@email.com'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_query)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ # Test alernative response modes
+ self.request.response_mode = 'fragment'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+
+ # Ensure silent authentication and authorization is done
+ self.mock_validator.validate_silent_login.return_value = False
+ self.mock_validator.validate_silent_authorization.return_value = True
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+ self.mock_validator.validate_silent_login.return_value = True
+ self.mock_validator.validate_silent_authorization.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=consent_required', h['Location'])
+
+ # ID token hint must match logged in user
+ self.mock_validator.validate_silent_authorization.return_value = True
+ self.mock_validator.validate_user_match.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+ def set_scopes(self, client_id, code, client, request):
+ request.scopes = self.request.scopes
+ request.state = self.request.state
+ request.user = 'bob'
+ return True
+
+ def test_create_token_response(self):
+ self.request.response_type = None
+ self.mock_validator.validate_code.side_effect = self.set_scopes
+
+ bearer = BearerToken(self.mock_validator)
+
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+ self.assertIn('id_token', token)
+ self.assertIn('openid', token['scope'])
+
+ self.mock_validator.reset_mock()
+
+ self.request.scopes = ('hello', 'world')
+ h, token, s = self.auth.create_token_response(self.request, bearer)
+ token = json.loads(token)
+ self.assertEqual(self.mock_validator.save_token.call_count, 1)
+ self.assertIn('access_token', token)
+ self.assertIn('refresh_token', token)
+ self.assertIn('expires_in', token)
+ self.assertIn('scope', token)
+ self.assertNotIn('id_token', token)
+ self.assertNotIn('openid', token['scope'])
diff --git a/tests/openid/connect/core/grant_types/test_dispatchers.py b/tests/openid/connect/core/grant_types/test_dispatchers.py
new file mode 100644
index 0000000..f90ec46
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_dispatchers.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+import mock
+
+from oauthlib.common import Request
+
+from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant
+from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant
+from oauthlib.openid.connect.core.grant_types.dispatchers import (
+ ImplicitTokenGrantDispatcher,
+ AuthorizationTokenGrantDispatcher
+)
+
+from oauthlib.oauth2.rfc6749.grant_types import (
+ AuthorizationCodeGrant as OAuth2AuthorizationCodeGrant,
+ ImplicitGrant as OAuth2ImplicitGrant,
+)
+
+
+from ....unittest import TestCase
+
+
+class ImplicitTokenGrantDispatcherTest(TestCase):
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ request_validator = mock.MagicMock()
+ implicit_grant = OAuth2ImplicitGrant(request_validator)
+ openid_connect_implicit = ImplicitGrant(request_validator)
+
+ self.dispatcher = ImplicitTokenGrantDispatcher(
+ default_implicit_grant=implicit_grant,
+ oidc_implicit_grant=openid_connect_implicit
+ )
+
+ def test_create_authorization_response_openid(self):
+ self.request.scopes = ('hello', 'openid')
+ self.request.response_type = 'id_token'
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+ def test_validate_authorization_request_openid(self):
+ self.request.scopes = ('hello', 'openid')
+ self.request.response_type = 'id_token'
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+ def test_create_authorization_response_oauth(self):
+ self.request.scopes = ('hello', 'world')
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+ def test_validate_authorization_request_oauth(self):
+ self.request.scopes = ('hello', 'world')
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, ImplicitGrant))
+
+
+class DispatcherTest(TestCase):
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.decoded_body = (
+ ("client_id", "me"),
+ ("code", "code"),
+ ("redirect_url", "https://a.b/cb"),
+ )
+
+ self.request_validator = mock.MagicMock()
+ self.auth_grant = OAuth2AuthorizationCodeGrant(self.request_validator)
+ self.openid_connect_auth = OAuth2AuthorizationCodeGrant(self.request_validator)
+
+
+class AuthTokenGrantDispatcherOpenIdTest(DispatcherTest):
+
+ def setUp(self):
+ super(AuthTokenGrantDispatcherOpenIdTest, self).setUp()
+ self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid')
+ self.dispatcher = AuthorizationTokenGrantDispatcher(
+ self.request_validator,
+ default_token_grant=self.auth_grant,
+ oidc_token_grant=self.openid_connect_auth
+ )
+
+ def test_create_token_response_openid(self):
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, AuthorizationCodeGrant))
+ self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called)
+
+
+class AuthTokenGrantDispatcherOpenIdWithoutCodeTest(DispatcherTest):
+
+ def setUp(self):
+ super(AuthTokenGrantDispatcherOpenIdWithoutCodeTest, self).setUp()
+ self.request.decoded_body = (
+ ("client_id", "me"),
+ ("code", ""),
+ ("redirect_url", "https://a.b/cb"),
+ )
+ self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'openid')
+ self.dispatcher = AuthorizationTokenGrantDispatcher(
+ self.request_validator,
+ default_token_grant=self.auth_grant,
+ oidc_token_grant=self.openid_connect_auth
+ )
+
+ def test_create_token_response_openid_without_code(self):
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant))
+ self.assertFalse(self.dispatcher.request_validator.get_authorization_code_scopes.called)
+
+
+class AuthTokenGrantDispatcherOAuthTest(DispatcherTest):
+
+ def setUp(self):
+ super(AuthTokenGrantDispatcherOAuthTest, self).setUp()
+ self.request_validator.get_authorization_code_scopes.return_value = ('hello', 'world')
+ self.dispatcher = AuthorizationTokenGrantDispatcher(
+ self.request_validator,
+ default_token_grant=self.auth_grant,
+ oidc_token_grant=self.openid_connect_auth
+ )
+
+ def test_create_token_response_oauth(self):
+ handler = self.dispatcher._handler_for_request(self.request)
+ self.assertTrue(isinstance(handler, OAuth2AuthorizationCodeGrant))
+ self.assertTrue(self.dispatcher.request_validator.get_authorization_code_scopes.called)
diff --git a/tests/openid/connect/core/grant_types/test_hybrid.py b/tests/openid/connect/core/grant_types/test_hybrid.py
new file mode 100644
index 0000000..531ae7f
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_hybrid.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant
+
+from ....oauth2.rfc6749.grant_types.test_authorization_code import AuthorizationCodeGrantTest
+
+
+class OpenIDHybridInterferenceTest(AuthorizationCodeGrantTest):
+ """Test that OpenID don't interfere with normal OAuth 2 flows."""
+
+ def setUp(self):
+ super(OpenIDHybridInterferenceTest, self).setUp()
+ self.auth = HybridGrant(request_validator=self.mock_validator)
diff --git a/tests/openid/connect/core/grant_types/test_implicit.py b/tests/openid/connect/core/grant_types/test_implicit.py
new file mode 100644
index 0000000..56247d9
--- /dev/null
+++ b/tests/openid/connect/core/grant_types/test_implicit.py
@@ -0,0 +1,148 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import mock
+
+from oauthlib.common import Request
+
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant
+from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant
+from oauthlib.openid.connect.core.grant_types.exceptions import OIDCNoPrompt
+
+from ....unittest import TestCase
+from .test_authorization_code import get_id_token_mock, OpenIDAuthCodeTest
+
+from ....oauth2.rfc6749.grant_types.test_implicit import ImplicitGrantTest
+
+
+class OpenIDImplicitInterferenceTest(ImplicitGrantTest):
+ """Test that OpenID don't interfere with normal OAuth 2 flows."""
+
+ def setUp(self):
+ super(OpenIDImplicitInterferenceTest, self).setUp()
+ self.auth = ImplicitGrant(request_validator=self.mock_validator)
+
+
+class OpenIDImplicitTest(TestCase):
+
+ def setUp(self):
+ self.request = Request('http://a.b/path')
+ self.request.scopes = ('hello', 'openid')
+ self.request.expires_in = 1800
+ self.request.client_id = 'abcdef'
+ self.request.response_type = 'id_token token'
+ self.request.redirect_uri = 'https://a.b/cb'
+ self.request.nonce = 'zxc'
+ self.request.state = 'abc'
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.get_id_token.side_effect = get_id_token_mock
+ self.auth = ImplicitGrant(request_validator=self.mock_validator)
+
+ token = 'MOCKED_TOKEN'
+ self.url_query = 'https://a.b/cb?state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
+ self.url_fragment = 'https://a.b/cb#state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_authorization(self, generate_token):
+ scope, info = self.auth.validate_authorization_request(self.request)
+
+ generate_token.return_value = 'abc'
+ bearer = BearerToken(self.mock_validator)
+
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.response_type = 'id_token'
+ token = 'MOCKED_TOKEN'
+ url = 'https://a.b/cb#state=abc&id_token=%s' % token
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], url, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.nonce = None
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=invalid_request', h['Location'])
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ @mock.patch('oauthlib.common.generate_token')
+ def test_no_prompt_authorization(self, generate_token):
+ generate_token.return_value = 'abc'
+ scope, info = self.auth.validate_authorization_request(self.request)
+ self.request.prompt = 'none'
+ self.assertRaises(OIDCNoPrompt,
+ self.auth.validate_authorization_request,
+ self.request)
+
+ # prompt == none requires id token hint
+ bearer = BearerToken(self.mock_validator)
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=invalid_request', h['Location'])
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ self.request.id_token_hint = 'me@email.com'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_fragment, parse_fragment=True)
+ self.assertEqual(b, None)
+ self.assertEqual(s, 302)
+
+ # Test alernative response modes
+ self.request.response_mode = 'query'
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertURLEqual(h['Location'], self.url_query)
+
+ # Ensure silent authentication and authorization is done
+ self.mock_validator.validate_silent_login.return_value = False
+ self.mock_validator.validate_silent_authorization.return_value = True
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+ self.mock_validator.validate_silent_login.return_value = True
+ self.mock_validator.validate_silent_authorization.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=consent_required', h['Location'])
+
+ # ID token hint must match logged in user
+ self.mock_validator.validate_silent_authorization.return_value = True
+ self.mock_validator.validate_user_match.return_value = False
+ h, b, s = self.auth.create_authorization_response(self.request, bearer)
+ self.assertIn('error=login_required', h['Location'])
+
+
+class OpenIDHybridCodeTokenTest(OpenIDAuthCodeTest):
+
+ def setUp(self):
+ super(OpenIDHybridCodeTokenTest, self).setUp()
+ self.request.response_type = 'code token'
+ self.auth = HybridGrant(request_validator=self.mock_validator)
+ self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc'
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc'
+
+
+class OpenIDHybridCodeIdTokenTest(OpenIDAuthCodeTest):
+
+ def setUp(self):
+ super(OpenIDHybridCodeIdTokenTest, self).setUp()
+ self.request.response_type = 'code id_token'
+ self.auth = HybridGrant(request_validator=self.mock_validator)
+ token = 'MOCKED_TOKEN'
+ self.url_query = 'https://a.b/cb?code=abc&state=abc&id_token=%s' % token
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc&id_token=%s' % token
+
+
+class OpenIDHybridCodeIdTokenTokenTest(OpenIDAuthCodeTest):
+
+ def setUp(self):
+ super(OpenIDHybridCodeIdTokenTokenTest, self).setUp()
+ self.request.response_type = 'code id_token token'
+ self.auth = HybridGrant(request_validator=self.mock_validator)
+ token = 'MOCKED_TOKEN'
+ self.url_query = 'https://a.b/cb?code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
+ self.url_fragment = 'https://a.b/cb#code=abc&state=abc&token_type=Bearer&expires_in=3600&scope=hello+openid&access_token=abc&id_token=%s' % token
diff --git a/tests/openid/connect/core/test_request_validator.py b/tests/openid/connect/core/test_request_validator.py
new file mode 100644
index 0000000..14a7c23
--- /dev/null
+++ b/tests/openid/connect/core/test_request_validator.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+from oauthlib.openid.connect.core.request_validator import RequestValidator
+
+from ....unittest import TestCase
+
+
+class RequestValidatorTest(TestCase):
+
+ def test_method_contracts(self):
+ v = RequestValidator()
+ self.assertRaises(
+ NotImplementedError,
+ v.get_authorization_code_scopes,
+ 'client_id', 'code', 'redirect_uri', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.get_jwt_bearer_token,
+ 'token', 'token_handler', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.get_id_token,
+ 'token', 'token_handler', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_jwt_bearer_token,
+ 'token', 'scopes', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_id_token,
+ 'token', 'scopes', 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_silent_authorization,
+ 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_silent_login,
+ 'request'
+ )
+ self.assertRaises(
+ NotImplementedError,
+ v.validate_user_match,
+ 'id_token_hint', 'scopes', 'claims', 'request'
+ )
diff --git a/tests/openid/connect/core/test_server.py b/tests/openid/connect/core/test_server.py
new file mode 100644
index 0000000..83290db
--- /dev/null
+++ b/tests/openid/connect/core/test_server.py
@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import json
+
+import mock
+
+from oauthlib.oauth2.rfc6749 import errors
+from oauthlib.oauth2.rfc6749.endpoints.authorization import AuthorizationEndpoint
+from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
+from oauthlib.oauth2.rfc6749.tokens import BearerToken
+
+from oauthlib.openid.connect.core.grant_types.authorization_code import AuthorizationCodeGrant
+from oauthlib.openid.connect.core.grant_types.implicit import ImplicitGrant
+from oauthlib.openid.connect.core.grant_types.hybrid import HybridGrant
+
+from ....unittest import TestCase
+
+
+class AuthorizationEndpointTest(TestCase):
+
+ def setUp(self):
+ self.mock_validator = mock.MagicMock()
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ auth_code = AuthorizationCodeGrant(request_validator=self.mock_validator)
+ auth_code.save_authorization_code = mock.MagicMock()
+ implicit = ImplicitGrant(
+ request_validator=self.mock_validator)
+ implicit.save_token = mock.MagicMock()
+ hybrid = HybridGrant(self.mock_validator)
+
+ response_types = {
+ 'code': auth_code,
+ 'token': implicit,
+ 'id_token': implicit,
+ 'id_token token': implicit,
+ 'code token': hybrid,
+ 'code id_token': hybrid,
+ 'code token id_token': hybrid,
+ 'none': auth_code
+ }
+ self.expires_in = 1800
+ token = BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = AuthorizationEndpoint(
+ default_response_type='code',
+ default_token_type=token,
+ response_types=response_types
+ )
+
+ # TODO: Add hybrid grant test
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz')
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_implicit_grant(self):
+ uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True)
+
+ def test_none_grant(self):
+ uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True)
+ self.assertEqual(body, None)
+ self.assertEqual(status_code, 302)
+
+ # and without the state parameter
+ uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True)
+ self.assertEqual(body, None)
+ self.assertEqual(status_code, 302)
+
+ def test_missing_type(self):
+ uri = 'http://i.b/l?client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ self.mock_validator.validate_request = mock.MagicMock(
+ side_effect=errors.InvalidRequestError())
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.')
+
+ def test_invalid_type(self):
+ uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
+ uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
+ self.mock_validator.validate_request = mock.MagicMock(
+ side_effect=errors.UnsupportedResponseTypeError())
+ headers, body, status_code = self.endpoint.create_authorization_response(
+ uri, scopes=['all', 'of', 'them'])
+ self.assertIn('Location', headers)
+ self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type')
+
+
+class TokenEndpointTest(TestCase):
+
+ def setUp(self):
+ def set_user(request):
+ request.user = mock.MagicMock()
+ request.client = mock.MagicMock()
+ request.client.client_id = 'mocked_client_id'
+ return True
+
+ self.mock_validator = mock.MagicMock()
+ self.mock_validator.authenticate_client.side_effect = set_user
+ self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
+ auth_code = AuthorizationCodeGrant(
+ request_validator=self.mock_validator)
+ supported_types = {
+ 'authorization_code': auth_code,
+ }
+ self.expires_in = 1800
+ token = BearerToken(
+ self.mock_validator,
+ expires_in=self.expires_in
+ )
+ self.endpoint = TokenEndpoint(
+ 'authorization_code',
+ default_token_type=token,
+ grant_types=supported_types
+ )
+
+ @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
+ def test_authorization_grant(self):
+ body = 'grant_type=authorization_code&code=abc&scope=all+of+them&state=xyz'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc',
+ 'scope': 'all of them',
+ 'state': 'xyz'
+ }
+ self.assertEqual(json.loads(body), token)
+
+ body = 'grant_type=authorization_code&code=abc&state=xyz'
+ headers, body, status_code = self.endpoint.create_token_response(
+ '', body=body)
+ token = {
+ 'token_type': 'Bearer',
+ 'expires_in': self.expires_in,
+ 'access_token': 'abc',
+ 'refresh_token': 'abc',
+ 'state': 'xyz'
+ }
+ self.assertEqual(json.loads(body), token)
+
+ def test_missing_type(self):
+ _, body, _ = self.endpoint.create_token_response('', body='')
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
+
+ def test_invalid_type(self):
+ body = 'grant_type=invalid'
+ _, body, _ = self.endpoint.create_token_response('', body=body)
+ token = {'error': 'unsupported_grant_type'}
+ self.assertEqual(json.loads(body), token)
diff --git a/tests/openid/connect/core/test_tokens.py b/tests/openid/connect/core/test_tokens.py
new file mode 100644
index 0000000..12c75f1
--- /dev/null
+++ b/tests/openid/connect/core/test_tokens.py
@@ -0,0 +1,133 @@
+from __future__ import absolute_import, unicode_literals
+
+import mock
+
+from oauthlib.openid.connect.core.tokens import JWTToken
+
+from ....unittest import TestCase
+
+
+class JWTTokenTestCase(TestCase):
+
+ def test_create_token_callable_expires_in(self):
+ """
+ Test retrieval of the expires in value by calling the callable expires_in property
+ """
+
+ expires_in_mock = mock.MagicMock()
+ request_mock = mock.MagicMock()
+
+ token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock())
+ token.create_token(request=request_mock)
+
+ expires_in_mock.assert_called_once_with(request_mock)
+
+ def test_create_token_non_callable_expires_in(self):
+ """
+ When a non callable expires in is set this should just be set to the request
+ """
+
+ expires_in_mock = mock.NonCallableMagicMock()
+ request_mock = mock.MagicMock()
+
+ token = JWTToken(expires_in=expires_in_mock, request_validator=mock.MagicMock())
+ token.create_token(request=request_mock)
+
+ self.assertFalse(expires_in_mock.called)
+ self.assertEqual(request_mock.expires_in, expires_in_mock)
+
+ def test_create_token_calls_get_id_token(self):
+ """
+ When create_token is called the call should be forwarded to the get_id_token on the token validator
+ """
+ request_mock = mock.MagicMock()
+
+ with mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator',
+ autospec=True) as RequestValidatorMock:
+
+ request_validator = RequestValidatorMock()
+
+ token = JWTToken(expires_in=mock.MagicMock(), request_validator=request_validator)
+ token.create_token(request=request_mock)
+
+ request_validator.get_jwt_bearer_token.assert_called_once_with(None, None, request_mock)
+
+ def test_validate_request_token_from_headers(self):
+ """
+ Bearer token get retrieved from headers.
+ """
+
+ with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \
+ mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator',
+ autospec=True) as RequestValidatorMock:
+ request_validator_mock = RequestValidatorMock()
+
+ token = JWTToken(request_validator=request_validator_mock)
+
+ request = RequestMock('/uri')
+ # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch
+ # with autospec=True
+ request.scopes = mock.MagicMock()
+ request.headers = {
+ 'Authorization': 'Bearer some-token-from-header'
+ }
+
+ token.validate_request(request=request)
+
+ request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-header',
+ request.scopes,
+ request)
+
+ def test_validate_token_from_request(self):
+ """
+ Token get retrieved from request object.
+ """
+
+ with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock, \
+ mock.patch('oauthlib.oauth2.rfc6749.request_validator.RequestValidator',
+ autospec=True) as RequestValidatorMock:
+ request_validator_mock = RequestValidatorMock()
+
+ token = JWTToken(request_validator=request_validator_mock)
+
+ request = RequestMock('/uri')
+ # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch
+ # with autospec=True
+ request.scopes = mock.MagicMock()
+ request.access_token = 'some-token-from-request-object'
+ request.headers = {}
+
+ token.validate_request(request=request)
+
+ request_validator_mock.validate_jwt_bearer_token.assert_called_once_with('some-token-from-request-object',
+ request.scopes,
+ request)
+
+ def test_estimate_type(self):
+ """
+ Estimate type results for a jwt token
+ """
+
+ def test_token(token, expected_result):
+ with mock.patch('oauthlib.common.Request', autospec=True) as RequestMock:
+ jwt_token = JWTToken()
+
+ request = RequestMock('/uri')
+ # Scopes is retrieved using the __call__ method which is not picked up correctly by mock.patch
+ # with autospec=True
+ request.headers = {
+ 'Authorization': 'Bearer {}'.format(token)
+ }
+
+ result = jwt_token.estimate_type(request=request)
+
+ self.assertEqual(result, expected_result)
+
+ test_items = (
+ ('eyfoo.foo.foo', 10),
+ ('eyfoo.foo.foo.foo.foo', 10),
+ ('eyfoobar', 0)
+ )
+
+ for token, expected_result in test_items:
+ test_token(token, expected_result)