summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--swift/common/middleware/formpost.py49
-rw-r--r--swift/common/middleware/tempurl.py24
-rw-r--r--swift/common/utils.py40
-rw-r--r--test/functional/test_tempurl.py29
-rw-r--r--test/unit/common/middleware/test_formpost.py194
-rw-r--r--test/unit/common/middleware/test_tempurl.py3
-rw-r--r--test/unit/common/test_utils.py73
7 files changed, 352 insertions, 60 deletions
diff --git a/swift/common/middleware/formpost.py b/swift/common/middleware/formpost.py
index 84a8ee09b..b1df25825 100644
--- a/swift/common/middleware/formpost.py
+++ b/swift/common/middleware/formpost.py
@@ -84,11 +84,11 @@ desired.
The expires attribute is the Unix timestamp before which the form
must be submitted before it is invalidated.
-The signature attribute is the HMAC-SHA1 signature of the form. Here is
+The signature attribute is the HMAC signature of the form. Here is
sample code for computing the signature::
import hmac
- from hashlib import sha1
+ from hashlib import sha512
from time import time
path = '/v1/account/container/object_prefix'
redirect = 'https://srv.com/some-page' # set to '' if redirect not in form
@@ -98,7 +98,7 @@ sample code for computing the signature::
key = 'mykey'
hmac_body = '%s\n%s\n%s\n%s\n%s' % (path, redirect,
max_file_size, max_file_count, expires)
- signature = hmac.new(key, hmac_body, sha1).hexdigest()
+ signature = hmac.new(key, hmac_body, sha512).hexdigest()
The key is the value of either the account (X-Account-Meta-Temp-URL-Key,
X-Account-Meta-Temp-Url-Key-2) or the container
@@ -123,7 +123,7 @@ the file are simply ignored).
__all__ = ['FormPost', 'filter_factory', 'READ_CHUNK_SIZE', 'MAX_VALUE_LENGTH']
import hmac
-from hashlib import sha1
+import hashlib
from time import time
import six
@@ -131,10 +131,11 @@ from six.moves.urllib.parse import quote
from swift.common.constraints import valid_api_version
from swift.common.exceptions import MimeInvalid
-from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata
+from swift.common.middleware.tempurl import get_tempurl_keys_from_metadata, \
+ SUPPORTED_DIGESTS
from swift.common.utils import streq_const_time, parse_content_disposition, \
parse_mime_headers, iter_multipart_mime_documents, reiterate, \
- close_if_possible
+ close_if_possible, get_logger, extract_digest_and_algorithm
from swift.common.registry import register_swift_info
from swift.common.wsgi import make_pre_authed_env
from swift.common.swob import HTTPUnauthorized, wsgi_to_str, str_to_wsgi
@@ -210,6 +211,11 @@ class FormPost(object):
self.app = app
#: The filter configuration dict.
self.conf = conf
+ # Defaulting to SUPPORTED_DIGESTS just so we don't completely
+ # deprecate sha1 yet. We'll change this to DEFAULT_ALLOWED_DIGESTS
+ # later.
+ self.allowed_digests = conf.get(
+ 'allowed_digests', SUPPORTED_DIGESTS)
def __call__(self, env, start_response):
"""
@@ -405,13 +411,22 @@ class FormPost(object):
hmac_body = hmac_body.encode('utf-8')
has_valid_sig = False
+ signature = attributes.get('signature', '')
+ try:
+ hash_algorithm, signature = extract_digest_and_algorithm(signature)
+ except ValueError:
+ raise FormUnauthorized('invalid signature')
+ if hash_algorithm not in self.allowed_digests:
+ raise FormUnauthorized('invalid signature')
+ if six.PY2:
+ hash_algorithm = getattr(hashlib, hash_algorithm)
+
for key in keys:
# Encode key like in swift.common.utls.get_hmac.
if not isinstance(key, six.binary_type):
key = key.encode('utf8')
- sig = hmac.new(key, hmac_body, sha1).hexdigest()
- if streq_const_time(sig, (attributes.get('signature') or
- 'invalid')):
+ sig = hmac.new(key, hmac_body, hash_algorithm).hexdigest()
+ if streq_const_time(sig, signature):
has_valid_sig = True
if not has_valid_sig:
raise FormUnauthorized('invalid signature')
@@ -467,6 +482,18 @@ def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
- register_swift_info('formpost')
-
+ logger = get_logger(conf, log_route='formpost')
+ allowed_digests = set(conf.get('allowed_digests', '').split()) or \
+ SUPPORTED_DIGESTS
+ not_supported = allowed_digests - SUPPORTED_DIGESTS
+ if not_supported:
+ logger.warning('The following digest algorithms are configured but '
+ 'not supported: %s', ', '.join(not_supported))
+ allowed_digests -= not_supported
+ if not allowed_digests:
+ raise ValueError('No valid digest algorithms are configured '
+ 'for formpost')
+ info = {'allowed_digests': sorted(allowed_digests)}
+ register_swift_info('formpost', **info)
+ conf.update(info)
return lambda app: FormPost(app, conf)
diff --git a/swift/common/middleware/tempurl.py b/swift/common/middleware/tempurl.py
index 138590020..ab155f039 100644
--- a/swift/common/middleware/tempurl.py
+++ b/swift/common/middleware/tempurl.py
@@ -298,7 +298,6 @@ __all__ = ['TempURL', 'filter_factory',
'DEFAULT_OUTGOING_REMOVE_HEADERS',
'DEFAULT_OUTGOING_ALLOW_HEADERS']
-import binascii
from calendar import timegm
import six
from os.path import basename
@@ -313,7 +312,7 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import header_to_environ_key, HTTPUnauthorized, \
HTTPBadRequest, wsgi_to_str
from swift.common.utils import split_path, get_valid_utf8_str, \
- get_hmac, streq_const_time, quote, get_logger, strict_b64decode
+ get_hmac, streq_const_time, quote, get_logger, extract_digest_and_algorithm
from swift.common.registry import register_swift_info, register_sensitive_param
@@ -505,23 +504,10 @@ class TempURL(object):
if not temp_url_sig or not temp_url_expires:
return self._invalid(env, start_response)
- if ':' in temp_url_sig:
- hash_algorithm, temp_url_sig = temp_url_sig.split(':', 1)
- if ('-' in temp_url_sig or '_' in temp_url_sig) and not (
- '+' in temp_url_sig or '/' in temp_url_sig):
- temp_url_sig = temp_url_sig.replace('-', '+').replace('_', '/')
- try:
- temp_url_sig = binascii.hexlify(strict_b64decode(
- temp_url_sig + '=='))
- if not six.PY2:
- temp_url_sig = temp_url_sig.decode('ascii')
- except ValueError:
- return self._invalid(env, start_response)
- elif len(temp_url_sig) == 40:
- hash_algorithm = 'sha1'
- elif len(temp_url_sig) == 64:
- hash_algorithm = 'sha256'
- else:
+ try:
+ hash_algorithm, temp_url_sig = extract_digest_and_algorithm(
+ temp_url_sig)
+ except ValueError:
return self._invalid(env, start_response)
if hash_algorithm not in self.allowed_digests:
return self._invalid(env, start_response)
diff --git a/swift/common/utils.py b/swift/common/utils.py
index ffd6b3999..25505d8ff 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -282,6 +282,46 @@ except (InvalidHashPathConfigError, IOError):
pass
+def extract_digest_and_algorithm(value):
+ """
+ Returns a tuple of (digest_algorithm, hex_encoded_digest)
+ from a client-provided string of the form::
+
+ <hex-encoded digest>
+
+ or::
+
+ <algorithm>:<base64-encoded digest>
+
+ Note that hex-encoded strings must use one of sha1, sha256, or sha512.
+
+ :raises: ValueError on parse failures
+ """
+ if ':' in value:
+ algo, value = value.split(':', 1)
+ # accept both standard and url-safe base64
+ if ('-' in value or '_' in value) and not (
+ '+' in value or '/' in value):
+ value = value.replace('-', '+').replace('_', '/')
+ value = binascii.hexlify(strict_b64decode(value + '=='))
+ if not six.PY2:
+ value = value.decode('ascii')
+ else:
+ try:
+ binascii.unhexlify(value) # make sure it decodes
+ except TypeError:
+ # This is just for py2
+ raise ValueError('Non-hexadecimal digit found')
+ algo = {
+ 40: 'sha1',
+ 64: 'sha256',
+ 128: 'sha512',
+ }.get(len(value))
+ if not algo:
+ raise ValueError('Bad digest length')
+ return algo, value
+
+
def get_hmac(request_method, path, expires, key, digest="sha1",
ip_range=None):
"""
diff --git a/test/functional/test_tempurl.py b/test/functional/test_tempurl.py
index 48b3bc753..6f442e479 100644
--- a/test/functional/test_tempurl.py
+++ b/test/functional/test_tempurl.py
@@ -840,7 +840,7 @@ class TestTempurlAlgorithms(Base):
else:
raise ValueError('Unrecognized encoding: %r' % encoding)
- def _do_test(self, digest, encoding, expect_failure=False):
+ def _do_test(self, digest, encoding):
expires = int(time()) + 86400
sig = self.get_sig(expires, digest, encoding)
@@ -852,24 +852,14 @@ class TestTempurlAlgorithms(Base):
parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
- if expect_failure:
- with self.assertRaises(ResponseError):
- self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
- self.assert_status([401])
-
- # ditto for HEADs
- with self.assertRaises(ResponseError):
- self.env.obj.info(parms=parms, cfg={'no_auth_token': True})
- self.assert_status([401])
- else:
- contents = self.env.obj.read(
- parms=parms,
- cfg={'no_auth_token': True})
- self.assertEqual(contents, b"obj contents")
+ contents = self.env.obj.read(
+ parms=parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, b"obj contents")
- # GET tempurls also allow HEAD requests
- self.assertTrue(self.env.obj.info(
- parms=parms, cfg={'no_auth_token': True}))
+ # GET tempurls also allow HEAD requests
+ self.assertTrue(self.env.obj.info(
+ parms=parms, cfg={'no_auth_token': True}))
@requires_digest('sha1')
def test_sha1(self):
@@ -889,8 +879,7 @@ class TestTempurlAlgorithms(Base):
@requires_digest('sha512')
def test_sha512(self):
- # 128 chars seems awfully long for a signature -- let's require base64
- self._do_test('sha512', 'hex', expect_failure=True)
+ self._do_test('sha512', 'hex')
self._do_test('sha512', 'base64')
self._do_test('sha512', 'base64-no-padding')
self._do_test('sha512', 'url-safe-base64')
diff --git a/test/unit/common/middleware/test_formpost.py b/test/unit/common/middleware/test_formpost.py
index a8b4a0be3..caa7487d6 100644
--- a/test/unit/common/middleware/test_formpost.py
+++ b/test/unit/common/middleware/test_formpost.py
@@ -13,18 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import base64
import hmac
+import hashlib
import unittest
-from hashlib import sha1
from time import time
import six
+if six.PY3:
+ from unittest import mock
+else:
+ import mock
from io import BytesIO
from swift.common.swob import Request, Response, wsgi_quote
from swift.common.middleware import tempauth, formpost
from swift.common.utils import split_path
+from swift.common import registry
from swift.proxy.controllers.base import get_cache_key
+from test.debug_logger import debug_logger
def hmac_msg(path, redirect, max_file_size, max_file_count, expires):
@@ -163,11 +170,23 @@ class TestFormPost(unittest.TestCase):
'meta': meta}
def _make_sig_env_body(self, path, redirect, max_file_size, max_file_count,
- expires, key, user_agent=True):
- sig = hmac.new(
+ expires, key, user_agent=True, algorithm='sha512',
+ prefix=True):
+ alg_name = algorithm
+ if six.PY2:
+ algorithm = getattr(hashlib, algorithm)
+ mac = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
- sha1).hexdigest()
+ algorithm)
+ if prefix:
+ if six.PY2:
+ sig = alg_name + ':' + base64.b64encode(mac.digest())
+ else:
+ sig = alg_name + ':' + base64.b64encode(
+ mac.digest()).decode('ascii')
+ else:
+ sig = mac.hexdigest()
body = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
@@ -297,7 +316,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
- sha1).hexdigest()
+ hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="redirect"',
@@ -415,7 +434,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
- sha1).hexdigest()
+ hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'-----------------------------168072824752491622650073',
'Content-Disposition: form-data; name="redirect"',
@@ -532,7 +551,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
- sha1).hexdigest()
+ hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'------WebKitFormBoundaryq3CFxUjfsDMu8XsA',
'Content-Disposition: form-data; name="redirect"',
@@ -652,7 +671,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
- sha1).hexdigest()
+ hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'-----------------------------7db20d93017c',
'Content-Disposition: form-data; name="redirect"',
@@ -770,7 +789,7 @@ class TestFormPost(unittest.TestCase):
sig = hmac.new(
key,
hmac_msg(path, redirect, max_file_size, max_file_count, expires),
- sha1).hexdigest()
+ hashlib.sha512).hexdigest()
wsgi_input = '\r\n'.join([
'--------------------------dea19ac8502ca805',
'Content-Disposition: form-data; name="redirect"',
@@ -1459,6 +1478,78 @@ class TestFormPost(unittest.TestCase):
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
+ def test_prefixed_and_not_prefixed_sigs_good(self):
+ def do_test(digest, prefixed):
+ key = b'abc'
+ sig, env, body = self._make_sig_env_body(
+ '/v1/AUTH_test/container', '', 1024, 10,
+ int(time() + 86400), key, algorithm=digest, prefix=prefixed)
+ env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
+ env['swift.infocache'][get_cache_key('AUTH_test')] = (
+ self._fake_cache_env('AUTH_test', [key]))
+ env['swift.infocache'][get_cache_key(
+ 'AUTH_test', 'container')] = {'meta': {}}
+ self.app = FakeApp(iter([('201 Created', {}, b''),
+ ('201 Created', {}, b'')]))
+ self.auth = tempauth.filter_factory({})(self.app)
+ self.formpost = formpost.filter_factory({})(self.auth)
+ status = [None]
+ headers = [None]
+ exc_info = [None]
+
+ def start_response(s, h, e=None):
+ status[0] = s
+ headers[0] = h
+ exc_info[0] = e
+
+ body = b''.join(self.formpost(env, start_response))
+ status = status[0]
+ headers = headers[0]
+ exc_info = exc_info[0]
+ self.assertEqual(status, '201 Created')
+ location = None
+ for h, v in headers:
+ if h.lower() == 'location':
+ location = v
+ self.assertIsNone(location)
+ self.assertIsNone(exc_info)
+ self.assertTrue(b'201 Created' in body)
+ self.assertEqual(len(self.app.requests), 2)
+ self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
+ self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
+
+ for digest in ('sha1', 'sha256', 'sha512'):
+ do_test(digest, True)
+ do_test(digest, False)
+
+ def test_prefixed_and_not_prefixed_sigs_unsupported(self):
+ def do_test(digest, prefixed):
+ key = b'abc'
+ sig, env, body = self._make_sig_env_body(
+ '/v1/AUTH_test/container', '', 1024, 10,
+ int(time() + 86400), key, algorithm=digest, prefix=prefixed)
+ env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
+ env['swift.infocache'][get_cache_key('AUTH_test')] = (
+ self._fake_cache_env('AUTH_test', [key]))
+ env['swift.infocache'][get_cache_key(
+ 'AUTH_test', 'container')] = {'meta': {}}
+ self.app = FakeApp(iter([('201 Created', {}, b''),
+ ('201 Created', {}, b'')]))
+ self.auth = tempauth.filter_factory({})(self.app)
+ self.formpost = formpost.filter_factory({})(self.auth)
+ status = [None]
+
+ def start_response(s, h, e=None):
+ status[0] = s
+
+ body = b''.join(self.formpost(env, start_response))
+ status = status[0]
+ self.assertEqual(status, '401 Unauthorized')
+
+ for digest in ('md5', 'sha224'):
+ do_test(digest, True)
+ do_test(digest, False)
+
def test_no_redirect_expired(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
@@ -1559,6 +1650,51 @@ class TestFormPost(unittest.TestCase):
self.assertIsNone(exc_info)
self.assertTrue(b'FormPost: invalid starting boundary' in body)
+ def test_redirect_allowed_and_unsupported_digests(self):
+ def do_test(digest):
+ key = b'abc'
+ sig, env, body = self._make_sig_env_body(
+ '/v1/AUTH_test/container', 'http://redirect', 1024, 10,
+ int(time() + 86400), key, algorithm=digest)
+ env['wsgi.input'] = BytesIO(b'\r\n'.join(body))
+ env['swift.infocache'][get_cache_key('AUTH_test')] = (
+ self._fake_cache_env('AUTH_test', [key]))
+ env['swift.infocache'][get_cache_key(
+ 'AUTH_test', 'container')] = {'meta': {}}
+ self.app = FakeApp(iter([('201 Created', {}, b''),
+ ('201 Created', {}, b'')]))
+ self.auth = tempauth.filter_factory({})(self.app)
+ self.formpost = formpost.filter_factory({})(self.auth)
+ status = [None]
+ headers = [None]
+ exc_info = [None]
+
+ def start_response(s, h, e=None):
+ status[0] = s
+ headers[0] = h
+ exc_info[0] = e
+
+ body = b''.join(self.formpost(env, start_response))
+ return body, status[0], headers[0], exc_info[0]
+
+ for algorithm in ('sha1', 'sha256', 'sha512'):
+ body, status, headers, exc_info = do_test(algorithm)
+ self.assertEqual(status, '303 See Other')
+ location = None
+ for h, v in headers:
+ if h.lower() == 'location':
+ location = v
+ self.assertEqual(location, 'http://redirect?status=201&message=')
+ self.assertIsNone(exc_info)
+ self.assertTrue(location.encode('utf-8') in body)
+ self.assertEqual(len(self.app.requests), 2)
+ self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
+ self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
+
+ # unsupported
+ _body, status, _headers, _exc_info = do_test("md5")
+ self.assertEqual(status, '401 Unauthorized')
+
def test_no_v1(self):
key = b'abc'
sig, env, body = self._make_sig_env_body(
@@ -2099,5 +2235,45 @@ class TestFormPost(unittest.TestCase):
self.assertFalse("Content-Encoding" in self.app.requests[2].headers)
+class TestSwiftInfo(unittest.TestCase):
+ def setUp(self):
+ registry._swift_info = {}
+ registry._swift_admin_info = {}
+
+ def test_registered_defaults(self):
+ formpost.filter_factory({})
+ swift_info = registry.get_swift_info()
+ self.assertIn('formpost', swift_info)
+ info = swift_info['formpost']
+ self.assertIn('allowed_digests', info)
+ self.assertEqual(info['allowed_digests'], ['sha1', 'sha256', 'sha512'])
+
+ def test_non_default_methods(self):
+ logger = debug_logger()
+ with mock.patch('swift.common.middleware.formpost.get_logger',
+ return_value=logger):
+ formpost.filter_factory({
+ 'allowed_digests': 'sha1 sha512 md5 not-a-valid-digest',
+ })
+ swift_info = registry.get_swift_info()
+ self.assertIn('formpost', swift_info)
+ info = swift_info['formpost']
+ self.assertIn('allowed_digests', info)
+ self.assertEqual(info['allowed_digests'], ['sha1', 'sha512'])
+ warning_lines = logger.get_lines_for_level('warning')
+ self.assertIn(
+ 'The following digest algorithms are configured '
+ 'but not supported:',
+ warning_lines[0])
+ self.assertIn('not-a-valid-digest', warning_lines[0])
+ self.assertIn('md5', warning_lines[0])
+
+ def test_bad_config(self):
+ with self.assertRaises(ValueError):
+ formpost.filter_factory({
+ 'allowed_digests': 'md4',
+ })
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/common/middleware/test_tempurl.py b/test/unit/common/middleware/test_tempurl.py
index 24f3512b4..1d61e99e1 100644
--- a/test/unit/common/middleware/test_tempurl.py
+++ b/test/unit/common/middleware/test_tempurl.py
@@ -220,7 +220,8 @@ class TestTempURL(unittest.TestCase):
self.tempurl = tempurl.filter_factory({
'allowed_digests': 'sha1'})(self.auth)
- sig = 'valid_sigs_will_be_exactly_40_characters'
+ # valid sig should be exactly 40 hex chars
+ sig = 'deadbeefdeadbeefdeadbeefdeadbeefdeadbeef'
expires = int(time() + 1000)
p_logging.access_logger.logger = debug_logger('fake')
resp = self._make_request(
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 639191c6e..d142661a1 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -3836,6 +3836,79 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertEqual(u'abc_%EC%9D%BC%EC%98%81',
utils.quote(u'abc_\uc77c\uc601'))
+ def test_extract_digest_and_algorithm(self):
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f'),
+ ('sha1', 'b17f6ff8da0e251737aa9e3ee69a881e3e092e2f'))
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'sha1:sw3eTSuFYrhJZGbDtGsrmsUFRGE='),
+ ('sha1', 'b30dde4d2b8562b8496466c3b46b2b9ac5054461'))
+ # also good with '=' stripped
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'sha1:sw3eTSuFYrhJZGbDtGsrmsUFRGE'),
+ ('sha1', 'b30dde4d2b8562b8496466c3b46b2b9ac5054461'))
+
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'b963712313cd4236696fb4c4cf11fc56'
+ 'ff4158e0bcbf1d4424df147783fd1045'),
+ ('sha256', 'b963712313cd4236696fb4c4cf11fc56'
+ 'ff4158e0bcbf1d4424df147783fd1045'))
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'sha256:uWNxIxPNQjZpb7TEzxH8Vv9BWOC8vx1EJN8Ud4P9EEU='),
+ ('sha256', 'b963712313cd4236696fb4c4cf11fc56'
+ 'ff4158e0bcbf1d4424df147783fd1045'))
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'sha256:uWNxIxPNQjZpb7TEzxH8Vv9BWOC8vx1EJN8Ud4P9EEU'),
+ ('sha256', 'b963712313cd4236696fb4c4cf11fc56'
+ 'ff4158e0bcbf1d4424df147783fd1045'))
+
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ '26df3d9d59da574d6f8d359cb2620b1b'
+ '86737215c38c412dfee0a410acea1ac4'
+ '285ad0c37229ca74e715c443979da17d'
+ '3d77a97d2ac79cc5e395b05bfa4bdd30'),
+ ('sha512', '26df3d9d59da574d6f8d359cb2620b1b'
+ '86737215c38c412dfee0a410acea1ac4'
+ '285ad0c37229ca74e715c443979da17d'
+ '3d77a97d2ac79cc5e395b05bfa4bdd30'))
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'sha512:Jt89nVnaV01vjTWcsmILG4ZzchXDjEEt/uCkEKzq'
+ 'GsQoWtDDcinKdOcVxEOXnaF9PXepfSrHnMXjlbBb+kvdMA=='),
+ ('sha512', '26df3d9d59da574d6f8d359cb2620b1b'
+ '86737215c38c412dfee0a410acea1ac4'
+ '285ad0c37229ca74e715c443979da17d'
+ '3d77a97d2ac79cc5e395b05bfa4bdd30'))
+ self.assertEqual(
+ utils.extract_digest_and_algorithm(
+ 'sha512:Jt89nVnaV01vjTWcsmILG4ZzchXDjEEt_uCkEKzq'
+ 'GsQoWtDDcinKdOcVxEOXnaF9PXepfSrHnMXjlbBb-kvdMA'),
+ ('sha512', '26df3d9d59da574d6f8d359cb2620b1b'
+ '86737215c38c412dfee0a410acea1ac4'
+ '285ad0c37229ca74e715c443979da17d'
+ '3d77a97d2ac79cc5e395b05bfa4bdd30'))
+
+ with self.assertRaises(ValueError):
+ utils.extract_digest_and_algorithm('')
+ with self.assertRaises(ValueError):
+ utils.extract_digest_and_algorithm(
+ 'exactly_forty_chars_but_not_hex_encoded!')
+ # Too short (md5)
+ with self.assertRaises(ValueError):
+ utils.extract_digest_and_algorithm(
+ 'd41d8cd98f00b204e9800998ecf8427e')
+ # but you can slip it in via the prefix notation!
+ self.assertEqual(
+ utils.extract_digest_and_algorithm('md5:1B2M2Y8AsgTpgAmY7PhCfg'),
+ ('md5', 'd41d8cd98f00b204e9800998ecf8427e'))
+
def test_get_hmac(self):
self.assertEqual(
utils.get_hmac('GET', '/path', 1, 'abc'),