# -*- coding: utf-8 -*-
"""Tests for the OAuth 2 (RFC 6749) authorization, token and resource endpoints."""
import json
from unittest import mock

from oauthlib import common
from oauthlib.oauth2.rfc6749 import errors, tokens
from oauthlib.oauth2.rfc6749.endpoints import Server
from oauthlib.oauth2.rfc6749.endpoints.authorization import (
    AuthorizationEndpoint,
)
from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint
from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
from oauthlib.oauth2.rfc6749.grant_types import (
    AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant,
    ResourceOwnerPasswordCredentialsGrant,
)

from tests.unittest import TestCase


class AuthorizationEndpointTest(TestCase):
    """Exercise ``AuthorizationEndpoint`` with code, token and none response types."""

    def setUp(self):
        """Build an endpoint backed by a mocked request validator."""
        validator = mock.MagicMock()
        validator.get_code_challenge.return_value = None
        self.mock_validator = validator
        # Replace the validator with a fresh mock once the test is over.
        self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())

        code_grant = AuthorizationCodeGrant(
            request_validator=self.mock_validator)
        code_grant.save_authorization_code = mock.MagicMock()

        implicit_grant = ImplicitGrant(
            request_validator=self.mock_validator)
        implicit_grant.save_token = mock.MagicMock()

        self.expires_in = 1800
        bearer = tokens.BearerToken(
            self.mock_validator,
            expires_in=self.expires_in
        )
        self.endpoint = AuthorizationEndpoint(
            default_response_type='code',
            default_token_type=bearer,
            response_types={
                'code': code_grant,
                'token': implicit_grant,
                # 'none' is served by the authorization code grant object.
                'none': code_grant
            }
        )

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_authorization_grant(self):
        # response_type=code: the code and state come back in the query.
        uri = (
            'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
            '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
        )
        headers, _, _ = self.endpoint.create_authorization_response(
            uri, scopes=['all', 'of', 'them'])
        self.assertIn('Location', headers)
        expected = 'http://back.to/me?code=abc&state=xyz'
        self.assertURLEqual(headers['Location'], expected)

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_implicit_grant(self):
        # response_type=token: the token is delivered in the URL fragment.
        uri = (
            'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
            '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
        )
        headers, _, _ = self.endpoint.create_authorization_response(
            uri, scopes=['all', 'of', 'them'])
        self.assertIn('Location', headers)
        expected = (
            'http://back.to/me#access_token=abc&expires_in='
            + str(self.expires_in)
            + '&token_type=Bearer&state=xyz&scope=all+of+them'
        )
        self.assertURLEqual(headers['Location'], expected, parse_fragment=True)

    def test_none_grant(self):
        # response_type=none with a state parameter.
        uri = (
            'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
            '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
        )
        headers, body, status_code = self.endpoint.create_authorization_response(
            uri, scopes=['all', 'of', 'them'])
        self.assertIn('Location', headers)
        self.assertURLEqual(
            headers['Location'], 'http://back.to/me?state=xyz',
            parse_fragment=True)
        self.assertIsNone(body)
        self.assertEqual(status_code, 302)

        # and without the state parameter
        uri = (
            'http://i.b/l?response_type=none&client_id=me&scope=all+of+them'
            '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
        )
        headers, body, status_code = self.endpoint.create_authorization_response(
            uri, scopes=['all', 'of', 'them'])
        self.assertIn('Location', headers)
        self.assertURLEqual(
            headers['Location'], 'http://back.to/me', parse_fragment=True)
        self.assertIsNone(body)
        self.assertEqual(status_code, 302)

    def test_missing_type(self):
        # No response_type in the request: expect an invalid_request redirect.
        uri = (
            'http://i.b/l?client_id=me&scope=all+of+them'
            '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
        )
        self.mock_validator.validate_request = mock.MagicMock(
            side_effect=errors.InvalidRequestError())
        headers, _, _ = self.endpoint.create_authorization_response(
            uri, scopes=['all', 'of', 'them'])
        self.assertIn('Location', headers)
        expected = 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.'
        self.assertURLEqual(headers['Location'], expected)

    def test_invalid_type(self):
        # Unknown response_type: expect an unsupported_response_type redirect.
        uri = (
            'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
            '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
        )
        self.mock_validator.validate_request = mock.MagicMock(
            side_effect=errors.UnsupportedResponseTypeError())
        headers, _, _ = self.endpoint.create_authorization_response(
            uri, scopes=['all', 'of', 'them'])
        self.assertIn('Location', headers)
        expected = 'http://back.to/me?error=unsupported_response_type'
        self.assertURLEqual(headers['Location'], expected)


