From 5aef5663c075b56cd25dd781a457f2d7e365df01 Mon Sep 17 00:00:00 2001 From: Paul Date: Thu, 26 Jul 2012 23:00:18 +0800 Subject: Fixed issue #3 Added tests Refactored get_canonical_string so it can be tested --- awsauth.py | 50 +++++++++++++++++--------------------------------- 1 file changed, 17 insertions(+), 33 deletions(-) (limited to 'awsauth.py') diff --git a/awsauth.py b/awsauth.py index 84b404f..8ab5e5f 100644 --- a/awsauth.py +++ b/awsauth.py @@ -3,9 +3,11 @@ import base64 import hmac import urllib + from hashlib import sha1 as sha -from urlparse import urlparse, urlunparse +from urlparse import urlparse from email.utils import formatdate + from requests.auth import AuthBase @@ -21,7 +23,7 @@ class S3Auth(AuthBase): 'response-expires', 'response-cache-control', 'delete', 'lifecycle', 'response-content-disposition', 'response-content-encoding' ] - + def __init__(self, access_key, secret_key, service_url=None): if service_url: @@ -33,45 +35,32 @@ class S3Auth(AuthBase): # Create date header if it is not created yet. if not 'date' in r.headers and not 'x-amz-date' in r.headers: r.headers['date'] = formatdate(timeval=None, localtime=False, usegmt=True) + r.headers['Authorization'] = 'AWS %s:%s'%(self.access_key, self.get_signature(r)) return r def get_signature(self, r): - h = hmac.new(self.secret_key, self.get_canonical_string(r), digestmod=sha) + canonical_string = self.get_canonical_string(r.url, r.headers, r.method) + h = hmac.new(self.secret_key, canonical_string, digestmod=sha) return base64.encodestring(h.digest()).strip() - def get_canonical_string(self, r): - # r.url needs to be encoded as well - r_url = urlparse(r.url) - # what is a man to do? - parsedurl = list(r_url) - parsedurl[2] = urllib.quote_plus(parsedurl[2][1:]) # path includes the leading slash, which should not be part of the %-encoded string - r.url = urlunparse(parsedurl) - - parsedurl = urlparse(r.url) + def get_canonical_string(self, url, headers, method): + parsedurl = urlparse(url) objectkey = parsedurl.path[1:] query_args = sorted(parsedurl.query.split('&')) - ''' bucket = parsedurl.netloc[:-len(self.service_base_url)] if len(bucket) > 1: # remove last dot bucket = bucket[:-1] - ''' - # stolen from https://github.com/kennethreitz/python-requests-aws/blob/master/awsauth.py - bucket = '' - netloc_split = parsedurl.netloc.split('.') - if ((len(netloc_split) == 4) or - (len(netloc_split) == 3 and netloc_split[0].lower() != 's3')): - bucket = netloc_split[0] interesting_headers = {} ok_keys = ['content-md5', 'content-type', 'date'] - for key in r.headers: + for key in headers: lk = key.lower() - if r.headers[key] is not None and (lk in ok_keys or lk.startswith('x-amz-')): - interesting_headers[lk] = r.headers[key].strip() + if headers[key] is not None and (lk in ok_keys or lk.startswith('x-amz-')): + interesting_headers[lk] = headers[key].strip() # these keys get empty strings if they don't exist if not interesting_headers.has_key('content-type'): @@ -84,12 +73,8 @@ class S3Auth(AuthBase): if interesting_headers.has_key('x-amz-date'): interesting_headers['date'] = '' - sorted_header_keys = interesting_headers.keys() - sorted_header_keys.sort() - - buf = '%s\n' % r.method - - for key in sorted_header_keys: + buf = '%s\n' % method + for key in sorted(interesting_headers.keys()): val = interesting_headers[key] if key.startswith('x-amz-'): @@ -101,8 +86,8 @@ class S3Auth(AuthBase): if bucket != '': buf += '/%s' % bucket - # add the objectkey. even if it doesn't exist, add the slash - buf += '/%s' % objectkey + # add the objectkey. even if it doesn't exist, add the slash + buf += '/%s' % urllib.unquote(objectkey) params_found = False @@ -110,12 +95,11 @@ class S3Auth(AuthBase): for q in query_args: k = q.split('=')[0] if k in self.special_params: - if params_found: buf += '&%s' % q else: buf += '?%s' % q - params_found = True return buf + -- cgit v1.2.1