diff options
author | Zuul <zuul@review.opendev.org> | 2023-01-22 08:36:19 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-01-22 08:36:19 +0000 |
commit | 069865e2b8c80000d0df197a33ec34510b54002b (patch) | |
tree | 77e9bd935006d160f2286520c353e95ce95c870a | |
parent | 9afd3103e71bfb6ff9f58032ddfde95c22e90091 (diff) | |
parent | c834e7a53d5a33a3fd13ffd954e6f4f4ee953dfc (diff) | |
download | swift-069865e2b8c80000d0df197a33ec34510b54002b.tar.gz |
Merge "s3api: Prevent XXE injections" into stable/victoriastable/victoria
-rw-r--r-- | swift/common/middleware/s3api/etree.py | 2 | ||||
-rw-r--r-- | test/functional/s3api/test_xxe_injection.py | 233 | ||||
-rw-r--r-- | test/unit/common/middleware/s3api/test_multi_delete.py | 40 |
3 files changed, 274 insertions, 1 deletions
diff --git a/swift/common/middleware/s3api/etree.py b/swift/common/middleware/s3api/etree.py index 987b84a14..e16b75340 100644 --- a/swift/common/middleware/s3api/etree.py +++ b/swift/common/middleware/s3api/etree.py @@ -130,7 +130,7 @@ class _Element(lxml.etree.ElementBase): parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element) -parser = lxml.etree.XMLParser() +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True) parser.set_element_class_lookup(parser_lookup) Element = parser.makeelement diff --git a/test/functional/s3api/test_xxe_injection.py b/test/functional/s3api/test_xxe_injection.py new file mode 100644 index 000000000..46297a968 --- /dev/null +++ b/test/functional/s3api/test_xxe_injection.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python +# Copyright (c) 2022 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests + +import botocore + +import test.functional as tf +from test.functional.s3api import S3ApiBaseBoto3 + + +def setUpModule(): + tf.setup_package() + + +def tearDownModule(): + tf.teardown_package() + + +class TestS3ApiXxeInjection(S3ApiBaseBoto3): + + def setUp(self): + super(TestS3ApiXxeInjection, self).setUp() + self.bucket = 'test-s3api-xxe-injection' + + def _create_bucket(self, **kwargs): + resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs) + response_metadata = resp.pop('ResponseMetadata', {}) + self.assertEqual(200, response_metadata.get('HTTPStatusCode')) + + @staticmethod + def _clear_data(request, **_kwargs): + if 'Content-MD5' in request.headers: + del request.headers['Content-MD5'] + request.data = b'' + + def _presign_url(self, method, key=None, **kwargs): + params = { + 'Bucket': self.bucket + } + if key: + params['Key'] = key + params.update(kwargs) + try: + # https://github.com/boto/boto3/issues/2192 + self.conn.meta.events.register( + 'before-sign.s3.*', self._clear_data) + return self.conn.generate_presigned_url( + method, Params=params, ExpiresIn=60) + finally: + self.conn.meta.events.unregister( + 'before-sign.s3.*', self._clear_data) + + def test_put_bucket_acl(self): + if not tf.cluster_info['s3api'].get('s3_acl'): + self.skipTest('s3_acl must be enabled') + + self._create_bucket() + + url = self._presign_url('put_bucket_acl') + resp = requests.put(url, data=""" +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]> +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> +<Owner> + <DisplayName>test:tester</DisplayName> + <ID>test:tester</ID> +</Owner> +<AccessControlList> + <Grant> + <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"> + <DisplayName>name&xxe;</DisplayName> + <ID>id&xxe;</ID> + </Grantee> + <Permission>WRITE</Permission> + </Grant> +</AccessControlList> +</AccessControlPolicy> +""") # noqa: E501 + self.assertEqual(200, resp.status_code) + self.assertNotIn(b'xxe', resp.content) + self.assertNotIn(b'[swift-hash]', resp.content) + + acl = self.conn.get_bucket_acl(Bucket=self.bucket) + response_metadata = acl.pop('ResponseMetadata', {}) + self.assertEqual(200, response_metadata.get('HTTPStatusCode')) + self.assertDictEqual({ + 'Owner': { + 'DisplayName': 'test:tester', + 'ID': 'test:tester' + }, + 'Grants': [ + { + 'Grantee': { + 'DisplayName': 'id', + 'ID': 'id', + 'Type': 'CanonicalUser' + }, + 'Permission': 'WRITE' + } + ] + }, acl) + + def test_create_bucket(self): + url = self._presign_url('create_bucket') + resp = requests.put(url, data=""" +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]> +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <LocationConstraint>&xxe;</LocationConstraint> +</CreateBucketConfiguration> +""") # noqa: E501 + self.assertEqual(400, resp.status_code) + self.assertNotIn(b'xxe', resp.content) + self.assertNotIn(b'[swift-hash]', resp.content) + + self.assertRaisesRegexp( + botocore.exceptions.ClientError, 'Not Found', + self.conn.head_bucket, Bucket=self.bucket) + + def test_delete_objects(self): + self._create_bucket() + + url = self._presign_url( + 'delete_objects', + Delete={ + 'Objects': [ + { + 'Key': 'string', + 'VersionId': 'string' + } + ] + }) + body = """ +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]> +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <Object> + <Key>&xxe;</Key> + </Object> +</Delete> +""" + body = body.encode('utf-8') + resp = requests.post(url, data=body) + self.assertEqual(400, resp.status_code, resp.content) + self.assertNotIn(b'xxe', resp.content) + self.assertNotIn(b'[swift-hash]', resp.content) + + def test_complete_multipart_upload(self): + self._create_bucket() + + resp = self.conn.create_multipart_upload( + Bucket=self.bucket, Key='test') + response_metadata = resp.pop('ResponseMetadata', {}) + self.assertEqual(200, response_metadata.get('HTTPStatusCode')) + uploadid = resp.get('UploadId') + + try: + url = self._presign_url( + 'complete_multipart_upload', + Key='key', + MultipartUpload={ + 'Parts': [ + { + 'ETag': 'string', + 'PartNumber': 1 + } + ], + }, + UploadId=uploadid) + resp = requests.post(url, data=""" +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]> +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <Part> + <ETag>"{uploadid}"</ETag> + <PartNumber>&xxe;</PartNumber> + </Part> +</CompleteMultipartUpload> +""") # noqa: E501 + self.assertEqual(404, resp.status_code) + self.assertNotIn(b'xxe', resp.content) + self.assertNotIn(b'[swift-hash]', resp.content) + + resp = requests.post(url, data=""" +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]> +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <Part> + <ETag>"&xxe;"</ETag> + <PartNumber>1</PartNumber> + </Part> +</CompleteMultipartUpload> +""") # noqa: E501 + self.assertEqual(404, resp.status_code) + self.assertNotIn(b'xxe', resp.content) + self.assertNotIn(b'[swift-hash]', resp.content) + finally: + resp = self.conn.abort_multipart_upload( + Bucket=self.bucket, Key='test', UploadId=uploadid) + response_metadata = resp.pop('ResponseMetadata', {}) + self.assertEqual(204, response_metadata.get('HTTPStatusCode')) + + def test_put_bucket_versioning(self): + self._create_bucket() + + url = self._presign_url( + 'put_bucket_versioning', + VersioningConfiguration={ + 'Status': 'Enabled' + }) + resp = requests.put(url, data=""" +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]> +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <Status>&xxe;</Status> +</VersioningConfiguration> +""") # noqa: E501 + self.assertEqual(400, resp.status_code) + self.assertNotIn(b'xxe', resp.content) + self.assertNotIn(b'[swift-hash]', resp.content) + + versioning = self.conn.get_bucket_versioning(Bucket=self.bucket) + response_metadata = versioning.pop('ResponseMetadata', {}) + self.assertEqual(200, response_metadata.get('HTTPStatusCode')) + self.assertDictEqual({}, versioning) diff --git a/test/unit/common/middleware/s3api/test_multi_delete.py b/test/unit/common/middleware/s3api/test_multi_delete.py index 02ee5d32f..1bff1956d 100644 --- a/test/unit/common/middleware/s3api/test_multi_delete.py +++ b/test/unit/common/middleware/s3api/test_multi_delete.py @@ -445,6 +445,7 @@ class TestS3ApiMultiDelete(S3ApiTestCase): body=body) status, headers, body = self.call_s3api(req) self.assertEqual(status.split()[0], '200') + self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body) def _test_object_multi_DELETE(self, account): self.keys = ['Key1', 'Key2'] @@ -500,6 +501,45 @@ class TestS3ApiMultiDelete(S3ApiTestCase): elem = fromstring(body) self.assertEqual(len(elem.findall('Deleted')), len(self.keys)) + def test_object_multi_DELETE_with_system_entity(self): + self.keys = ['Key1', 'Key2'] + self.swift.register( + 'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0], + swob.HTTPNotFound, {}, None) + self.swift.register( + 'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1], + swob.HTTPNoContent, {}, None) + + elem = Element('Delete') + for key in self.keys: + obj = SubElement(elem, 'Object') + SubElement(obj, 'Key').text = key + body = tostring(elem, use_s3ns=False) + body = body.replace( + b'?>\n', + b'?>\n<!DOCTYPE foo ' + b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n', + ).replace(b'>Key1<', b'>Key1&ent;<') + content_md5 = ( + base64.b64encode(md5(body).digest()) + .strip()) + + req = Request.blank('/bucket?delete', + environ={'REQUEST_METHOD': 'POST'}, + headers={ + 'Authorization': 'AWS test:full_control:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5}, + body=body) + req.date = datetime.now() + req.content_type = 'text/plain' + + status, headers, body = self.call_s3api(req) + self.assertEqual(status, '200 OK', body) + self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body) + self.assertNotIn(b'root:/root', body) + self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body) + def _test_no_body(self, use_content_length=False, use_transfer_encoding=False, string_to_md5=b''): content_md5 = base64.b64encode(md5(string_to_md5).digest()).strip() |