class TokenEndpointTest(TestCase):
    """Exercise ``TokenEndpoint`` with the code, password and client grants."""

    def setUp(self):
        """Build a token endpoint whose validator authenticates every client."""

        def set_user(request):
            # Attach a mocked user and client to the request, then accept it.
            request.user = mock.MagicMock()
            request.client = mock.MagicMock()
            request.client.client_id = 'mocked_client_id'
            return True

        validator = mock.MagicMock()
        validator.authenticate_client.side_effect = set_user
        validator.get_code_challenge.return_value = None
        self.mock_validator = validator
        # Replace the validator with a fresh mock once the test is over.
        self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())

        code_grant = AuthorizationCodeGrant(
            request_validator=self.mock_validator)
        password_grant = ResourceOwnerPasswordCredentialsGrant(
            request_validator=self.mock_validator)
        client_grant = ClientCredentialsGrant(
            request_validator=self.mock_validator)

        self.expires_in = 1800
        bearer = tokens.BearerToken(
            self.mock_validator,
            expires_in=self.expires_in
        )
        self.endpoint = TokenEndpoint(
            'authorization_code',
            default_token_type=bearer,
            grant_types={
                'authorization_code': code_grant,
                'password': password_grant,
                'client_credentials': client_grant,
            }
        )

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_authorization_grant(self):
        # With an explicit scope, the scope is echoed in the token.
        request_body = 'grant_type=authorization_code&code=abc&scope=all+of+them'
        _, response_body, _ = self.endpoint.create_token_response(
            '', body=request_body)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': 'abc',
            'refresh_token': 'abc',
            'scope': 'all of them'
        }
        self.assertEqual(json.loads(response_body), expected)

        # Without a scope, no scope key is expected.
        request_body = 'grant_type=authorization_code&code=abc'
        _, response_body, _ = self.endpoint.create_token_response(
            '', body=request_body)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': 'abc',
            'refresh_token': 'abc'
        }
        self.assertEqual(json.loads(response_body), expected)

        # try with additional custom variables
        request_body = 'grant_type=authorization_code&code=abc&state=foobar'
        _, response_body, _ = self.endpoint.create_token_response(
            '', body=request_body)
        self.assertEqual(json.loads(response_body), expected)

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_password_grant(self):
        request_body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
        _, response_body, _ = self.endpoint.create_token_response(
            '', body=request_body)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': 'abc',
            'refresh_token': 'abc',
            'scope': 'all of them',
        }
        self.assertEqual(json.loads(response_body), expected)

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_client_grant(self):
        # The expected client credentials token carries no refresh token.
        request_body = 'grant_type=client_credentials&scope=all+of+them'
        _, response_body, _ = self.endpoint.create_token_response(
            '', body=request_body)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': 'abc',
            'scope': 'all of them',
        }
        self.assertEqual(json.loads(response_body), expected)

    def test_missing_type(self):
        # An empty body has no grant_type at all.
        _, response_body, _ = self.endpoint.create_token_response('', body='')
        expected = {'error': 'unsupported_grant_type'}
        self.assertEqual(json.loads(response_body), expected)

    def test_invalid_type(self):
        request_body = 'grant_type=invalid'
        _, response_body, _ = self.endpoint.create_token_response(
            '', body=request_body)
        expected = {'error': 'unsupported_grant_type'}
        self.assertEqual(json.loads(response_body), expected)


