summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cherrypy/_cptools.py3
-rw-r--r--cherrypy/lib/auth_basic.py82
-rw-r--r--cherrypy/lib/auth_digest.py355
-rw-r--r--cherrypy/test/test_auth_basic.py89
-rw-r--r--cherrypy/test/test_auth_digest.py120
5 files changed, 649 insertions, 0 deletions
diff --git a/cherrypy/_cptools.py b/cherrypy/_cptools.py
index 6319d3ae..b6c011a5 100644
--- a/cherrypy/_cptools.py
+++ b/cherrypy/_cptools.py
@@ -235,6 +235,7 @@ class ErrorTool(Tool):
from cherrypy.lib import cptools, encoding, auth, static, tidy, jsontools
from cherrypy.lib import sessions as _sessions, xmlrpc as _xmlrpc
from cherrypy.lib import caching as _caching, wsgiapp as _wsgiapp
+from cherrypy.lib import auth_basic, auth_digest
class SessionTool(Tool):
@@ -500,5 +501,7 @@ _d.redirect = Tool('on_start_resource', cptools.redirect)
_d.autovary = Tool('on_start_resource', cptools.autovary, priority=0)
_d.json_in = Tool('before_handler', jsontools.json_in, priority=30)
_d.json_out = Tool('before_handler', jsontools.json_out, priority=30)
+_d.auth_basic = Tool('before_handler', auth_basic.basic_auth, priority=1)
+_d.auth_digest = Tool('before_handler', auth_digest.digest_auth, priority=1)
del _d, cptools, encoding, auth, static, tidy
diff --git a/cherrypy/lib/auth_basic.py b/cherrypy/lib/auth_basic.py
new file mode 100644
index 00000000..82770f3c
--- /dev/null
+++ b/cherrypy/lib/auth_basic.py
@@ -0,0 +1,82 @@
+# This file is part of CherryPy <http://www.cherrypy.org/>
+# -*- coding: utf-8 -*-
+# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
+
+__doc__ = """Module auth_basic.py provides a CherryPy 3.x tool which implements
+the server-side of HTTP Basic Access Authentication, as described in RFC 2617.
+
+Example usage, using the built-in checkpassword function which uses a dict
+as the credentials store:
+
+userpassdict = {'bird' : 'bebop'}
+checkpassword = cherrypy.lib.auth_basic.checkpassword_dict(userpassdict)
+basic_auth = {'tools.auth_basic.on': True,
+ 'tools.auth_basic.realm': 'earth',
+ 'tools.auth_basic.checkpassword_func': checkpassword,
+}
+app_config = { '/' : basic_auth }
+"""
+
+__author__ = 'visteya'
+__date__ = 'April 2009'
+
+
+import base64
+import cherrypy
+
+
+def checkpassword_dict(user_password_dict):
+ """Returns a checkpassword function which checks credentials
+ against a dictionary of the form: {username : password}.
+
+ If you want a simple dictionary-based authentication scheme, use
+ checkpassword_dict(my_credentials_dict) as the value for the
+ checkpassword_func argument to basic_auth().
+ """
+ def checkpassword(realm, user, password):
+ p = user_password_dict.get(user)
+ return p and p == password or False
+
+ return checkpassword
+
+
+def basic_auth(realm, checkpassword_func):
+ """basic_auth is a CherryPy tool which hooks at before_handler to perform
+ HTTP Basic Access Authentication, as specified in RFC 2617.
+
+ If the request has an 'authorization' header with a 'Basic' scheme, this
+ tool attempts to authenticate the credentials supplied in that header. If
+ the request has no 'authorization' header, or if it does but the scheme is
+ not 'Basic', or if authentication fails, the tool sends a 401 response with
+ a 'WWW-Authenticate' Basic header.
+
+ Arguments:
+ realm: a string containing the authentication realm.
+
+ checkpassword_func: a function which checks the authentication credentials.
+ Its signature is checkpassword(realm, username, password). where
+ username and password are the values obtained from the request's
+ 'authorization' header. If authentication succeeds, checkpassword_func
+ returns True, else it returns False.
+ """
+ if '"' in realm:
+ raise ValueError, 'Realm cannot contain the " (quote) character.'
+
+ auth_header = cherrypy.request.headers.get('authorization')
+ if auth_header is not None:
+ try:
+ scheme, params = auth_header.split(' ', 1)
+ if scheme.lower() == 'basic':
+ # since CherryPy claims compability with Python 2.3, we must use
+ # the legacy API of base64
+ username, password = base64.decodestring(params).split(':', 1)
+ if checkpassword_func(realm, username, password):
+ cherrypy.request.login = username
+ return # successful authentication
+ except (ValueError, binascii.Error): # split() error, base64.decodestring() error
+ raise cherrypy.HTTPError(400, 'Bad Request')
+
+ # Respond with 401 status and a WWW-Authenticate header
+ cherrypy.response.headers['www-authenticate'] = 'Basic realm="%s"' % realm
+ raise cherrypy.HTTPError(401, "You are not authorized to access that resource")
+
diff --git a/cherrypy/lib/auth_digest.py b/cherrypy/lib/auth_digest.py
new file mode 100644
index 00000000..00a4f1d7
--- /dev/null
+++ b/cherrypy/lib/auth_digest.py
@@ -0,0 +1,355 @@
+# This file is part of CherryPy <http://www.cherrypy.org/>
+# -*- coding: utf-8 -*-
+# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
+
+__doc__ = """An implementation of the server-side of HTTP Digest Access
+Authentication, which is described in RFC 2617.
+
+Example usage, using the built-in get_ha1 function which uses a dict
+of plaintext passwords as the credentials store:
+
+userpassdict = {'alice' : '4x5istwelve'}
+get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(userpassdict)
+digest_auth = {'tools.auth_digest.on': True,
+ 'tools.auth_digest.realm': 'wonderland',
+ 'tools.auth_digest.get_ha1_func': get_ha1,
+ 'tools.auth_digest.key': 'a565c27146791cfb',
+}
+app_config = { '/' : digest_auth }
+"""
+
+__author__ = 'visteya'
+__date__ = 'April 2009'
+
+
+try:
+ from hashlib import md5
+except ImportError:
+ # Python 2.4 and earlier
+ from md5 import new as md5
+import time
+import base64
+import urllib2
+
+import cherrypy
+
+qop_auth = 'auth'
+qop_auth_int = 'auth-int'
+valid_qops = (qop_auth, qop_auth_int)
+
+valid_algorithms = ('MD5', 'MD5-sess')
+
+
+def TRACE(msg):
+ cherrypy.log(msg, context='auth_digest')
+
+# Three helper functions for users of the tool, providing three variants
+# of get_ha1() functions for three different kinds of credential stores.
+def get_ha1_dict_plain(user_password_dict):
+ """Returns a get_ha1 function which obtains a plaintext password from a
+ dictionary of the form: {username : password}.
+
+ If you want a simple dictionary-based authentication scheme, with plaintext
+ passwords, use get_ha1_dict_plain(my_userpass_dict) as the value for the
+ get_ha1_func argument to digest_auth().
+ """
+ def get_ha1(realm, username):
+ password = user_password_dict.get(username)
+ if password:
+ return md5('%s:%s:%s' % (username, realm, password)).hexdigest()
+ return None
+
+ return get_ha1
+
+def get_ha1_dict(user_ha1_dict):
+ """Returns a get_ha1 function which obtains a HA1 password hash from a
+ dictionary of the form: {username : HA1}.
+
+ If you want a dictionary-based authentication scheme, but with
+ pre-computed HA1 hashes instead of plain-text passwords, use
+ getpass_dict_ha1(my_userha1_dict) as the value for the get_ha1_func
+ argument to digest_auth().
+ """
+ def get_ha1(realm, username):
+ return user_ha1_dict.get(user)
+
+ return get_ha1
+
+def get_ha1_file_htdigest(filename):
+ """Returns a get_ha1 function which obtains a HA1 password hash from a
+ flat file with lines of the same format as that produced by the Apache
+ htdigest utility. For example, for realm 'wonderland', username 'alice',
+ and password '4x5istwelve', the htdigest line would be:
+
+ alice:wonderland:3238cdfe91a8b2ed8e39646921a02d4c
+
+ If you want to use an Apache htdigest file as the credentials store,
+ then use get_ha1_file_htdigest(my_htdigest_file) as the value for the
+ get_ha1_func argument to digest_auth(). It is recommended that the filename
+ argument be an absolute path, to avoid problems.
+ """
+ def get_ha1(realm, username):
+ result = None
+ f = open(filename, 'r')
+ for line in f:
+ u, r, ha1 = line.rstrip().split(':')
+ if u == username and r == realm:
+ result = ha1
+ break
+ f.close()
+ return result
+
+ return get_ha1
+
+
+def synthesize_nonce(s, key, timestamp=None):
+ """Synthesize a nonce value which resists spoofing and can be checked for staleness.
+ Returns a string suitable as the value for 'nonce' in the www-authenticate header.
+
+ Args:
+ s: a string related to the resource, such as the hostname of the server.
+ key: a secret string known only to the server.
+ timestamp: an integer seconds-since-the-epoch timestamp
+ """
+ if timestamp is None:
+ timestamp = int(time.time())
+ h = md5('%s:%s:%s' % (timestamp, s, key)).hexdigest()
+ nonce = '%s:%s' % (timestamp, h)
+ return nonce
+
+
+def H(s):
+ """The hash function H"""
+ return md5(s).hexdigest()
+
+
+class HttpDigestAuthorization (object):
+ """Class to parse a Digest Authorization header and perform re-calculation
+ of the digest.
+ """
+
+ def errmsg(self, s):
+ return 'Digest Authorization header: %s' % s
+
+ def __init__(self, auth_header, http_method, debug=False):
+ self.http_method = http_method
+ self.debug = debug
+ scheme, params = auth_header.split(" ", 1)
+ self.scheme = scheme.lower()
+ if self.scheme != 'digest':
+ raise ValueError('Authorization scheme is not "Digest"')
+
+ self.auth_header = auth_header
+
+ # make a dict of the params
+ items = urllib2.parse_http_list(params)
+ paramsd = urllib2.parse_keqv_list(items)
+
+ self.realm = paramsd.get('realm')
+ self.username = paramsd.get('username')
+ self.nonce = paramsd.get('nonce')
+ self.uri = paramsd.get('uri')
+ self.method = paramsd.get('method')
+ self.response = paramsd.get('response') # the response digest
+ self.algorithm = paramsd.get('algorithm', 'MD5')
+ self.cnonce = paramsd.get('cnonce')
+ self.opaque = paramsd.get('opaque')
+ self.qop = paramsd.get('qop') # qop
+ self.nc = paramsd.get('nc') # nonce count
+
+ # perform some correctness checks
+ if self.algorithm not in valid_algorithms:
+ raise ValueError(self.errmsg("Unsupported value for algorithm: '%s'" % self.algorithm))
+
+ has_reqd = self.username and \
+ self.realm and \
+ self.nonce and \
+ self.uri and \
+ self.response
+ if not has_reqd:
+ raise ValueError(self.errmsg("Not all required parameters are present."))
+
+ if self.qop:
+ if self.qop not in valid_qops:
+ raise ValueError(self.errmsg("Unsupported value for qop: '%s'" % self.qop))
+ if not (self.cnonce and self.nc):
+ raise ValueError(self.errmsg("If qop is sent then cnonce and nc MUST be present"))
+ else:
+ if self.cnonce or self.nc:
+ raise ValueError(self.errmsg("If qop is not sent, neither cnonce nor nc can be present"))
+
+
+ def __str__(self):
+ return 'authorization : %s' % self.auth_header
+
+ def validate_nonce(self, s, key):
+ """Validate the nonce.
+ Returns True if nonce was generated by synthesize_nonce() and the timestamp
+ is not spoofed, else returns False.
+
+ Args:
+ s: a string related to the resource, such as the hostname of the server.
+ key: a secret string known only to the server.
+ Both s and key must be the same values which were used to synthesize the nonce
+ we are trying to validate.
+ """
+ try:
+ timestamp, hashpart = self.nonce.split(':', 1)
+ s_timestamp, s_hashpart = synthesize_nonce(s, key, timestamp).split(':', 1)
+ is_valid = s_hashpart == hashpart
+ if self.debug:
+ TRACE('validate_nonce: %s' % is_valid)
+ return is_valid
+ except ValueError: # split() error
+ pass
+ return False
+
+
+ def is_nonce_stale(self, max_age_seconds=600):
+ """Returns True if a validated nonce is stale. The nonce contains a
+ timestamp in plaintext and also a secure hash of the timestamp. You should
+ first validate the nonce to ensure the plaintext timestamp is not spoofed.
+ """
+ try:
+ timestamp, hashpart = self.nonce.split(':', 1)
+ if int(timestamp) + max_age_seconds > int(time.time()):
+ return False
+ except ValueError: # int() error
+ pass
+ if self.debug:
+ TRACE("nonce is stale")
+ return True
+
+
+ def HA2(self, entity_body=''):
+ """Returns the H(A2) string. See RFC 2617 3.2.2.3."""
+ # RFC 2617 3.2.2.3
+ # If the "qop" directive's value is "auth" or is unspecified, then A2 is:
+ # A2 = method ":" digest-uri-value
+ #
+ # If the "qop" value is "auth-int", then A2 is:
+ # A2 = method ":" digest-uri-value ":" H(entity-body)
+ if self.qop is None or self.qop == "auth":
+ a2 = '%s:%s' % (self.http_method, self.uri)
+ elif self.qop == "auth-int":
+ a2 = "%s:%s:%s" % (self.http_method, self.uri, H(entity_body))
+ else:
+ # in theory, this should never happen, since I validate qop in __init__()
+ raise ValueError(self.errmsg("Unrecognized value for qop!"))
+ return H(a2)
+
+
+ def request_digest(self, ha1, entity_body=''):
+ """Calculates the Request-Digest. See RFC 2617 3.2.2.1.
+ Arguments:
+
+ ha1 : the HA1 string obtained from the credentials store.
+
+ entity_body : if 'qop' is set to 'auth-int', then A2 includes a hash
+ of the "entity body". The entity body is the part of the
+ message which follows the HTTP headers. See RFC 2617 section
+ 4.3. This refers to the entity the user agent sent in the request which
+ has the Authorization header. Typically GET requests don't have an entity,
+ and POST requests do.
+ """
+ ha2 = self.HA2(entity_body)
+ # Request-Digest -- RFC 2617 3.2.2.1
+ if self.qop:
+ req = "%s:%s:%s:%s:%s" % (self.nonce, self.nc, self.cnonce, self.qop, ha2)
+ else:
+ req = "%s:%s" % (self.nonce, ha2)
+
+ # RFC 2617 3.2.2.2
+ #
+ # If the "algorithm" directive's value is "MD5" or is unspecified, then A1 is:
+ # A1 = unq(username-value) ":" unq(realm-value) ":" passwd
+ #
+ # If the "algorithm" directive's value is "MD5-sess", then A1 is
+ # calculated only once - on the first request by the client following
+ # receipt of a WWW-Authenticate challenge from the server.
+ # A1 = H( unq(username-value) ":" unq(realm-value) ":" passwd )
+ # ":" unq(nonce-value) ":" unq(cnonce-value)
+ if self.algorithm == 'MD5-sess':
+ ha1 = H('%s:%s:%s' % (ha1, self.nonce, self.cnonce))
+
+ digest = H('%s:%s' % (ha1, req))
+ return digest
+
+
+
+def www_authenticate(realm, key, algorithm='MD5', nonce=None, qop=qop_auth, stale=False):
+ """Constructs a WWW-Authenticate header for Digest authentication."""
+ if qop not in valid_qops:
+ raise ValueError("Unsupported value for qop: '%s'" % qop)
+ if algorithm not in valid_algorithms:
+ raise ValueError("Unsupported value for algorithm: '%s'" % algorithm)
+
+ if nonce is None:
+ nonce = synthesize_nonce(realm, key)
+ s = 'Digest realm="%s", nonce="%s", algorithm="%s", qop="%s"' % (
+ realm, nonce, algorithm, qop)
+ if stale:
+ s += ', stale="true"'
+ return s
+
+
+def digest_auth(realm, get_ha1_func, key, debug=False):
+ """digest_auth is a CherryPy tool which hooks at before_handler to perform
+ HTTP Digest Access Authentication, as specified in RFC 2617.
+
+ If the request has an 'authorization' header with a 'Digest' scheme, this
+ tool authenticates the credentials supplied in that header. If
+ the request has no 'authorization' header, or if it does but the scheme is
+ not "Digest", or if authentication fails, the tool sends a 401 response with
+ a 'WWW-Authenticate' Digest header.
+
+ Arguments:
+ realm: a string containing the authentication realm.
+
+ get_ha1_func: a function which looks up a username in a credentials store
+ and returns the HA1 string, which is defined in the RFC to be
+ MD5(username : realm : password). The function's signature is:
+ get_ha1(realm, username)
+ where username is obtained from the request's 'authorization' header.
+ If username is not found in the credentials store, get_ha1() returns
+ None.
+
+ key: a secret string known only to the server, used in the synthesis of nonces.
+ """
+
+ auth_header = cherrypy.request.headers.get('authorization')
+ nonce_is_stale = False
+ if auth_header is not None:
+ try:
+ auth = HttpDigestAuthorization(auth_header, cherrypy.request.method, debug=debug)
+ except ValueError, e:
+ raise cherrypy.HTTPError(400, 'Bad Request: %s' % e)
+
+ if debug:
+ TRACE(str(auth))
+
+ if auth.validate_nonce(realm, key):
+ ha1 = get_ha1_func(realm, auth.username)
+ if ha1 is not None:
+ # note that for request.body to be available we need to hook in at
+ # before_handler, not on_start_resource like 3.1.x digest_auth does.
+ digest = auth.request_digest(ha1, entity_body=cherrypy.request.body)
+ if digest == auth.response: # authenticated
+ if debug:
+ TRACE("digest matches auth.response")
+ # Now check if nonce is stale.
+ # The choice of ten minutes' lifetime for nonce is somewhat arbitrary
+ nonce_is_stale = auth.is_nonce_stale(max_age_seconds=600)
+ if not nonce_is_stale:
+ cherrypy.request.login = auth.username
+ if debug:
+ TRACE("authentication of %s successful" % auth.username)
+ return
+
+ # Respond with 401 status and a WWW-Authenticate header
+ header = www_authenticate(realm, key, stale=nonce_is_stale)
+ if debug:
+ TRACE(header)
+ cherrypy.response.headers['WWW-Authenticate'] = header
+ raise cherrypy.HTTPError(401, "You are not authorized to access that resource")
+
diff --git a/cherrypy/test/test_auth_basic.py b/cherrypy/test/test_auth_basic.py
new file mode 100644
index 00000000..982bb9a1
--- /dev/null
+++ b/cherrypy/test/test_auth_basic.py
@@ -0,0 +1,89 @@
+# This file is part of CherryPy <http://www.cherrypy.org/>
+# -*- coding: utf-8 -*-
+# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
+
+from cherrypy.test import test
+test.prefer_parent_path()
+
+try:
+ from hashlib import md5
+except ImportError:
+ # Python 2.4 and earlier
+ from md5 import new as md5
+
+import cherrypy
+from cherrypy.lib import auth_basic
+
+def setup_server():
+ class Root:
+ def index(self):
+ return "This is public."
+ index.exposed = True
+
+ class BasicProtected:
+ def index(self):
+ return "Hello %s, you've been authorized." % cherrypy.request.login
+ index.exposed = True
+
+ class BasicProtected2:
+ def index(self):
+ return "Hello %s, you've been authorized." % cherrypy.request.login
+ index.exposed = True
+
+ userpassdict = {'xuser' : 'xpassword'}
+ userhashdict = {'xuser' : md5('xpassword').hexdigest()}
+
+ def checkpasshash(realm, user, password):
+ p = userhashdict.get(user)
+ return p and p == md5(password).hexdigest() or False
+
+ conf = {'/basic': {'tools.auth_basic.on': True,
+ 'tools.auth_basic.realm': 'wonderland',
+ 'tools.auth_basic.checkpassword_func': auth_basic.checkpassword_dict(userpassdict)},
+ '/basic2': {'tools.auth_basic.on': True,
+ 'tools.auth_basic.realm': 'wonderland',
+ 'tools.auth_basic.checkpassword_func': checkpasshash},
+ }
+
+ root = Root()
+ root.basic = BasicProtected()
+ root.basic2 = BasicProtected2()
+ cherrypy.tree.mount(root, config=conf)
+
+from cherrypy.test import helper
+
+class BasicAuthTest(helper.CPWebCase):
+
+ def testPublic(self):
+ self.getPage("/")
+ self.assertStatus('200 OK')
+ self.assertHeader('Content-Type', 'text/html')
+ self.assertBody('This is public.')
+
+ def testBasic(self):
+ self.getPage("/basic/")
+ self.assertStatus(401)
+ self.assertHeader('WWW-Authenticate', 'Basic realm="wonderland"')
+
+ self.getPage('/basic/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3JX')])
+ self.assertStatus(401)
+
+ self.getPage('/basic/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3Jk')])
+ self.assertStatus('200 OK')
+ self.assertBody("Hello xuser, you've been authorized.")
+
+ def testBasic2(self):
+ self.getPage("/basic2/")
+ self.assertStatus(401)
+ self.assertHeader('WWW-Authenticate', 'Basic realm="wonderland"')
+
+ self.getPage('/basic2/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3JX')])
+ self.assertStatus(401)
+
+ self.getPage('/basic2/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3Jk')])
+ self.assertStatus('200 OK')
+ self.assertBody("Hello xuser, you've been authorized.")
+
+
+if __name__ == "__main__":
+ helper.testmain()
diff --git a/cherrypy/test/test_auth_digest.py b/cherrypy/test/test_auth_digest.py
new file mode 100644
index 00000000..15000bdf
--- /dev/null
+++ b/cherrypy/test/test_auth_digest.py
@@ -0,0 +1,120 @@
+# This file is part of CherryPy <http://www.cherrypy.org/>
+# -*- coding: utf-8 -*-
+# vim:ts=4:sw=4:expandtab:fileencoding=utf-8
+
+from cherrypy.test import test
+test.prefer_parent_path()
+
+
+import cherrypy
+from cherrypy.lib import auth_digest
+
+def setup_server():
+ class Root:
+ def index(self):
+ return "This is public."
+ index.exposed = True
+
+ class DigestProtected:
+ def index(self):
+ return "Hello %s, you've been authorized." % cherrypy.request.login
+ index.exposed = True
+
+ def fetch_users():
+ return {'test': 'test'}
+
+
+ get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(fetch_users())
+ conf = {'/digest': {'tools.auth_digest.on': True,
+ 'tools.auth_digest.realm': 'localhost',
+ 'tools.auth_digest.get_ha1_func': get_ha1,
+ 'tools.auth_digest.key': 'a565c27146791cfb',
+ 'tools.auth_digest.debug': 'True'}}
+
+ root = Root()
+ root.digest = DigestProtected()
+ cherrypy.tree.mount(root, config=conf)
+
+from cherrypy.test import helper
+
+class DigestAuthTest(helper.CPWebCase):
+
+ def testPublic(self):
+ self.getPage("/")
+ self.assertStatus('200 OK')
+ self.assertHeader('Content-Type', 'text/html')
+ self.assertBody('This is public.')
+
+ def testDigest(self):
+ self.getPage("/digest/")
+ self.assertStatus(401)
+
+ value = None
+ for k, v in self.headers:
+ if k.lower() == "www-authenticate":
+ if v.startswith("Digest"):
+ value = v
+ break
+
+ if value is None:
+ self._handlewebError("Digest authentification scheme was not found")
+
+ value = value[7:]
+ items = value.split(', ')
+ tokens = {}
+ for item in items:
+ key, value = item.split('=')
+ tokens[key.lower()] = value
+
+ missing_msg = "%s is missing"
+ bad_value_msg = "'%s' was expecting '%s' but found '%s'"
+ nonce = None
+ if 'realm' not in tokens:
+ self._handlewebError(missing_msg % 'realm')
+ elif tokens['realm'] != '"localhost"':
+ self._handlewebError(bad_value_msg % ('realm', '"localhost"', tokens['realm']))
+ if 'nonce' not in tokens:
+ self._handlewebError(missing_msg % 'nonce')
+ else:
+ nonce = tokens['nonce'].strip('"')
+ if 'algorithm' not in tokens:
+ self._handlewebError(missing_msg % 'algorithm')
+ elif tokens['algorithm'] != '"MD5"':
+ self._handlewebError(bad_value_msg % ('algorithm', '"MD5"', tokens['algorithm']))
+ if 'qop' not in tokens:
+ self._handlewebError(missing_msg % 'qop')
+ elif tokens['qop'] != '"auth"':
+ self._handlewebError(bad_value_msg % ('qop', '"auth"', tokens['qop']))
+
+ get_ha1_func = auth_digest.get_ha1_dict_plain({'test' : 'test'})
+
+ # Test user agent response with a wrong value for 'realm'
+ base_auth = 'Digest username="test", realm="wrong realm", nonce="%s", uri="/digest/", algorithm=MD5, response="%s", qop=auth, nc=%s, cnonce="1522e61005789929"'
+
+ auth_header = base_auth % (nonce, '11111111111111111111111111111111', '00000001')
+ auth = auth_digest.HttpDigestAuthorization(auth_header, 'GET')
+ # calculate the response digest
+ ha1 = get_ha1_func(auth.realm, 'test')
+ response = auth.request_digest(ha1)
+ # send response with correct response digest, but wrong realm
+ auth_header = base_auth % (nonce, response, '00000001')
+ self.getPage('/digest/', [('Authorization', auth_header)])
+ self.assertStatus(401)
+
+ # Test that must pass
+ base_auth = 'Digest username="test", realm="localhost", nonce="%s", uri="/digest/", algorithm=MD5, response="%s", qop=auth, nc=%s, cnonce="1522e61005789929"'
+
+ auth_header = base_auth % (nonce, '11111111111111111111111111111111', '00000001')
+ auth = auth_digest.HttpDigestAuthorization(auth_header, 'GET')
+ # calculate the response digest
+ ha1 = get_ha1_func('localhost', 'test')
+ response = auth.request_digest(ha1)
+ # send response with correct response digest
+ auth_header = base_auth % (nonce, response, '00000001')
+ self.getPage('/digest/', [('Authorization', auth_header)])
+ self.assertStatus('200 OK')
+ self.assertBody("Hello test, you've been authorized.")
+
+if __name__ == "__main__":
+ helper.testmain()
+