summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keystoneclient/contrib/ec2/utils.py196
-rw-r--r--tests/test_ec2utils.py72
2 files changed, 249 insertions, 19 deletions
diff --git a/keystoneclient/contrib/ec2/utils.py b/keystoneclient/contrib/ec2/utils.py
index fcd8ee3..ca5afa7 100644
--- a/keystoneclient/contrib/ec2/utils.py
+++ b/keystoneclient/contrib/ec2/utils.py
@@ -32,24 +32,69 @@ class Ec2Signer(object):
"""
def __init__(self, secret_key):
- secret_key = secret_key.encode()
- self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1)
+ self.secret_key = secret_key.encode()
+ self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1)
if hashlib.sha256:
- self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256)
+ self.hmac_256 = hmac.new(self.secret_key, digestmod=hashlib.sha256)
+
+ def _v4_creds(self, credentials):
+ """
+ Detect if the credentials are for a v4 signed request, since AWS
+ removed the SignatureVersion field from the v4 request spec...
+ This expects a dict of the request headers to be passed in the
+ credentials dict, since the recommended way to pass v4 creds is
+ via the 'Authorization' header
+ see http://docs.aws.amazon.com/general/latest/gr/
+ sigv4-signed-request-examples.html
+
+ Alternatively X-Amz-Algorithm can be specified as a query parameter,
+ and the authentication data can also passed as query parameters.
+
+ Note a hash of the request body is also required in the credentials
+ for v4 auth to work in the body_hash key, calculated via:
+ hashlib.sha256(req.body).hexdigest()
+ """
+ try:
+ auth_str = credentials['headers']['Authorization']
+ if auth_str.startswith('AWS4-HMAC-SHA256'):
+ return True
+ except KeyError:
+ # Alternatively the Authorization data can be passed via
+ # the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256
+ try:
+ if (credentials['params']['X-Amz-Algorithm'] ==
+ 'AWS4-HMAC-SHA256'):
+ return True
+ except KeyError:
+ pass
+
+ return False
def generate(self, credentials):
"""Generate auth string according to what SignatureVersion is given."""
- if credentials['params']['SignatureVersion'] == '0':
+ signature_version = credentials['params'].get('SignatureVersion')
+ if signature_version == '0':
return self._calc_signature_0(credentials['params'])
- if credentials['params']['SignatureVersion'] == '1':
+ if signature_version == '1':
return self._calc_signature_1(credentials['params'])
- if credentials['params']['SignatureVersion'] == '2':
+ if signature_version == '2':
return self._calc_signature_2(credentials['params'],
credentials['verb'],
credentials['host'],
credentials['path'])
- raise Exception('Unknown Signature Version: %s' %
- credentials['params']['SignatureVersion'])
+ if self._v4_creds(credentials):
+ return self._calc_signature_4(credentials['params'],
+ credentials['verb'],
+ credentials['host'],
+ credentials['path'],
+ credentials['headers'],
+ credentials['body_hash'])
+
+ if signature_version is not None:
+ raise Exception('Unknown signature version: %s' %
+ signature_version)
+ else:
+ raise Exception('Unexpected signature format')
@staticmethod
def _get_utf8_value(value):
@@ -77,6 +122,22 @@ class Ec2Signer(object):
self.hmac.update(val)
return base64.b64encode(self.hmac.digest())
+ @staticmethod
+ def _canonical_qs(params):
+ """
+ Construct a sorted, correctly encoded query string as required for
+ _calc_signature_2 and _calc_signature_4
+ """
+ keys = params.keys()
+ keys.sort()
+ pairs = []
+ for key in keys:
+ val = Ec2Signer._get_utf8_value(params[key])
+ val = urllib.quote(val, safe='-_~')
+ pairs.append(urllib.quote(key, safe='') + '=' + val)
+ qs = '&'.join(pairs)
+ return qs
+
def _calc_signature_2(self, params, verb, server_string, path):
"""Generate AWS signature version 2 string."""
string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
@@ -86,15 +147,116 @@ class Ec2Signer(object):
else:
current_hmac = self.hmac
params['SignatureMethod'] = 'HmacSHA1'
- keys = params.keys()
- keys.sort()
- pairs = []
- for key in keys:
- val = self._get_utf8_value(params[key])
- val = urllib.quote(val, safe='-_~')
- pairs.append(urllib.quote(key, safe='') + '=' + val)
- qs = '&'.join(pairs)
- string_to_sign += qs
+ string_to_sign += self._canonical_qs(params)
current_hmac.update(string_to_sign)
b64 = base64.b64encode(current_hmac.digest())
return b64
+
+ def _calc_signature_4(self, params, verb, server_string, path, headers,
+ body_hash):
+ """Generate AWS signature version 4 string."""
+
+ def sign(key, msg):
+ return hmac.new(key, self._get_utf8_value(msg),
+ hashlib.sha256).digest()
+
+ def signature_key(datestamp, region_name, service_name):
+ """
+ Signature key derivation, see
+ http://docs.aws.amazon.com/general/latest/gr/
+ signature-v4-examples.html#signature-v4-examples-python
+ """
+ k_date = sign(self._get_utf8_value("AWS4" + self.secret_key),
+ datestamp)
+ k_region = sign(k_date, region_name)
+ k_service = sign(k_region, service_name)
+ k_signing = sign(k_service, "aws4_request")
+ return k_signing
+
+ def auth_param(param_name):
+ """
+ Get specified auth parameter, provided via one of:
+ - the Authorization header
+ - the X-Amz-* query parameters
+ """
+ try:
+ auth_str = headers['Authorization']
+ param_str = auth_str.partition(
+ '%s=' % param_name)[2].split(',')[0]
+ except KeyError:
+ param_str = params.get('X-Amz-%s' % param_name)
+ return param_str
+
+ def date_param():
+ """
+ Get the X-Amz-Date' value, which can be either a header or paramter
+
+ Note AWS supports parsing the Date header also, but this is not
+ currently supported here as it will require some format mangling
+ So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it
+ can be used to match against the YYYYMMDD format provided in the
+ credential scope.
+ see:
+ http://docs.aws.amazon.com/general/latest/gr/
+ sigv4-date-handling.html
+ """
+ try:
+ return headers['X-Amz-Date']
+ except KeyError:
+ return params.get('X-Amz-Date')
+
+ def canonical_header_str():
+ # Get the list of headers to include, from either
+ # - the Authorization header (SignedHeaders key)
+ # - the X-Amz-SignedHeaders query parameter
+ headers_lower = dict((k.lower().strip(), v.strip())
+ for (k, v) in headers.iteritems())
+ header_list = []
+ sh_str = auth_param('SignedHeaders')
+ for h in sh_str.split(';'):
+ if h not in headers_lower:
+ continue
+ if h == 'host':
+ # Note we discard any port suffix
+ header_list.append('%s:%s' %
+ (h, headers_lower[h].split(':')[0]))
+ else:
+ header_list.append('%s:%s' % (h, headers_lower[h]))
+ return '\n'.join(header_list) + '\n'
+
+ # Create canonical request:
+ # http://docs.aws.amazon.com/general/latest/gr/
+ # sigv4-create-canonical-request.html
+ # Get parameters and headers in expected string format
+ cr = "\n".join((verb.upper(), path,
+ self._canonical_qs(params),
+ canonical_header_str(),
+ auth_param('SignedHeaders'),
+ body_hash))
+
+ # Check the date, reject any request where the X-Amz-Date doesn't
+ # match the credential scope
+ credential = auth_param('Credential')
+ credential_split = credential.split('/')
+ credential_scope = '/'.join(credential_split[1:])
+ credential_date = credential_split[1]
+ param_date = date_param()
+ if not param_date.startswith(credential_date):
+ raise Exception('Request date mismatch error')
+
+ # Create the string to sign
+ # http://docs.aws.amazon.com/general/latest/gr/
+ # sigv4-create-string-to-sign.html
+ string_to_sign = '\n'.join(('AWS4-HMAC-SHA256',
+ param_date,
+ credential_scope,
+ hashlib.sha256(cr).hexdigest()))
+
+ # Calculate the derived key, this requires a datestamp, region
+ # and service, which can be extracted from the credential scope
+ (req_region, req_service) = credential_split[2:4]
+ s_key = signature_key(credential_date, req_region, req_service)
+ # Finally calculate the signature!
+ signature = hmac.new(s_key, self._get_utf8_value(string_to_sign),
+ hashlib.sha256).hexdigest()
+ return signature
diff --git a/tests/test_ec2utils.py b/tests/test_ec2utils.py
index b0bd4df..a3c36fa 100644
--- a/tests/test_ec2utils.py
+++ b/tests/test_ec2utils.py
@@ -27,6 +27,36 @@ class Ec2SignerTest(testtools.TestCase):
self.secret = '89cdf9e94e2643cab35b8b8ac5a51f83'
self.signer = Ec2Signer(self.secret)
+ def tearDown(self):
+ super(Ec2SignerTest, self).tearDown()
+
+ def test_v4_creds_header(self):
+ auth_str = 'AWS4-HMAC-SHA256 blah'
+ credentials = {'host': '127.0.0.1',
+ 'verb': 'GET',
+ 'path': '/v1/',
+ 'params': {},
+ 'headers': {'Authorization': auth_str}}
+ self.assertTrue(self.signer._v4_creds(credentials))
+
+ def test_v4_creds_param(self):
+ credentials = {'host': '127.0.0.1',
+ 'verb': 'GET',
+ 'path': '/v1/',
+ 'params': {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256'},
+ 'headers': {}}
+ self.assertTrue(self.signer._v4_creds(credentials))
+
+ def test_v4_creds_false(self):
+ credentials = {'host': '127.0.0.1',
+ 'verb': 'GET',
+ 'path': '/v1/',
+ 'params': {'SignatureVersion': '0',
+ 'AWSAccessKeyId': self.access,
+ 'Timestamp': '2012-11-27T11:47:02Z',
+ 'Action': 'Foo'}}
+ self.assertFalse(self.signer._v4_creds(credentials))
+
def test_generate_0(self):
"""Test generate function for v0 signature"""
credentials = {'host': '127.0.0.1',
@@ -40,8 +70,6 @@ class Ec2SignerTest(testtools.TestCase):
expected = 'SmXQEZAUdQw5glv5mX8mmixBtas='
self.assertEqual(signature, expected)
- pass
-
def test_generate_1(self):
"""Test generate function for v1 signature"""
credentials = {'host': '127.0.0.1',
@@ -75,3 +103,43 @@ class Ec2SignerTest(testtools.TestCase):
signature = self.signer.generate(credentials)
expected = 'ZqCxMI4ZtTXWI175743mJ0hy/Gc='
self.assertEqual(signature, expected)
+
+ def test_generate_v4(self):
+ """
+ Test v4 generator with data from AWS docs example, see:
+ http://docs.aws.amazon.com/general/latest/gr/
+ sigv4-create-canonical-request.html
+ and
+ http://docs.aws.amazon.com/general/latest/gr/
+ sigv4-signed-request-examples.html
+ """
+ # Create a new signer object with the AWS example key
+ secret = 'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY'
+ signer = Ec2Signer(secret)
+
+ body_hash = ('b6359072c78d70ebee1e81adcbab4f0'
+ '1bf2c23245fa365ef83fe8f1f955085e2')
+ auth_str = ('AWS4-HMAC-SHA256 '
+ 'Credential=AKIAIOSFODNN7EXAMPLE/20110909/'
+ 'us-east-1/iam/aws4_request,'
+ 'SignedHeaders=content-type;host;x-amz-date,')
+ headers = {'Content-type':
+ 'application/x-www-form-urlencoded; charset=utf-8',
+ 'X-Amz-Date': '20110909T233600Z',
+ 'Host': 'iam.amazonaws.com',
+ 'Authorization': auth_str}
+ # Note the example in the AWS docs is inconsistent, previous
+ # examples specify no query string, but the final POST example
+ # does, apparently incorrectly since an empty parameter list
+ # aligns all steps and the final signature with the examples
+ params = {}
+ credentials = {'host': 'iam.amazonaws.com',
+ 'verb': 'POST',
+ 'path': '/',
+ 'params': params,
+ 'headers': headers,
+ 'body_hash': body_hash}
+ signature = signer.generate(credentials)
+ expected = ('ced6826de92d2bdeed8f846f0bf508e8'
+ '559e98e4b0199114b84c54174deb456c')
+ self.assertEqual(signature, expected)