class SignedTokenEndpointTest(TestCase):
    """Exercise ``Server`` configured with a signed access token generator."""

    def setUp(self):
        """Build a ``Server`` that signs access tokens with a test RSA key."""
        self.expires_in = 1800

        def set_user(request):
            # Attach a mocked user and client to the request, then accept it.
            request.user = mock.MagicMock()
            request.client = mock.MagicMock()
            request.client.client_id = 'mocked_client_id'
            return True

        validator = mock.MagicMock()
        validator.get_code_challenge.return_value = None
        validator.authenticate_client.side_effect = set_user
        self.mock_validator = validator
        # Replace the validator with a fresh mock once the test is over.
        self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())

        self.private_pem = """
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA6TtDhWGwzEOWZP6m/zHoZnAPLABfetvoMPmxPGjFjtDuMRPv
EvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo5laMGbj1pPyCPxlVi9LK82HQNX0B
YK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGIQ0tT7yQcBm2A8FEaP0bZYCLMjwMN
WfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3NwsFuJgOa/KuodXaAmf8pJnx8t1Wn
nxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT7xn4K/ghLgNs09v6Yge0pmPl9Oz+
+bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OXtwIDAQABAoIBAGFTTbXXMkPK4HN8
oItVdDlrAanG7hECuz3UtFUVE3upS/xG6TjqweVLwRqYCh2ssDXFwjy4mXRGDzF4
e/e/6s9Txlrlh/w1MtTJ6ZzTdcViR9RKOczysjZ7S5KRlI3KnGFAuWPcG2SuOWjZ
dZfzcj1Crd/ZHajBAVFHRsCo/ATVNKbTRprFfb27xKpQ2BwH/GG781sLE3ZVNIhs
aRRaED4622kI1E/WXws2qQMqbFKzo0m1tPbLb3Z89WgZJ/tRQwuDype1Vfm7k6oX
xfbp3948qSe/yWKRlMoPkleji/WxPkSIalzWSAi9ziN/0Uzhe65FURgrfHL3XR1A
B8UR+aECgYEA7NPQZV4cAikk02Hv65JgISofqV49P8MbLXk8sdnI1n7Mj10TgzU3
lyQGDEX4hqvT0bTXe4KAOxQZx9wumu05ejfzhdtSsEm6ptGHyCdmYDQeV0C/pxDX
JNCK8XgMku2370XG0AnyBCT7NGlgtDcNCQufcesF2gEuoKiXg6Zjo7sCgYEA/Bzs
9fWGZZnSsMSBSW2OYbFuhF3Fne0HcxXQHipl0Rujc/9g0nccwqKGizn4fGOE7a8F
usQgJoeGcinL7E9OEP/uQ9VX1C9RNVjIxP1O5/Guw1zjxQQYetOvbPhN2QhD1Ye7
0TRKrW1BapcjwLpFQlVg1ZeTPOi5lv24W/wX9jUCgYEAkrMSX/hPuTbrTNVZ3L6r
NV/2hN+PaTPeXei/pBuXwOaCqDurnpcUfFcgN/IP5LwDVd+Dq0pHTFFDNv45EFbq
R77o5n3ZVsIVEMiyJ1XgoK8oLDw7e61+15smtjT69Piz+09pu+ytMcwGn4y3Dmsb
dALzHYnL8iLRU0ubrz0ec4kCgYAJiVKRTzNBPptQom49h85d9ac3jJCAE8o3WTjh
Gzt0uHXrWlqgO280EY/DTnMOyXjqwLcXxHlu26uDP/99tdY/IF8z46sJ1KxetzgI
84f7kBHLRAU9m5UNeFpnZdEUB5MBTbwWAsNcYgiabpMkpCcghjg+fBhOsoLqqjhC
CnwhjQKBgQDkv0QTdyBU84TE8J0XY3eLQwXbrvG2yD5A2ntN3PyxGEneX5WTJGMZ
xJxwaFYQiDS3b9E7b8Q5dg8qa5Y1+epdhx3cuQAWPm+AoHKshDfbRve4txBDQAqh
c6MxSWgsa+2Ld5SWSNbGtpPcmEM3Fl5ttMCNCKtNc0UE16oHwaPAIw==
-----END RSA PRIVATE KEY-----
        """

        self.public_pem = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6TtDhWGwzEOWZP6m/zHo
