summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-08-07 23:43:13 +0000
committerGerrit Code Review <review@openstack.org>2016-08-07 23:43:13 +0000
commit2766744ba1a8296990882420714a794e7a4f216f (patch)
tree12a5fe1b3f3ac700e11eecc188c33702dfa13643
parent92178ac17c9549fcf88d39c50ecadfd2979b9c44 (diff)
parentdca4ea5fd6e1a4eb15eeea9879913f64f2851446 (diff)
downloadpython-swiftclient-2766744ba1a8296990882420714a794e7a4f216f.tar.gz
Merge "Fix unicode issues in tempurl command"
-rw-r--r--swiftclient/utils.py45
-rw-r--r--tests/unit/test_utils.py142
2 files changed, 137 insertions, 50 deletions
diff --git a/swiftclient/utils.py b/swiftclient/utils.py
index 0abaed6..10687bf 100644
--- a/swiftclient/utils.py
+++ b/swiftclient/utils.py
@@ -78,15 +78,20 @@ def generate_temp_url(path, seconds, key, method, absolute=False):
:raises: TypeError if seconds is not an integer
:return: the path portion of a temporary URL
"""
- if seconds < 0:
- raise ValueError('seconds must be a positive integer')
try:
- if not absolute:
- expiration = int(time.time() + seconds)
- else:
- expiration = int(seconds)
- except TypeError:
+ seconds = int(seconds)
+ except ValueError:
raise TypeError('seconds must be an integer')
+ if seconds < 0:
+ raise ValueError('seconds must be a positive integer')
+
+ if isinstance(path, six.binary_type):
+ try:
+ path_for_body = path.decode('utf-8')
+ except UnicodeDecodeError:
+ raise ValueError('path must be representable as UTF-8')
+ else:
+ path_for_body = path
standard_methods = ['GET', 'PUT', 'HEAD', 'POST', 'DELETE']
if method.upper() not in standard_methods:
@@ -94,18 +99,24 @@ def generate_temp_url(path, seconds, key, method, absolute=False):
logger.warning('Non default HTTP method %s for tempurl specified, '
'possibly an error', method.upper())
- hmac_body = '\n'.join([method.upper(), str(expiration), path])
+ if not absolute:
+ expiration = int(time.time() + seconds)
+ else:
+ expiration = seconds
+ hmac_body = u'\n'.join([method.upper(), str(expiration), path_for_body])
# Encode to UTF-8 for py3 compatibility
- sig = hmac.new(key.encode(),
- hmac_body.encode(),
- hashlib.sha1).hexdigest()
-
- return ('{path}?temp_url_sig='
- '{sig}&temp_url_expires={exp}'.format(
- path=path,
- sig=sig,
- exp=expiration))
+ if not isinstance(key, six.binary_type):
+ key = key.encode('utf-8')
+ sig = hmac.new(key, hmac_body.encode('utf-8'), hashlib.sha1).hexdigest()
+
+ temp_url = u'{path}?temp_url_sig={sig}&temp_url_expires={exp}'.format(
+ path=path_for_body, sig=sig, exp=expiration)
+ # Have return type match path from caller
+ if isinstance(path, six.binary_type):
+ return temp_url.encode('utf-8')
+ else:
+ return temp_url
def parse_api_response(headers, body):
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index aae466c..0f210a3 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -17,7 +17,7 @@ import unittest
import mock
import six
import tempfile
-from hashlib import md5
+from hashlib import md5, sha1
from swiftclient import utils as u
@@ -120,48 +120,124 @@ class TestPrtBytes(unittest.TestCase):
class TestTempURL(unittest.TestCase):
-
- def setUp(self):
- super(TestTempURL, self).setUp()
- self.url = '/v1/AUTH_account/c/o'
- self.seconds = 3600
- self.key = 'correcthorsebatterystaple'
- self.method = 'GET'
-
- @mock.patch('hmac.HMAC.hexdigest', return_value='temp_url_signature')
+ url = '/v1/AUTH_account/c/o'
+ seconds = 3600
+ key = 'correcthorsebatterystaple'
+ method = 'GET'
+ expected_url = url + ('?temp_url_sig=temp_url_signature'
+ '&temp_url_expires=1400003600')
+ expected_body = '\n'.join([
+ method,
+ '1400003600',
+ url,
+ ]).encode('utf-8')
+
+ @mock.patch('hmac.HMAC')
@mock.patch('time.time', return_value=1400000000)
def test_generate_temp_url(self, time_mock, hmac_mock):
- expected_url = (
- '/v1/AUTH_account/c/o?'
- 'temp_url_sig=temp_url_signature&'
- 'temp_url_expires=1400003600')
- url = u.generate_temp_url(self.url, self.seconds, self.key,
- self.method)
- self.assertEqual(url, expected_url)
+ hmac_mock().hexdigest.return_value = 'temp_url_signature'
+ url = u.generate_temp_url(self.url, self.seconds,
+ self.key, self.method)
+ key = self.key
+ if not isinstance(key, six.binary_type):
+ key = key.encode('utf-8')
+ self.assertEqual(url, self.expected_url)
+ self.assertEqual(hmac_mock.mock_calls, [
+ mock.call(),
+ mock.call(key, self.expected_body, sha1),
+ mock.call().hexdigest(),
+ ])
+ self.assertIsInstance(url, type(self.url))
+
+ def test_generate_temp_url_invalid_path(self):
+ with self.assertRaises(ValueError) as exc_manager:
+ u.generate_temp_url(b'/v1/a/c/\xff', self.seconds, self.key,
+ self.method)
+ self.assertEqual(exc_manager.exception.args[0],
+ 'path must be representable as UTF-8')
@mock.patch('hmac.HMAC.hexdigest', return_value="temp_url_signature")
def test_generate_absolute_expiry_temp_url(self, hmac_mock):
- expected_url = ('/v1/AUTH_account/c/o?'
- 'temp_url_sig=temp_url_signature&'
- 'temp_url_expires=2146636800')
+ if isinstance(self.expected_url, six.binary_type):
+ expected_url = self.expected_url.replace(
+ b'1400003600', b'2146636800')
+ else:
+ expected_url = self.expected_url.replace(
+ u'1400003600', u'2146636800')
url = u.generate_temp_url(self.url, 2146636800, self.key, self.method,
absolute=True)
self.assertEqual(url, expected_url)
def test_generate_temp_url_bad_seconds(self):
- self.assertRaises(TypeError,
- u.generate_temp_url,
- self.url,
- 'not_an_int',
- self.key,
- self.method)
-
- self.assertRaises(ValueError,
- u.generate_temp_url,
- self.url,
- -1,
- self.key,
- self.method)
+ with self.assertRaises(TypeError) as exc_manager:
+ u.generate_temp_url(self.url, 'not_an_int', self.key, self.method)
+ self.assertEqual(exc_manager.exception.args[0],
+ 'seconds must be an integer')
+
+ with self.assertRaises(ValueError) as exc_manager:
+ u.generate_temp_url(self.url, -1, self.key, self.method)
+ self.assertEqual(exc_manager.exception.args[0],
+ 'seconds must be a positive integer')
+
+
+class TestTempURLUnicodePathAndKey(TestTempURL):
+ url = u'/v1/\u00e4/c/\u00f3'
+ key = u'k\u00e9y'
+ expected_url = (u'%s?temp_url_sig=temp_url_signature'
+ u'&temp_url_expires=1400003600') % url
+ expected_body = u'\n'.join([
+ u'GET',
+ u'1400003600',
+ url,
+ ]).encode('utf-8')
+
+
+class TestTempURLUnicodePathBytesKey(TestTempURL):
+ url = u'/v1/\u00e4/c/\u00f3'
+ key = u'k\u00e9y'.encode('utf-8')
+ expected_url = (u'%s?temp_url_sig=temp_url_signature'
+ u'&temp_url_expires=1400003600') % url
+ expected_body = '\n'.join([
+ u'GET',
+ u'1400003600',
+ url,
+ ]).encode('utf-8')
+
+
+class TestTempURLBytesPathUnicodeKey(TestTempURL):
+ url = u'/v1/\u00e4/c/\u00f3'.encode('utf-8')
+ key = u'k\u00e9y'
+ expected_url = url + (b'?temp_url_sig=temp_url_signature'
+ b'&temp_url_expires=1400003600')
+ expected_body = b'\n'.join([
+ b'GET',
+ b'1400003600',
+ url,
+ ])
+
+
+class TestTempURLBytesPathAndKey(TestTempURL):
+ url = u'/v1/\u00e4/c/\u00f3'.encode('utf-8')
+ key = u'k\u00e9y'.encode('utf-8')
+ expected_url = url + (b'?temp_url_sig=temp_url_signature'
+ b'&temp_url_expires=1400003600')
+ expected_body = b'\n'.join([
+ b'GET',
+ b'1400003600',
+ url,
+ ])
+
+
+class TestTempURLBytesPathAndNonUtf8Key(TestTempURL):
+ url = u'/v1/\u00e4/c/\u00f3'.encode('utf-8')
+ key = b'k\xffy'
+ expected_url = url + (b'?temp_url_sig=temp_url_signature'
+ b'&temp_url_expires=1400003600')
+ expected_body = b'\n'.join([
+ b'GET',
+ b'1400003600',
+ url,
+ ])
class TestReadableToIterable(unittest.TestCase):