1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import datetime
import os
from mock import patch
from oauthlib import common, signals
from oauthlib.oauth2 import (BackendApplicationClient, Client,
LegacyApplicationClient, MobileApplicationClient,
WebApplicationClient)
from oauthlib.oauth2.rfc6749 import errors, utils
from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY
from ....unittest import TestCase
@patch('time.time', new=lambda: 1000)
class WebApplicationClientTest(TestCase):
    """Tests for ``WebApplicationClient`` request building and response parsing.

    ``time.time`` is patched to return 1000 for every test in the class,
    which keeps the expected ``expires_at`` in ``token`` fixed at 4600
    (1000 plus the ``expires_in`` of 3600).
    """

    client_id = "someclientid"

    # Authorization endpoint and the request URIs expected for each option.
    uri = "https://example.com/path?query=world"
    uri_id = uri + "&response_type=code&client_id=" + client_id
    uri_redirect = uri_id + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback"
    redirect_uri = "http://my.page.com/callback"

    scope = ["/profile"]
    state = "xyz"
    uri_scope = uri_id + "&scope=%2Fprofile"
    uri_state = uri_id + "&state=" + state

    # Provider-specific extras passed through as keyword arguments.
    kwargs = {
        "some": "providers",
        "require": "extra arguments"
    }
    uri_kwargs = uri_id + "&some=providers&require=extra+arguments"
    uri_authorize_code = uri_redirect + "&scope=%2Fprofile&state=" + state

    # Token request bodies expected for each option.
    code = "zzzzaaaa"
    body = "not=empty"
    body_code = "not=empty&grant_type=authorization_code&code=%s" % code
    body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback"
    body_kwargs = body_code + "&some=providers&require=extra+arguments"

    # Authorization response (redirect back to the client) and its parsed form.
    response_uri = "https://client.example.com/cb?code=zzzzaaaa&state=xyz"
    response = {"code": "zzzzaaaa", "state": "xyz"}

    # Raw token endpoint response and the dict it is expected to parse into.
    token_json = ('{   "access_token":"2YotnFZFEjr1zCsicMWpAA",'
                  '    "token_type":"example",'
                  '    "expires_in":3600,'
                  '    "scope":"/profile",'
                  '    "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",'
                  '    "example_parameter":"example_value"}')
    token = {
        "access_token": "2YotnFZFEjr1zCsicMWpAA",
        "token_type": "example",
        "expires_in": 3600,
        "expires_at": 4600,
        "scope": scope,
        "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",
        "example_parameter": "example_value"
    }

    def test_auth_grant_uri(self):
        """``prepare_request_uri`` appends the expected query parameters."""
        client = WebApplicationClient(self.client_id)

        # Basic, no extra arguments
        uri = client.prepare_request_uri(self.uri)
        self.assertURLEqual(uri, self.uri_id)

        # With redirection uri
        uri = client.prepare_request_uri(self.uri, redirect_uri=self.redirect_uri)
        self.assertURLEqual(uri, self.uri_redirect)

        # With scope
        uri = client.prepare_request_uri(self.uri, scope=self.scope)
        self.assertURLEqual(uri, self.uri_scope)

        # With state
        uri = client.prepare_request_uri(self.uri, state=self.state)
        self.assertURLEqual(uri, self.uri_state)

        # With extra parameters through kwargs
        uri = client.prepare_request_uri(self.uri, **self.kwargs)
        self.assertURLEqual(uri, self.uri_kwargs)

    def test_request_body(self):
        """``prepare_request_body`` extends an existing body with grant data."""
        client = WebApplicationClient(self.client_id, code=self.code)

        # Basic, no extra arguments (code supplied at construction time)
        body = client.prepare_request_body(body=self.body)
        self.assertFormBodyEqual(body, self.body_code)

        # Same expectation when the code is supplied per call instead
        rclient = WebApplicationClient(self.client_id)
        body = rclient.prepare_request_body(code=self.code, body=self.body)
        self.assertFormBodyEqual(body, self.body_code)

        # With redirection uri
        body = client.prepare_request_body(body=self.body, redirect_uri=self.redirect_uri)
        self.assertFormBodyEqual(body, self.body_redirect)

        # With extra parameters
        body = client.prepare_request_body(body=self.body, **self.kwargs)
        self.assertFormBodyEqual(body, self.body_kwargs)

    def test_parse_grant_uri_response(self):
        """``parse_request_uri_response`` returns code/state and checks state."""
        client = WebApplicationClient(self.client_id)

        # Parse code and state
        response = client.parse_request_uri_response(self.response_uri, state=self.state)
        self.assertEqual(response, self.response)
        self.assertEqual(client.code, self.code)

        # Mismatching state
        self.assertRaises(errors.MismatchingStateError,
                          client.parse_request_uri_response,
                          self.response_uri,
                          state="invalid")

    def test_populate_attributes(self):
        """Credentials smuggled into the redirect URI are not stored."""
        client = WebApplicationClient(self.client_id)

        response_uri = (self.response_uri +
                        "&access_token=EVIL-TOKEN"
                        "&refresh_token=EVIL-TOKEN"
                        "&mac_key=EVIL-KEY")

        client.parse_request_uri_response(response_uri, self.state)

        self.assertEqual(client.code, self.code)

        # We must not accidentally pick up any further security
        # credentials at this point.
        self.assertIsNone(client.access_token)
        self.assertIsNone(client.refresh_token)
        self.assertIsNone(client.mac_key)

    def test_parse_token_response(self):
        """``parse_request_body_response`` parses tokens and handles scope changes."""
        client = WebApplicationClient(self.client_id)

        # Parse the token response and check the client picked up its fields
        response = client.parse_request_body_response(self.token_json, scope=self.scope)
        self.assertEqual(response, self.token)
        self.assertEqual(client.access_token, response.get("access_token"))
        self.assertEqual(client.refresh_token, response.get("refresh_token"))
        self.assertEqual(client.token_type, response.get("token_type"))

        # Mismatching scope
        self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")

        # patch.dict restores os.environ on exit even when an assertion below
        # fails, so the relaxed-scope flag can never leak into other tests.
        with patch.dict(os.environ, {'OAUTHLIB_RELAX_TOKEN_SCOPE': '1'}):
            token = client.parse_request_body_response(self.token_json, scope="invalid")
            self.assertTrue(token.scope_changed)

            scope_changes_recorded = []

            def record_scope_change(sender, message, old, new):
                scope_changes_recorded.append((message, old, new))

            signals.scope_changed.connect(record_scope_change)
            try:
                client.parse_request_body_response(self.token_json, scope="invalid")
                self.assertEqual(len(scope_changes_recorded), 1)
                message, old, new = scope_changes_recorded[0]
                self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".')
                self.assertEqual(old, ['invalid'])
                self.assertEqual(new, ['/profile'])
            finally:
                # Always detach the receiver so it does not outlive this test.
                signals.scope_changed.disconnect(record_scope_change)

    # NOTE: the method name is misspelled ("requeset"); it is kept as is so
    # existing test IDs do not change.
    def test_prepare_authorization_requeset(self):
        """``prepare_authorization_request`` returns the URL, headers and body."""
        client = WebApplicationClient(self.client_id)

        url, header, body = client.prepare_authorization_request(
            self.uri, redirect_url=self.redirect_uri, state=self.state, scope=self.scope)
        self.assertURLEqual(url, self.uri_authorize_code)
        # verify default header and body only
        self.assertEqual(header, {'Content-Type': 'application/x-www-form-urlencoded'})
        self.assertEqual(body, '')
|