ZnAPLABfetvoMPmxPGjFjtDuMRPvEvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo
5laMGbj1pPyCPxlVi9LK82HQNX0BYK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGI
Q0tT7yQcBm2A8FEaP0bZYCLMjwMNWfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3N
wsFuJgOa/KuodXaAmf8pJnx8t1WnnxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT
7xn4K/ghLgNs09v6Yge0pmPl9Oz++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OX
twIDAQAB
-----END PUBLIC KEY-----
        """

        signed_token = tokens.signed_token_generator(self.private_pem,
                                                     user_id=123)
        self.endpoint = Server(
            self.mock_validator,
            token_expires_in=self.expires_in,
            token_generator=signed_token,
            refresh_token_generator=tokens.random_token_generator
        )

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_authorization_grant(self):
        # The access token value is taken from the response itself, so only
        # the remaining fields are pinned by the comparison.
        request_body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)
        payload = json.loads(raw_response)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': payload['access_token'],
            'refresh_token': 'abc',
            'scope': 'all of them'
        }
        self.assertEqual(payload, expected)

        request_body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)
        payload = json.loads(raw_response)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': payload['access_token'],
            'refresh_token': 'abc'
        }
        self.assertEqual(payload, expected)

        # try with additional custom variables
        request_body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=foobar'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)
        payload = json.loads(raw_response)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': payload['access_token'],
            'refresh_token': 'abc'
        }
        self.assertEqual(payload, expected)

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_password_grant(self):
        request_body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)
        payload = json.loads(raw_response)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': payload['access_token'],
            'refresh_token': 'abc',
            'scope': 'all of them',
        }
        self.assertEqual(payload, expected)

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_scopes_and_user_id_stored_in_access_token(self):
        # Verify the token with the public key and inspect its claims.
        request_body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)

        access_token = json.loads(raw_response)['access_token']

        claims = common.verify_signed_token(self.public_pem, access_token)

        self.assertEqual(claims['scope'], 'all of them')
        self.assertEqual(claims['user_id'], 123)

    @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
    def test_client_grant(self):
        request_body = 'grant_type=client_credentials&scope=all+of+them'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)
        payload = json.loads(raw_response)
        expected = {
            'token_type': 'Bearer',
            'expires_in': self.expires_in,
            'access_token': payload['access_token'],
            'scope': 'all of them',
        }
        self.assertEqual(payload, expected)

    def test_missing_type(self):
        # The request carries a code but no grant_type parameter.
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc')
        expected = {'error': 'unsupported_grant_type'}
        self.assertEqual(json.loads(raw_response), expected)

    def test_invalid_type(self):
        request_body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc'
        _, raw_response, _ = self.endpoint.create_token_response(
            '', body=request_body)
        expected = {'error': 'unsupported_grant_type'}
        self.assertEqual(json.loads(raw_response), expected)


class ResourceEndpointTest(TestCase):
    """Exercise ``ResourceEndpoint`` with a single Bearer token type."""

    def setUp(self):
        """Build a resource endpoint whose only token type is 'Bearer'."""
        self.mock_validator = mock.MagicMock()
        # Replace the validator with a fresh mock once the test is over.
        self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
        bearer = tokens.BearerToken(request_validator=self.mock_validator)
        self.endpoint = ResourceEndpoint(
            default_token='Bearer',
            token_types={'Bearer': bearer}
        )

    def test_defaults(self):
        # The validator rejects the token; the request is still typed Bearer.
        uri = 'http://a.b/path?some=query'
        self.mock_validator.validate_bearer_token.return_value = False
        valid, request = self.endpoint.verify_request(uri)
        self.assertFalse(valid)
        self.assertEqual(request.token_type, 'Bearer')