diff options
-rw-r--r-- | cherrypy/_cptools.py | 3 | ||||
-rw-r--r-- | cherrypy/lib/auth_basic.py | 82 | ||||
-rw-r--r-- | cherrypy/lib/auth_digest.py | 355 | ||||
-rw-r--r-- | cherrypy/test/test_auth_basic.py | 89 | ||||
-rw-r--r-- | cherrypy/test/test_auth_digest.py | 120 |
5 files changed, 649 insertions, 0 deletions
diff --git a/cherrypy/_cptools.py b/cherrypy/_cptools.py index 6319d3ae..b6c011a5 100644 --- a/cherrypy/_cptools.py +++ b/cherrypy/_cptools.py @@ -235,6 +235,7 @@ class ErrorTool(Tool): from cherrypy.lib import cptools, encoding, auth, static, tidy, jsontools from cherrypy.lib import sessions as _sessions, xmlrpc as _xmlrpc from cherrypy.lib import caching as _caching, wsgiapp as _wsgiapp +from cherrypy.lib import auth_basic, auth_digest class SessionTool(Tool): @@ -500,5 +501,7 @@ _d.redirect = Tool('on_start_resource', cptools.redirect) _d.autovary = Tool('on_start_resource', cptools.autovary, priority=0) _d.json_in = Tool('before_handler', jsontools.json_in, priority=30) _d.json_out = Tool('before_handler', jsontools.json_out, priority=30) +_d.auth_basic = Tool('before_handler', auth_basic.basic_auth, priority=1) +_d.auth_digest = Tool('before_handler', auth_digest.digest_auth, priority=1) del _d, cptools, encoding, auth, static, tidy diff --git a/cherrypy/lib/auth_basic.py b/cherrypy/lib/auth_basic.py new file mode 100644 index 00000000..82770f3c --- /dev/null +++ b/cherrypy/lib/auth_basic.py @@ -0,0 +1,82 @@ +# This file is part of CherryPy <http://www.cherrypy.org/> +# -*- coding: utf-8 -*- +# vim:ts=4:sw=4:expandtab:fileencoding=utf-8 + +__doc__ = """Module auth_basic.py provides a CherryPy 3.x tool which implements +the server-side of HTTP Basic Access Authentication, as described in RFC 2617. + +Example usage, using the built-in checkpassword function which uses a dict +as the credentials store: + +userpassdict = {'bird' : 'bebop'} +checkpassword = cherrypy.lib.auth_basic.checkpassword_dict(userpassdict) +basic_auth = {'tools.auth_basic.on': True, + 'tools.auth_basic.realm': 'earth', + 'tools.auth_basic.checkpassword_func': checkpassword, +} +app_config = { '/' : basic_auth } +""" + +__author__ = 'visteya' +__date__ = 'April 2009' + + +import base64 +import cherrypy + + +def checkpassword_dict(user_password_dict): + """Returns a checkpassword function which checks credentials + against a dictionary of the form: {username : password}. + + If you want a simple dictionary-based authentication scheme, use + checkpassword_dict(my_credentials_dict) as the value for the + checkpassword_func argument to basic_auth(). + """ + def checkpassword(realm, user, password): + p = user_password_dict.get(user) + return p and p == password or False + + return checkpassword + + +def basic_auth(realm, checkpassword_func): + """basic_auth is a CherryPy tool which hooks at before_handler to perform + HTTP Basic Access Authentication, as specified in RFC 2617. + + If the request has an 'authorization' header with a 'Basic' scheme, this + tool attempts to authenticate the credentials supplied in that header. If + the request has no 'authorization' header, or if it does but the scheme is + not 'Basic', or if authentication fails, the tool sends a 401 response with + a 'WWW-Authenticate' Basic header. + + Arguments: + realm: a string containing the authentication realm. + + checkpassword_func: a function which checks the authentication credentials. + Its signature is checkpassword(realm, username, password). where + username and password are the values obtained from the request's + 'authorization' header. If authentication succeeds, checkpassword_func + returns True, else it returns False. + """ + if '"' in realm: + raise ValueError, 'Realm cannot contain the " (quote) character.' + + auth_header = cherrypy.request.headers.get('authorization') + if auth_header is not None: + try: + scheme, params = auth_header.split(' ', 1) + if scheme.lower() == 'basic': + # since CherryPy claims compability with Python 2.3, we must use + # the legacy API of base64 + username, password = base64.decodestring(params).split(':', 1) + if checkpassword_func(realm, username, password): + cherrypy.request.login = username + return # successful authentication + except (ValueError, binascii.Error): # split() error, base64.decodestring() error + raise cherrypy.HTTPError(400, 'Bad Request') + + # Respond with 401 status and a WWW-Authenticate header + cherrypy.response.headers['www-authenticate'] = 'Basic realm="%s"' % realm + raise cherrypy.HTTPError(401, "You are not authorized to access that resource") + diff --git a/cherrypy/lib/auth_digest.py b/cherrypy/lib/auth_digest.py new file mode 100644 index 00000000..00a4f1d7 --- /dev/null +++ b/cherrypy/lib/auth_digest.py @@ -0,0 +1,355 @@ +# This file is part of CherryPy <http://www.cherrypy.org/> +# -*- coding: utf-8 -*- +# vim:ts=4:sw=4:expandtab:fileencoding=utf-8 + +__doc__ = """An implementation of the server-side of HTTP Digest Access +Authentication, which is described in RFC 2617. + +Example usage, using the built-in get_ha1 function which uses a dict +of plaintext passwords as the credentials store: + +userpassdict = {'alice' : '4x5istwelve'} +get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(userpassdict) +digest_auth = {'tools.auth_digest.on': True, + 'tools.auth_digest.realm': 'wonderland', + 'tools.auth_digest.get_ha1_func': get_ha1, + 'tools.auth_digest.key': 'a565c27146791cfb', +} +app_config = { '/' : digest_auth } +""" + +__author__ = 'visteya' +__date__ = 'April 2009' + + +try: + from hashlib import md5 +except ImportError: + # Python 2.4 and earlier + from md5 import new as md5 +import time +import base64 +import urllib2 + +import cherrypy + +qop_auth = 'auth' +qop_auth_int = 'auth-int' +valid_qops = (qop_auth, qop_auth_int) + +valid_algorithms = ('MD5', 'MD5-sess') + + +def TRACE(msg): + cherrypy.log(msg, context='auth_digest') + +# Three helper functions for users of the tool, providing three variants +# of get_ha1() functions for three different kinds of credential stores. +def get_ha1_dict_plain(user_password_dict): + """Returns a get_ha1 function which obtains a plaintext password from a + dictionary of the form: {username : password}. + + If you want a simple dictionary-based authentication scheme, with plaintext + passwords, use get_ha1_dict_plain(my_userpass_dict) as the value for the + get_ha1_func argument to digest_auth(). + """ + def get_ha1(realm, username): + password = user_password_dict.get(username) + if password: + return md5('%s:%s:%s' % (username, realm, password)).hexdigest() + return None + + return get_ha1 + +def get_ha1_dict(user_ha1_dict): + """Returns a get_ha1 function which obtains a HA1 password hash from a + dictionary of the form: {username : HA1}. + + If you want a dictionary-based authentication scheme, but with + pre-computed HA1 hashes instead of plain-text passwords, use + getpass_dict_ha1(my_userha1_dict) as the value for the get_ha1_func + argument to digest_auth(). + """ + def get_ha1(realm, username): + return user_ha1_dict.get(user) + + return get_ha1 + +def get_ha1_file_htdigest(filename): + """Returns a get_ha1 function which obtains a HA1 password hash from a + flat file with lines of the same format as that produced by the Apache + htdigest utility. For example, for realm 'wonderland', username 'alice', + and password '4x5istwelve', the htdigest line would be: + + alice:wonderland:3238cdfe91a8b2ed8e39646921a02d4c + + If you want to use an Apache htdigest file as the credentials store, + then use get_ha1_file_htdigest(my_htdigest_file) as the value for the + get_ha1_func argument to digest_auth(). It is recommended that the filename + argument be an absolute path, to avoid problems. + """ + def get_ha1(realm, username): + result = None + f = open(filename, 'r') + for line in f: + u, r, ha1 = line.rstrip().split(':') + if u == username and r == realm: + result = ha1 + break + f.close() + return result + + return get_ha1 + + +def synthesize_nonce(s, key, timestamp=None): + """Synthesize a nonce value which resists spoofing and can be checked for staleness. + Returns a string suitable as the value for 'nonce' in the www-authenticate header. + + Args: + s: a string related to the resource, such as the hostname of the server. + key: a secret string known only to the server. + timestamp: an integer seconds-since-the-epoch timestamp + """ + if timestamp is None: + timestamp = int(time.time()) + h = md5('%s:%s:%s' % (timestamp, s, key)).hexdigest() + nonce = '%s:%s' % (timestamp, h) + return nonce + + +def H(s): + """The hash function H""" + return md5(s).hexdigest() + + +class HttpDigestAuthorization (object): + """Class to parse a Digest Authorization header and perform re-calculation + of the digest. + """ + + def errmsg(self, s): + return 'Digest Authorization header: %s' % s + + def __init__(self, auth_header, http_method, debug=False): + self.http_method = http_method + self.debug = debug + scheme, params = auth_header.split(" ", 1) + self.scheme = scheme.lower() + if self.scheme != 'digest': + raise ValueError('Authorization scheme is not "Digest"') + + self.auth_header = auth_header + + # make a dict of the params + items = urllib2.parse_http_list(params) + paramsd = urllib2.parse_keqv_list(items) + + self.realm = paramsd.get('realm') + self.username = paramsd.get('username') + self.nonce = paramsd.get('nonce') + self.uri = paramsd.get('uri') + self.method = paramsd.get('method') + self.response = paramsd.get('response') # the response digest + self.algorithm = paramsd.get('algorithm', 'MD5') + self.cnonce = paramsd.get('cnonce') + self.opaque = paramsd.get('opaque') + self.qop = paramsd.get('qop') # qop + self.nc = paramsd.get('nc') # nonce count + + # perform some correctness checks + if self.algorithm not in valid_algorithms: + raise ValueError(self.errmsg("Unsupported value for algorithm: '%s'" % self.algorithm)) + + has_reqd = self.username and \ + self.realm and \ + self.nonce and \ + self.uri and \ + self.response + if not has_reqd: + raise ValueError(self.errmsg("Not all required parameters are present.")) + + if self.qop: + if self.qop not in valid_qops: + raise ValueError(self.errmsg("Unsupported value for qop: '%s'" % self.qop)) + if not (self.cnonce and self.nc): + raise ValueError(self.errmsg("If qop is sent then cnonce and nc MUST be present")) + else: + if self.cnonce or self.nc: + raise ValueError(self.errmsg("If qop is not sent, neither cnonce nor nc can be present")) + + + def __str__(self): + return 'authorization : %s' % self.auth_header + + def validate_nonce(self, s, key): + """Validate the nonce. + Returns True if nonce was generated by synthesize_nonce() and the timestamp + is not spoofed, else returns False. + + Args: + s: a string related to the resource, such as the hostname of the server. + key: a secret string known only to the server. + Both s and key must be the same values which were used to synthesize the nonce + we are trying to validate. + """ + try: + timestamp, hashpart = self.nonce.split(':', 1) + s_timestamp, s_hashpart = synthesize_nonce(s, key, timestamp).split(':', 1) + is_valid = s_hashpart == hashpart + if self.debug: + TRACE('validate_nonce: %s' % is_valid) + return is_valid + except ValueError: # split() error + pass + return False + + + def is_nonce_stale(self, max_age_seconds=600): + """Returns True if a validated nonce is stale. The nonce contains a + timestamp in plaintext and also a secure hash of the timestamp. You should + first validate the nonce to ensure the plaintext timestamp is not spoofed. + """ + try: + timestamp, hashpart = self.nonce.split(':', 1) + if int(timestamp) + max_age_seconds > int(time.time()): + return False + except ValueError: # int() error + pass + if self.debug: + TRACE("nonce is stale") + return True + + + def HA2(self, entity_body=''): + """Returns the H(A2) string. See RFC 2617 3.2.2.3.""" + # RFC 2617 3.2.2.3 + # If the "qop" directive's value is "auth" or is unspecified, then A2 is: + # A2 = method ":" digest-uri-value + # + # If the "qop" value is "auth-int", then A2 is: + # A2 = method ":" digest-uri-value ":" H(entity-body) + if self.qop is None or self.qop == "auth": + a2 = '%s:%s' % (self.http_method, self.uri) + elif self.qop == "auth-int": + a2 = "%s:%s:%s" % (self.http_method, self.uri, H(entity_body)) + else: + # in theory, this should never happen, since I validate qop in __init__() + raise ValueError(self.errmsg("Unrecognized value for qop!")) + return H(a2) + + + def request_digest(self, ha1, entity_body=''): + """Calculates the Request-Digest. See RFC 2617 3.2.2.1. + Arguments: + + ha1 : the HA1 string obtained from the credentials store. + + entity_body : if 'qop' is set to 'auth-int', then A2 includes a hash + of the "entity body". The entity body is the part of the + message which follows the HTTP headers. See RFC 2617 section + 4.3. This refers to the entity the user agent sent in the request which + has the Authorization header. Typically GET requests don't have an entity, + and POST requests do. + """ + ha2 = self.HA2(entity_body) + # Request-Digest -- RFC 2617 3.2.2.1 + if self.qop: + req = "%s:%s:%s:%s:%s" % (self.nonce, self.nc, self.cnonce, self.qop, ha2) + else: + req = "%s:%s" % (self.nonce, ha2) + + # RFC 2617 3.2.2.2 + # + # If the "algorithm" directive's value is "MD5" or is unspecified, then A1 is: + # A1 = unq(username-value) ":" unq(realm-value) ":" passwd + # + # If the "algorithm" directive's value is "MD5-sess", then A1 is + # calculated only once - on the first request by the client following + # receipt of a WWW-Authenticate challenge from the server. + # A1 = H( unq(username-value) ":" unq(realm-value) ":" passwd ) + # ":" unq(nonce-value) ":" unq(cnonce-value) + if self.algorithm == 'MD5-sess': + ha1 = H('%s:%s:%s' % (ha1, self.nonce, self.cnonce)) + + digest = H('%s:%s' % (ha1, req)) + return digest + + + +def www_authenticate(realm, key, algorithm='MD5', nonce=None, qop=qop_auth, stale=False): + """Constructs a WWW-Authenticate header for Digest authentication.""" + if qop not in valid_qops: + raise ValueError("Unsupported value for qop: '%s'" % qop) + if algorithm not in valid_algorithms: + raise ValueError("Unsupported value for algorithm: '%s'" % algorithm) + + if nonce is None: + nonce = synthesize_nonce(realm, key) + s = 'Digest realm="%s", nonce="%s", algorithm="%s", qop="%s"' % ( + realm, nonce, algorithm, qop) + if stale: + s += ', stale="true"' + return s + + +def digest_auth(realm, get_ha1_func, key, debug=False): + """digest_auth is a CherryPy tool which hooks at before_handler to perform + HTTP Digest Access Authentication, as specified in RFC 2617. + + If the request has an 'authorization' header with a 'Digest' scheme, this + tool authenticates the credentials supplied in that header. If + the request has no 'authorization' header, or if it does but the scheme is + not "Digest", or if authentication fails, the tool sends a 401 response with + a 'WWW-Authenticate' Digest header. + + Arguments: + realm: a string containing the authentication realm. + + get_ha1_func: a function which looks up a username in a credentials store + and returns the HA1 string, which is defined in the RFC to be + MD5(username : realm : password). The function's signature is: + get_ha1(realm, username) + where username is obtained from the request's 'authorization' header. + If username is not found in the credentials store, get_ha1() returns + None. + + key: a secret string known only to the server, used in the synthesis of nonces. + """ + + auth_header = cherrypy.request.headers.get('authorization') + nonce_is_stale = False + if auth_header is not None: + try: + auth = HttpDigestAuthorization(auth_header, cherrypy.request.method, debug=debug) + except ValueError, e: + raise cherrypy.HTTPError(400, 'Bad Request: %s' % e) + + if debug: + TRACE(str(auth)) + + if auth.validate_nonce(realm, key): + ha1 = get_ha1_func(realm, auth.username) + if ha1 is not None: + # note that for request.body to be available we need to hook in at + # before_handler, not on_start_resource like 3.1.x digest_auth does. + digest = auth.request_digest(ha1, entity_body=cherrypy.request.body) + if digest == auth.response: # authenticated + if debug: + TRACE("digest matches auth.response") + # Now check if nonce is stale. + # The choice of ten minutes' lifetime for nonce is somewhat arbitrary + nonce_is_stale = auth.is_nonce_stale(max_age_seconds=600) + if not nonce_is_stale: + cherrypy.request.login = auth.username + if debug: + TRACE("authentication of %s successful" % auth.username) + return + + # Respond with 401 status and a WWW-Authenticate header + header = www_authenticate(realm, key, stale=nonce_is_stale) + if debug: + TRACE(header) + cherrypy.response.headers['WWW-Authenticate'] = header + raise cherrypy.HTTPError(401, "You are not authorized to access that resource") + diff --git a/cherrypy/test/test_auth_basic.py b/cherrypy/test/test_auth_basic.py new file mode 100644 index 00000000..982bb9a1 --- /dev/null +++ b/cherrypy/test/test_auth_basic.py @@ -0,0 +1,89 @@ +# This file is part of CherryPy <http://www.cherrypy.org/> +# -*- coding: utf-8 -*- +# vim:ts=4:sw=4:expandtab:fileencoding=utf-8 + +from cherrypy.test import test +test.prefer_parent_path() + +try: + from hashlib import md5 +except ImportError: + # Python 2.4 and earlier + from md5 import new as md5 + +import cherrypy +from cherrypy.lib import auth_basic + +def setup_server(): + class Root: + def index(self): + return "This is public." + index.exposed = True + + class BasicProtected: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + class BasicProtected2: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + userpassdict = {'xuser' : 'xpassword'} + userhashdict = {'xuser' : md5('xpassword').hexdigest()} + + def checkpasshash(realm, user, password): + p = userhashdict.get(user) + return p and p == md5(password).hexdigest() or False + + conf = {'/basic': {'tools.auth_basic.on': True, + 'tools.auth_basic.realm': 'wonderland', + 'tools.auth_basic.checkpassword_func': auth_basic.checkpassword_dict(userpassdict)}, + '/basic2': {'tools.auth_basic.on': True, + 'tools.auth_basic.realm': 'wonderland', + 'tools.auth_basic.checkpassword_func': checkpasshash}, + } + + root = Root() + root.basic = BasicProtected() + root.basic2 = BasicProtected2() + cherrypy.tree.mount(root, config=conf) + +from cherrypy.test import helper + +class BasicAuthTest(helper.CPWebCase): + + def testPublic(self): + self.getPage("/") + self.assertStatus('200 OK') + self.assertHeader('Content-Type', 'text/html') + self.assertBody('This is public.') + + def testBasic(self): + self.getPage("/basic/") + self.assertStatus(401) + self.assertHeader('WWW-Authenticate', 'Basic realm="wonderland"') + + self.getPage('/basic/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3JX')]) + self.assertStatus(401) + + self.getPage('/basic/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3Jk')]) + self.assertStatus('200 OK') + self.assertBody("Hello xuser, you've been authorized.") + + def testBasic2(self): + self.getPage("/basic2/") + self.assertStatus(401) + self.assertHeader('WWW-Authenticate', 'Basic realm="wonderland"') + + self.getPage('/basic2/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3JX')]) + self.assertStatus(401) + + self.getPage('/basic2/', [('Authorization', 'Basic eHVzZXI6eHBhc3N3b3Jk')]) + self.assertStatus('200 OK') + self.assertBody("Hello xuser, you've been authorized.") + + +if __name__ == "__main__": + helper.testmain() diff --git a/cherrypy/test/test_auth_digest.py b/cherrypy/test/test_auth_digest.py new file mode 100644 index 00000000..15000bdf --- /dev/null +++ b/cherrypy/test/test_auth_digest.py @@ -0,0 +1,120 @@ +# This file is part of CherryPy <http://www.cherrypy.org/> +# -*- coding: utf-8 -*- +# vim:ts=4:sw=4:expandtab:fileencoding=utf-8 + +from cherrypy.test import test +test.prefer_parent_path() + + +import cherrypy +from cherrypy.lib import auth_digest + +def setup_server(): + class Root: + def index(self): + return "This is public." + index.exposed = True + + class DigestProtected: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + def fetch_users(): + return {'test': 'test'} + + + get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(fetch_users()) + conf = {'/digest': {'tools.auth_digest.on': True, + 'tools.auth_digest.realm': 'localhost', + 'tools.auth_digest.get_ha1_func': get_ha1, + 'tools.auth_digest.key': 'a565c27146791cfb', + 'tools.auth_digest.debug': 'True'}} + + root = Root() + root.digest = DigestProtected() + cherrypy.tree.mount(root, config=conf) + +from cherrypy.test import helper + +class DigestAuthTest(helper.CPWebCase): + + def testPublic(self): + self.getPage("/") + self.assertStatus('200 OK') + self.assertHeader('Content-Type', 'text/html') + self.assertBody('This is public.') + + def testDigest(self): + self.getPage("/digest/") + self.assertStatus(401) + + value = None + for k, v in self.headers: + if k.lower() == "www-authenticate": + if v.startswith("Digest"): + value = v + break + + if value is None: + self._handlewebError("Digest authentification scheme was not found") + + value = value[7:] + items = value.split(', ') + tokens = {} + for item in items: + key, value = item.split('=') + tokens[key.lower()] = value + + missing_msg = "%s is missing" + bad_value_msg = "'%s' was expecting '%s' but found '%s'" + nonce = None + if 'realm' not in tokens: + self._handlewebError(missing_msg % 'realm') + elif tokens['realm'] != '"localhost"': + self._handlewebError(bad_value_msg % ('realm', '"localhost"', tokens['realm'])) + if 'nonce' not in tokens: + self._handlewebError(missing_msg % 'nonce') + else: + nonce = tokens['nonce'].strip('"') + if 'algorithm' not in tokens: + self._handlewebError(missing_msg % 'algorithm') + elif tokens['algorithm'] != '"MD5"': + self._handlewebError(bad_value_msg % ('algorithm', '"MD5"', tokens['algorithm'])) + if 'qop' not in tokens: + self._handlewebError(missing_msg % 'qop') + elif tokens['qop'] != '"auth"': + self._handlewebError(bad_value_msg % ('qop', '"auth"', tokens['qop'])) + + get_ha1_func = auth_digest.get_ha1_dict_plain({'test' : 'test'}) + + # Test user agent response with a wrong value for 'realm' + base_auth = 'Digest username="test", realm="wrong realm", nonce="%s", uri="/digest/", algorithm=MD5, response="%s", qop=auth, nc=%s, cnonce="1522e61005789929"' + + auth_header = base_auth % (nonce, '11111111111111111111111111111111', '00000001') + auth = auth_digest.HttpDigestAuthorization(auth_header, 'GET') + # calculate the response digest + ha1 = get_ha1_func(auth.realm, 'test') + response = auth.request_digest(ha1) + # send response with correct response digest, but wrong realm + auth_header = base_auth % (nonce, response, '00000001') + self.getPage('/digest/', [('Authorization', auth_header)]) + self.assertStatus(401) + + # Test that must pass + base_auth = 'Digest username="test", realm="localhost", nonce="%s", uri="/digest/", algorithm=MD5, response="%s", qop=auth, nc=%s, cnonce="1522e61005789929"' + + auth_header = base_auth % (nonce, '11111111111111111111111111111111', '00000001') + auth = auth_digest.HttpDigestAuthorization(auth_header, 'GET') + # calculate the response digest + ha1 = get_ha1_func('localhost', 'test') + response = auth.request_digest(ha1) + # send response with correct response digest + auth_header = base_auth % (nonce, response, '00000001') + self.getPage('/digest/', [('Authorization', auth_header)]) + self.assertStatus('200 OK') + self.assertBody("Hello test, you've been authorized.") + +if __name__ == "__main__": + helper.testmain() + |