summaryrefslogtreecommitdiff
path: root/test/unit/common/middleware/s3api/test_acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/common/middleware/s3api/test_acl.py')
-rw-r--r--test/unit/common/middleware/s3api/test_acl.py230
1 files changed, 230 insertions, 0 deletions
diff --git a/test/unit/common/middleware/s3api/test_acl.py b/test/unit/common/middleware/s3api/test_acl.py
new file mode 100644
index 000000000..3e3ed17c7
--- /dev/null
+++ b/test/unit/common/middleware/s3api/test_acl.py
@@ -0,0 +1,230 @@
+# Copyright (c) 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import mock
+
+from cStringIO import StringIO
+from hashlib import md5
+
+from swift.common.swob import Request, HTTPAccepted
+from swift.common.middleware.s3api.etree import fromstring, tostring, \
+ Element, SubElement, XMLNS_XSI
+from swift.common.middleware.s3api.s3response import InvalidArgument
+from swift.common.middleware.s3api.acl_utils import handle_acl_header
+
+from test.unit.common.middleware.s3api import S3ApiTestCase
+from test.unit.common.middleware.s3api.helpers import UnreadableInput
+from test.unit.common.middleware.s3api.test_s3_acl import s3acl
+
+
+class TestS3ApiAcl(S3ApiTestCase):
+
+ def setUp(self):
+ super(TestS3ApiAcl, self).setUp()
+ # All ACL API should be called against to existing bucket.
+ self.swift.register('PUT', '/v1/AUTH_test/bucket',
+ HTTPAccepted, {}, None)
+
+ def _check_acl(self, owner, body):
+ elem = fromstring(body, 'AccessControlPolicy')
+ permission = elem.find('./AccessControlList/Grant/Permission').text
+ self.assertEqual(permission, 'FULL_CONTROL')
+ name = elem.find('./AccessControlList/Grant/Grantee/ID').text
+ self.assertEqual(name, owner)
+
+ def test_bucket_acl_GET(self):
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'GET'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header()})
+ status, headers, body = self.call_s3api(req)
+ self._check_acl('test:tester', body)
+
+ def test_bucket_acl_PUT(self):
+ elem = Element('AccessControlPolicy')
+ owner = SubElement(elem, 'Owner')
+ SubElement(owner, 'ID').text = 'id'
+ acl = SubElement(elem, 'AccessControlList')
+ grant = SubElement(acl, 'Grant')
+ grantee = SubElement(grant, 'Grantee', nsmap={'xsi': XMLNS_XSI})
+ grantee.set('{%s}type' % XMLNS_XSI, 'Group')
+ SubElement(grantee, 'URI').text = \
+ 'http://acs.amazonaws.com/groups/global/AllUsers'
+ SubElement(grant, 'Permission').text = 'READ'
+
+ xml = tostring(elem)
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header()},
+ body=xml)
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(status.split()[0], '200')
+
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'PUT',
+ 'wsgi.input': StringIO(xml)},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header(),
+ 'Transfer-Encoding': 'chunked'})
+ self.assertIsNone(req.content_length)
+ self.assertIsNone(req.message_length())
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(status.split()[0], '200')
+
+ def test_bucket_canned_acl_PUT(self):
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header(),
+ 'X-AMZ-ACL': 'public-read'})
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(status.split()[0], '200')
+
+ @s3acl(s3acl_only=True)
+ def test_bucket_canned_acl_PUT_with_s3acl(self):
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header(),
+ 'X-AMZ-ACL': 'public-read'})
+ with mock.patch('swift.common.middleware.s3api.s3request.'
+ 'handle_acl_header') as mock_handler:
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(status.split()[0], '200')
+ self.assertEqual(mock_handler.call_count, 0)
+
+ def test_bucket_fails_with_both_acl_header_and_xml_PUT(self):
+ elem = Element('AccessControlPolicy')
+ owner = SubElement(elem, 'Owner')
+ SubElement(owner, 'ID').text = 'id'
+ acl = SubElement(elem, 'AccessControlList')
+ grant = SubElement(acl, 'Grant')
+ grantee = SubElement(grant, 'Grantee', nsmap={'xsi': XMLNS_XSI})
+ grantee.set('{%s}type' % XMLNS_XSI, 'Group')
+ SubElement(grantee, 'URI').text = \
+ 'http://acs.amazonaws.com/groups/global/AllUsers'
+ SubElement(grant, 'Permission').text = 'READ'
+
+ xml = tostring(elem)
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header(),
+ 'X-AMZ-ACL': 'public-read'},
+ body=xml)
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(self._get_error_code(body),
+ 'UnexpectedContent')
+
+ def _test_put_no_body(self, use_content_length=False,
+ use_transfer_encoding=False, string_to_md5=''):
+ content_md5 = md5(string_to_md5).digest().encode('base64').strip()
+ with UnreadableInput(self) as fake_input:
+ req = Request.blank(
+ '/bucket?acl',
+ environ={
+ 'REQUEST_METHOD': 'PUT',
+ 'wsgi.input': fake_input},
+ headers={
+ 'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header(),
+ 'Content-MD5': content_md5},
+ body='')
+ if not use_content_length:
+ req.environ.pop('CONTENT_LENGTH')
+ if use_transfer_encoding:
+ req.environ['HTTP_TRANSFER_ENCODING'] = 'chunked'
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(status, '400 Bad Request')
+ self.assertEqual(self._get_error_code(body), 'MissingSecurityHeader')
+ self.assertEqual(self._get_error_message(body),
+ 'Your request was missing a required header.')
+ self.assertIn('<MissingHeaderName>x-amz-acl</MissingHeaderName>', body)
+
+ @s3acl
+ def test_bucket_fails_with_neither_acl_header_nor_xml_PUT(self):
+ self._test_put_no_body()
+ self._test_put_no_body(string_to_md5='test')
+ self._test_put_no_body(use_content_length=True)
+ self._test_put_no_body(use_content_length=True, string_to_md5='test')
+ self._test_put_no_body(use_transfer_encoding=True)
+ self._test_put_no_body(use_transfer_encoding=True, string_to_md5='zz')
+
+ def test_object_acl_GET(self):
+ req = Request.blank('/bucket/object?acl',
+ environ={'REQUEST_METHOD': 'GET'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header()})
+ status, headers, body = self.call_s3api(req)
+ self._check_acl('test:tester', body)
+
+ def test_invalid_xml(self):
+ req = Request.blank('/bucket?acl',
+ environ={'REQUEST_METHOD': 'PUT'},
+ headers={'Authorization': 'AWS test:tester:hmac',
+ 'Date': self.get_date_header()},
+ body='invalid')
+ status, headers, body = self.call_s3api(req)
+ self.assertEqual(self._get_error_code(body), 'MalformedACLError')
+
+ def test_handle_acl_header(self):
+ def check_generated_acl_header(acl, targets):
+ req = Request.blank('/bucket',
+ headers={'X-Amz-Acl': acl})
+ handle_acl_header(req)
+ for target in targets:
+ self.assertTrue(target[0] in req.headers)
+ self.assertEqual(req.headers[target[0]], target[1])
+
+ check_generated_acl_header('public-read',
+ [('X-Container-Read', '.r:*,.rlistings')])
+ check_generated_acl_header('public-read-write',
+ [('X-Container-Read', '.r:*,.rlistings'),
+ ('X-Container-Write', '.r:*')])
+ check_generated_acl_header('private',
+ [('X-Container-Read', '.'),
+ ('X-Container-Write', '.')])
+
+ @s3acl(s3acl_only=True)
+ def test_handle_acl_header_with_s3acl(self):
+ def check_generated_acl_header(acl, targets):
+ req = Request.blank('/bucket',
+ headers={'X-Amz-Acl': acl})
+ for target in targets:
+ self.assertTrue(target not in req.headers)
+ self.assertTrue('HTTP_X_AMZ_ACL' in req.environ)
+ # TODO: add transration and assertion for s3acl
+
+ check_generated_acl_header('public-read',
+ ['X-Container-Read'])
+ check_generated_acl_header('public-read-write',
+ ['X-Container-Read', 'X-Container-Write'])
+ check_generated_acl_header('private',
+ ['X-Container-Read', 'X-Container-Write'])
+
+ def test_handle_acl_with_invalid_header_string(self):
+ req = Request.blank('/bucket', headers={'X-Amz-Acl': 'invalid'})
+ with self.assertRaises(InvalidArgument) as cm:
+ handle_acl_header(req)
+ self.assertTrue('argument_name' in cm.exception.info)
+ self.assertEqual(cm.exception.info['argument_name'], 'x-amz-acl')
+ self.assertTrue('argument_value' in cm.exception.info)
+ self.assertEqual(cm.exception.info['argument_value'], 'invalid')
+
+
+if __name__ == '__main__':
+ unittest.main()