From 50c6fdf46a9743376e6a381176bdd8a3e41e141a Mon Sep 17 00:00:00 2001 From: Paul Date: Sat, 17 Jan 2015 10:34:51 +0100 Subject: PY3 and PEP8 fixes --- awsauth.py | 5 ++--- example.py | 36 ++++++++++++++++----------------- test.py | 67 +++++++++++++++++++++++++++++++++++++++----------------------- 3 files changed, 62 insertions(+), 46 deletions(-) diff --git a/awsauth.py b/awsauth.py index 9d598ca..c3d5c0a 100644 --- a/awsauth.py +++ b/awsauth.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -import base64 import hmac from hashlib import sha1 as sha @@ -7,7 +6,6 @@ py3k = False try: from urlparse import urlparse from base64 import encodestring - except: py3k = True from urllib.parse import urlparse @@ -84,7 +82,8 @@ class S3Auth(AuthBase): lk = lk.decode('utf-8') except: pass - if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')): + if headers[key] and (lk in interesting_headers.keys() + or lk.startswith('x-amz-')): interesting_headers[lk] = headers[key].strip() # If x-amz-date is used it supersedes the date header. diff --git a/example.py b/example.py index e9082d4..cab545a 100644 --- a/example.py +++ b/example.py @@ -1,42 +1,42 @@ #!/usr/bin/env python from __future__ import print_function - import requests - from awsauth import S3Auth -import gzip - -import urllib - ACCESS_KEY = "ACCESSKEYXXXXXXXXXXXX" SECRET_KEY = "AWSSECRETKEYXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" -acceptableAccessCodes = (200, 204) # # https://forums.aws.amazon.com/thread.jspa?threadID=28799: http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTObjectDELETE.html +# https://forums.aws.amazon.com/thread.jspa?threadID=28799: +# http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTObjectDELETE.html +acceptable_accesscodes = (200, 204) if __name__ == '__main__': - confirmIt = u'Sam is sweet' # Data needs to be in unicode, or it will fail + # Data needs to be in unicode, or it will fail + data = u'Sam is sweet' - bucketName = 'mybucket' - objectName = ['myfile.txt', 'my+file.txt'] + bucket = 'mybucket' + object_name = ['myfile.txt', 'my+file.txt'] - for o in objectName: + for o in object_name: # Creating a file - r = requests.put(('http://%s.s3.amazonaws.com/%s' % (bucketName, o)), data=confirmIt, auth=S3Auth(ACCESS_KEY, SECRET_KEY)) - if r.status_code not in acceptableAccessCodes: + url = 'http://{0}.s3.amazonaws.com/{1}'.format(bucket, o) + r = requests.put(url, data=data, auth=S3Auth(ACCESS_KEY, SECRET_KEY)) + if r.status_code not in acceptable_accesscodes: r.raise_for_status() # Downloading a file - r = requests.get(('http://%s.s3.amazonaws.com/%s' % (bucketName, o)), auth=S3Auth(ACCESS_KEY, SECRET_KEY)) - if r.status_code not in acceptableAccessCodes: + url = 'http://{0}.s3.amazonaws.com/{1}'.format(bucket, o) + r = requests.get(url, auth=S3Auth(ACCESS_KEY, SECRET_KEY)) + if r.status_code not in acceptable_accesscodes: r.raise_for_status() - if r.content == confirmIt: + if r.content == data: print('Hala Madrid!') # Removing a file - r = requests.delete(('http://%s.s3.amazonaws.com/%s' % (bucketName, o)), auth=S3Auth(ACCESS_KEY, SECRET_KEY)) - if r.status_code not in acceptableAccessCodes: + url = 'http://{0}.s3.amazonaws.com/{1}'.format(bucket, o) + r = requests.delete(url, auth=S3Auth(ACCESS_KEY, SECRET_KEY)) + if r.status_code not in acceptable_accesscodes: r.raise_for_status() diff --git a/test.py b/test.py index 719cd66..9d7b3fc 100644 --- a/test.py +++ b/test.py @@ -1,12 +1,19 @@ import unittest import os import requests -from awsauth import S3Auth import hashlib -import base64 +import sys +from awsauth import S3Auth + +PY3 = sys.version > '3' + +if PY3: + from base64 import encodebytes as encodestring +else: + from base64 import encodestring -TEST_BUCKET = 'testpolpol' +TEST_BUCKET = 'testpolpol2' ACCESS_KEY = 'ACCESSKEYXXXXXXXXXXXX' SECRET_KEY = 'AWSSECRETKEYXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' if 'AWS_ACCESS_KEY' in os.environ: @@ -14,31 +21,40 @@ if 'AWS_ACCESS_KEY' in os.environ: if 'AWS_SECRET_KEY' in os.environ: SECRET_KEY = os.environ['AWS_SECRET_KEY'] + class TestAWS(unittest.TestCase): def setUp(self): - self.auth=S3Auth(ACCESS_KEY, SECRET_KEY) - + self.auth = S3Auth(ACCESS_KEY, SECRET_KEY) + + def get_content_md5(self, data): + hashdig = hashlib.md5(data.encode('utf-8').strip()).digest() + signature = encodestring(hashdig) + if PY3: + return signature.decode('utf-8').strip() + return signature.strip() + def test_put_get_delete(self): + url = 'http://' + TEST_BUCKET + '.s3.amazonaws.com/myfile.txt' testdata = 'Sam is sweet' - r = requests.put('http://'+ TEST_BUCKET + '.s3.amazonaws.com/myfile.txt', data=testdata, auth=self.auth) + r = requests.put(url, data=testdata, auth=self.auth) self.assertEqual(r.status_code, 200) # Downloading a file - r = requests.get('http://'+ TEST_BUCKET + '.s3.amazonaws.com/myfile.txt', auth=self.auth) - self.assertEqual(r.status_code, 200) + r = requests.get(url, auth=self.auth) + self.assertEqual(r.status_code, 200) self.assertEqual(r.text, 'Sam is sweet') # Removing a file - r = requests.delete('http://'+ TEST_BUCKET + '.s3.amazonaws.com/myfile.txt', auth=self.auth) - self.assertEqual(r.status_code, 204) - + r = requests.delete(url, auth=self.auth) + self.assertEqual(r.status_code, 204) + def test_put_get_delete_filnamehasplus(self): testdata = 'Sam is sweet' filename = 'my+file.txt' - url = 'http://'+ TEST_BUCKET + '.s3.amazonaws.com/%s'%(filename) + url = 'http://' + TEST_BUCKET + '.s3.amazonaws.com/%s' % (filename) r = requests.put(url, data=testdata, auth=self.auth) self.assertEqual(r.status_code, 200) # Downloading a file r = requests.get(url, auth=self.auth) - self.assertEqual(r.status_code, 200) + self.assertEqual(r.status_code, 200) self.assertEqual(r.text, testdata) # Removing a file r = requests.delete(url, auth=self.auth) @@ -47,19 +63,19 @@ class TestAWS(unittest.TestCase): def test_put_get_delete_filname_encoded(self): testdata = 'Sam is sweet' filename = 'my%20file.txt' - url = 'http://'+ TEST_BUCKET + '.s3.amazonaws.com/%s'%(filename) + url = 'http://' + TEST_BUCKET + '.s3.amazonaws.com/%s' % (filename) r = requests.put(url, data=testdata, auth=self.auth) self.assertEqual(r.status_code, 200) # Downloading a file r = requests.get(url, auth=self.auth) - self.assertEqual(r.status_code, 200) + self.assertEqual(r.status_code, 200) self.assertEqual(r.text, testdata) # Removing a file r = requests.delete(url, auth=self.auth) self.assertEqual(r.status_code, 204) def test_put_get_delete_cors(self): - url = 'http://'+ TEST_BUCKET + '.s3.amazonaws.com/?cors' + url = 'http://' + TEST_BUCKET + '.s3.amazonaws.com/?cors' testdata = '\ \ *\ @@ -68,8 +84,8 @@ class TestAWS(unittest.TestCase): Authorization\ \ ' - content_md5 = base64.encodestring(hashlib.md5(testdata.strip()).digest()).strip() - r = requests.put(url, data=testdata, auth=self.auth, headers={'content-md5': content_md5}) + headers = {'content-md5': self.get_content_md5(testdata)} + r = requests.put(url, data=testdata, auth=self.auth, headers=headers) self.assertEqual(r.status_code, 200) # Downloading current cors configuration r = requests.get(url, auth=self.auth) @@ -79,7 +95,7 @@ class TestAWS(unittest.TestCase): self.assertEqual(r.status_code, 204) def test_put_get_delete_tagging(self): - url = 'http://'+ TEST_BUCKET + '.s3.amazonaws.com/?tagging' + url = 'http://' + TEST_BUCKET + '.s3.amazonaws.com/?tagging' testdata = '\ \ \ @@ -88,8 +104,8 @@ class TestAWS(unittest.TestCase): \ \ ' - content_md5 = base64.encodestring(hashlib.md5(testdata.strip()).digest()).strip() - r = requests.put(url, data=testdata, auth=self.auth, headers={'content-md5': content_md5}) + headers = {'content-md5': self.get_content_md5(testdata)} + r = requests.put(url, data=testdata, auth=self.auth, headers=headers) self.assertEqual(r.status_code, 204) # Downloading current cors configuration r = requests.get(url, auth=self.auth) @@ -99,15 +115,16 @@ class TestAWS(unittest.TestCase): self.assertEqual(r.status_code, 204) def test_put_get_notification(self): - url = 'http://'+ TEST_BUCKET + '.s3.amazonaws.com/?notification' + url = 'http://' + TEST_BUCKET + '.s3.amazonaws.com/?notification' testdata = '' - content_md5 = base64.encodestring(hashlib.md5(testdata.strip()).digest()).strip() - r = requests.put(url, data=testdata, auth=self.auth, headers={'content-md5': content_md5}) + headers = {'content-md5': self.get_content_md5(testdata)} + r = requests.put(url, data=testdata, auth=self.auth, headers=headers) self.assertEqual(r.status_code, 200) # Downloading current cors configuration r = requests.get(url, auth=self.auth) self.assertEqual(r.status_code, 200) - # No Delete ?notification API, empty tag is default + # No Delete ?notification API, empty + # tag is default if __name__ == '__main__': unittest.main() -- cgit v1.2.1