diff options
-rw-r--r-- | keystoneclient/contrib/ec2/utils.py | 196 | ||||
-rw-r--r-- | tests/test_ec2utils.py | 72 |
2 files changed, 249 insertions, 19 deletions
diff --git a/keystoneclient/contrib/ec2/utils.py b/keystoneclient/contrib/ec2/utils.py index fcd8ee3..ca5afa7 100644 --- a/keystoneclient/contrib/ec2/utils.py +++ b/keystoneclient/contrib/ec2/utils.py @@ -32,24 +32,69 @@ class Ec2Signer(object): """ def __init__(self, secret_key): - secret_key = secret_key.encode() - self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1) + self.secret_key = secret_key.encode() + self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1) if hashlib.sha256: - self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256) + self.hmac_256 = hmac.new(self.secret_key, digestmod=hashlib.sha256) + + def _v4_creds(self, credentials): + """ + Detect if the credentials are for a v4 signed request, since AWS + removed the SignatureVersion field from the v4 request spec... + This expects a dict of the request headers to be passed in the + credentials dict, since the recommended way to pass v4 creds is + via the 'Authorization' header + see http://docs.aws.amazon.com/general/latest/gr/ + sigv4-signed-request-examples.html + + Alternatively X-Amz-Algorithm can be specified as a query parameter, + and the authentication data can also passed as query parameters. + + Note a hash of the request body is also required in the credentials + for v4 auth to work in the body_hash key, calculated via: + hashlib.sha256(req.body).hexdigest() + """ + try: + auth_str = credentials['headers']['Authorization'] + if auth_str.startswith('AWS4-HMAC-SHA256'): + return True + except KeyError: + # Alternatively the Authorization data can be passed via + # the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256 + try: + if (credentials['params']['X-Amz-Algorithm'] == + 'AWS4-HMAC-SHA256'): + return True + except KeyError: + pass + + return False def generate(self, credentials): """Generate auth string according to what SignatureVersion is given.""" - if credentials['params']['SignatureVersion'] == '0': + signature_version = credentials['params'].get('SignatureVersion') + if signature_version == '0': return self._calc_signature_0(credentials['params']) - if credentials['params']['SignatureVersion'] == '1': + if signature_version == '1': return self._calc_signature_1(credentials['params']) - if credentials['params']['SignatureVersion'] == '2': + if signature_version == '2': return self._calc_signature_2(credentials['params'], credentials['verb'], credentials['host'], credentials['path']) - raise Exception('Unknown Signature Version: %s' % - credentials['params']['SignatureVersion']) + if self._v4_creds(credentials): + return self._calc_signature_4(credentials['params'], + credentials['verb'], + credentials['host'], + credentials['path'], + credentials['headers'], + credentials['body_hash']) + + if signature_version is not None: + raise Exception('Unknown signature version: %s' % + signature_version) + else: + raise Exception('Unexpected signature format') @staticmethod def _get_utf8_value(value): @@ -77,6 +122,22 @@ class Ec2Signer(object): self.hmac.update(val) return base64.b64encode(self.hmac.digest()) + @staticmethod + def _canonical_qs(params): + """ + Construct a sorted, correctly encoded query string as required for + _calc_signature_2 and _calc_signature_4 + """ + keys = params.keys() + keys.sort() + pairs = [] + for key in keys: + val = Ec2Signer._get_utf8_value(params[key]) + val = urllib.quote(val, safe='-_~') + pairs.append(urllib.quote(key, safe='') + '=' + val) + qs = '&'.join(pairs) + return qs + def _calc_signature_2(self, params, verb, server_string, path): """Generate AWS signature version 2 string.""" string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path) @@ -86,15 +147,116 @@ class Ec2Signer(object): else: current_hmac = self.hmac params['SignatureMethod'] = 'HmacSHA1' - keys = params.keys() - keys.sort() - pairs = [] - for key in keys: - val = self._get_utf8_value(params[key]) - val = urllib.quote(val, safe='-_~') - pairs.append(urllib.quote(key, safe='') + '=' + val) - qs = '&'.join(pairs) - string_to_sign += qs + string_to_sign += self._canonical_qs(params) current_hmac.update(string_to_sign) b64 = base64.b64encode(current_hmac.digest()) return b64 + + def _calc_signature_4(self, params, verb, server_string, path, headers, + body_hash): + """Generate AWS signature version 4 string.""" + + def sign(key, msg): + return hmac.new(key, self._get_utf8_value(msg), + hashlib.sha256).digest() + + def signature_key(datestamp, region_name, service_name): + """ + Signature key derivation, see + http://docs.aws.amazon.com/general/latest/gr/ + signature-v4-examples.html#signature-v4-examples-python + """ + k_date = sign(self._get_utf8_value("AWS4" + self.secret_key), + datestamp) + k_region = sign(k_date, region_name) + k_service = sign(k_region, service_name) + k_signing = sign(k_service, "aws4_request") + return k_signing + + def auth_param(param_name): + """ + Get specified auth parameter, provided via one of: + - the Authorization header + - the X-Amz-* query parameters + """ + try: + auth_str = headers['Authorization'] + param_str = auth_str.partition( + '%s=' % param_name)[2].split(',')[0] + except KeyError: + param_str = params.get('X-Amz-%s' % param_name) + return param_str + + def date_param(): + """ + Get the X-Amz-Date' value, which can be either a header or paramter + + Note AWS supports parsing the Date header also, but this is not + currently supported here as it will require some format mangling + So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it + can be used to match against the YYYYMMDD format provided in the + credential scope. + see: + http://docs.aws.amazon.com/general/latest/gr/ + sigv4-date-handling.html + """ + try: + return headers['X-Amz-Date'] + except KeyError: + return params.get('X-Amz-Date') + + def canonical_header_str(): + # Get the list of headers to include, from either + # - the Authorization header (SignedHeaders key) + # - the X-Amz-SignedHeaders query parameter + headers_lower = dict((k.lower().strip(), v.strip()) + for (k, v) in headers.iteritems()) + header_list = [] + sh_str = auth_param('SignedHeaders') + for h in sh_str.split(';'): + if h not in headers_lower: + continue + if h == 'host': + # Note we discard any port suffix + header_list.append('%s:%s' % + (h, headers_lower[h].split(':')[0])) + else: + header_list.append('%s:%s' % (h, headers_lower[h])) + return '\n'.join(header_list) + '\n' + + # Create canonical request: + # http://docs.aws.amazon.com/general/latest/gr/ + # sigv4-create-canonical-request.html + # Get parameters and headers in expected string format + cr = "\n".join((verb.upper(), path, + self._canonical_qs(params), + canonical_header_str(), + auth_param('SignedHeaders'), + body_hash)) + + # Check the date, reject any request where the X-Amz-Date doesn't + # match the credential scope + credential = auth_param('Credential') + credential_split = credential.split('/') + credential_scope = '/'.join(credential_split[1:]) + credential_date = credential_split[1] + param_date = date_param() + if not param_date.startswith(credential_date): + raise Exception('Request date mismatch error') + + # Create the string to sign + # http://docs.aws.amazon.com/general/latest/gr/ + # sigv4-create-string-to-sign.html + string_to_sign = '\n'.join(('AWS4-HMAC-SHA256', + param_date, + credential_scope, + hashlib.sha256(cr).hexdigest())) + + # Calculate the derived key, this requires a datestamp, region + # and service, which can be extracted from the credential scope + (req_region, req_service) = credential_split[2:4] + s_key = signature_key(credential_date, req_region, req_service) + # Finally calculate the signature! + signature = hmac.new(s_key, self._get_utf8_value(string_to_sign), + hashlib.sha256).hexdigest() + return signature diff --git a/tests/test_ec2utils.py b/tests/test_ec2utils.py index b0bd4df..a3c36fa 100644 --- a/tests/test_ec2utils.py +++ b/tests/test_ec2utils.py @@ -27,6 +27,36 @@ class Ec2SignerTest(testtools.TestCase): self.secret = '89cdf9e94e2643cab35b8b8ac5a51f83' self.signer = Ec2Signer(self.secret) + def tearDown(self): + super(Ec2SignerTest, self).tearDown() + + def test_v4_creds_header(self): + auth_str = 'AWS4-HMAC-SHA256 blah' + credentials = {'host': '127.0.0.1', + 'verb': 'GET', + 'path': '/v1/', + 'params': {}, + 'headers': {'Authorization': auth_str}} + self.assertTrue(self.signer._v4_creds(credentials)) + + def test_v4_creds_param(self): + credentials = {'host': '127.0.0.1', + 'verb': 'GET', + 'path': '/v1/', + 'params': {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256'}, + 'headers': {}} + self.assertTrue(self.signer._v4_creds(credentials)) + + def test_v4_creds_false(self): + credentials = {'host': '127.0.0.1', + 'verb': 'GET', + 'path': '/v1/', + 'params': {'SignatureVersion': '0', + 'AWSAccessKeyId': self.access, + 'Timestamp': '2012-11-27T11:47:02Z', + 'Action': 'Foo'}} + self.assertFalse(self.signer._v4_creds(credentials)) + def test_generate_0(self): """Test generate function for v0 signature""" credentials = {'host': '127.0.0.1', @@ -40,8 +70,6 @@ class Ec2SignerTest(testtools.TestCase): expected = 'SmXQEZAUdQw5glv5mX8mmixBtas=' self.assertEqual(signature, expected) - pass - def test_generate_1(self): """Test generate function for v1 signature""" credentials = {'host': '127.0.0.1', @@ -75,3 +103,43 @@ class Ec2SignerTest(testtools.TestCase): signature = self.signer.generate(credentials) expected = 'ZqCxMI4ZtTXWI175743mJ0hy/Gc=' self.assertEqual(signature, expected) + + def test_generate_v4(self): + """ + Test v4 generator with data from AWS docs example, see: + http://docs.aws.amazon.com/general/latest/gr/ + sigv4-create-canonical-request.html + and + http://docs.aws.amazon.com/general/latest/gr/ + sigv4-signed-request-examples.html + """ + # Create a new signer object with the AWS example key + secret = 'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY' + signer = Ec2Signer(secret) + + body_hash = ('b6359072c78d70ebee1e81adcbab4f0' + '1bf2c23245fa365ef83fe8f1f955085e2') + auth_str = ('AWS4-HMAC-SHA256 ' + 'Credential=AKIAIOSFODNN7EXAMPLE/20110909/' + 'us-east-1/iam/aws4_request,' + 'SignedHeaders=content-type;host;x-amz-date,') + headers = {'Content-type': + 'application/x-www-form-urlencoded; charset=utf-8', + 'X-Amz-Date': '20110909T233600Z', + 'Host': 'iam.amazonaws.com', + 'Authorization': auth_str} + # Note the example in the AWS docs is inconsistent, previous + # examples specify no query string, but the final POST example + # does, apparently incorrectly since an empty parameter list + # aligns all steps and the final signature with the examples + params = {} + credentials = {'host': 'iam.amazonaws.com', + 'verb': 'POST', + 'path': '/', + 'params': params, + 'headers': headers, + 'body_hash': body_hash} + signature = signer.generate(credentials) + expected = ('ced6826de92d2bdeed8f846f0bf508e8' + '559e98e4b0199114b84c54174deb456c') + self.assertEqual(signature, expected) |