diff options
author | Paul <paultax@gmail.com> | 2012-07-26 23:00:18 +0800 |
---|---|---|
committer | Paul <paultax@gmail.com> | 2012-07-26 23:00:18 +0800 |
commit | 5aef5663c075b56cd25dd781a457f2d7e365df01 (patch) | |
tree | 8dee18ed9fc9ed34ba54f243f2a6becc0e7a3836 /awsauth.py | |
parent | f33f8b8084e9cebed95bdaf08ed42a35220a08a0 (diff) | |
download | python-requests-aws-5aef5663c075b56cd25dd781a457f2d7e365df01.tar.gz |
Fixed issue #3
Added tests
Refactored get_canonical_string so it can be tested
Diffstat (limited to 'awsauth.py')
-rw-r--r-- | awsauth.py | 50 |
1 files changed, 17 insertions, 33 deletions
@@ -3,9 +3,11 @@ import base64 import hmac import urllib + from hashlib import sha1 as sha -from urlparse import urlparse, urlunparse +from urlparse import urlparse from email.utils import formatdate + from requests.auth import AuthBase @@ -21,7 +23,7 @@ class S3Auth(AuthBase): 'response-expires', 'response-cache-control', 'delete', 'lifecycle', 'response-content-disposition', 'response-content-encoding' ] - + def __init__(self, access_key, secret_key, service_url=None): if service_url: @@ -33,45 +35,32 @@ class S3Auth(AuthBase): # Create date header if it is not created yet. if not 'date' in r.headers and not 'x-amz-date' in r.headers: r.headers['date'] = formatdate(timeval=None, localtime=False, usegmt=True) + r.headers['Authorization'] = 'AWS %s:%s'%(self.access_key, self.get_signature(r)) return r def get_signature(self, r): - h = hmac.new(self.secret_key, self.get_canonical_string(r), digestmod=sha) + canonical_string = self.get_canonical_string(r.url, r.headers, r.method) + h = hmac.new(self.secret_key, canonical_string, digestmod=sha) return base64.encodestring(h.digest()).strip() - def get_canonical_string(self, r): - # r.url needs to be encoded as well - r_url = urlparse(r.url) - # what is a man to do? - parsedurl = list(r_url) - parsedurl[2] = urllib.quote_plus(parsedurl[2][1:]) # path includes the leading slash, which should not be part of the %-encoded string - r.url = urlunparse(parsedurl) - - parsedurl = urlparse(r.url) + def get_canonical_string(self, url, headers, method): + parsedurl = urlparse(url) objectkey = parsedurl.path[1:] query_args = sorted(parsedurl.query.split('&')) - ''' bucket = parsedurl.netloc[:-len(self.service_base_url)] if len(bucket) > 1: # remove last dot bucket = bucket[:-1] - ''' - # stolen from https://github.com/kennethreitz/python-requests-aws/blob/master/awsauth.py - bucket = '' - netloc_split = parsedurl.netloc.split('.') - if ((len(netloc_split) == 4) or - (len(netloc_split) == 3 and netloc_split[0].lower() != 's3')): - bucket = netloc_split[0] interesting_headers = {} ok_keys = ['content-md5', 'content-type', 'date'] - for key in r.headers: + for key in headers: lk = key.lower() - if r.headers[key] is not None and (lk in ok_keys or lk.startswith('x-amz-')): - interesting_headers[lk] = r.headers[key].strip() + if headers[key] is not None and (lk in ok_keys or lk.startswith('x-amz-')): + interesting_headers[lk] = headers[key].strip() # these keys get empty strings if they don't exist if not interesting_headers.has_key('content-type'): @@ -84,12 +73,8 @@ class S3Auth(AuthBase): if interesting_headers.has_key('x-amz-date'): interesting_headers['date'] = '' - sorted_header_keys = interesting_headers.keys() - sorted_header_keys.sort() - - buf = '%s\n' % r.method - - for key in sorted_header_keys: + buf = '%s\n' % method + for key in sorted(interesting_headers.keys()): val = interesting_headers[key] if key.startswith('x-amz-'): @@ -101,8 +86,8 @@ class S3Auth(AuthBase): if bucket != '': buf += '/%s' % bucket - # add the objectkey. even if it doesn't exist, add the slash - buf += '/%s' % objectkey + # add the objectkey. even if it doesn't exist, add the slash + buf += '/%s' % urllib.unquote(objectkey) params_found = False @@ -110,12 +95,11 @@ class S3Auth(AuthBase): for q in query_args: k = q.split('=')[0] if k in self.special_params: - if params_found: buf += '&%s' % q else: buf += '?%s' % q - params_found = True return buf + |