diff options
Diffstat (limited to 'test/unit/common/middleware/test_encrypter_decrypter.py')
-rw-r--r-- | test/unit/common/middleware/test_encrypter_decrypter.py | 149 |
1 files changed, 129 insertions, 20 deletions
diff --git a/test/unit/common/middleware/test_encrypter_decrypter.py b/test/unit/common/middleware/test_encrypter_decrypter.py index 701c995b3..9f7e0a6f2 100644 --- a/test/unit/common/middleware/test_encrypter_decrypter.py +++ b/test/unit/common/middleware/test_encrypter_decrypter.py @@ -18,7 +18,7 @@ import unittest import uuid from swift.common import storage_policy -from swift.common.middleware import encrypter, decrypter, keymaster +from swift.common.middleware import encrypter, decrypter, keymaster, copy from swift.common.middleware.crypto_utils import load_crypto_meta, Crypto from swift.common.ring import Ring from swift.common.swob import Request @@ -48,10 +48,6 @@ class TestCryptoPipelineChanges(unittest.TestCase): cls._test_context = None def setUp(self): - self.container_name = uuid.uuid4().hex - self.container_path = 'http://localhost:8080/v1/a/' + \ - self.container_name - self.object_path = self.container_path + '/o' self.plaintext = 'unencrypted body content' self.plaintext_etag = md5hex(self.plaintext) @@ -63,15 +59,21 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.km = keymaster.KeyMaster(enc, TEST_KEYMASTER_CONF) self.crypto_app = decrypter.Decrypter(self.km, {}) - def _create_container(self, app, policy_name='one'): + def _create_container(self, app, policy_name='one', container_path=None): + if not container_path: + # choose new container name so that the policy can be specified + self.container_name = uuid.uuid4().hex + self.container_path = 'http://foo:8080/v1/a/' + self.container_name + self.object_path = self.container_path + '/o' + container_path = self.container_path req = Request.blank( - self.container_path, method='PUT', + container_path, method='PUT', headers={'X-Storage-Policy': policy_name}) resp = req.get_response(app) self.assertEqual('201 Created', resp.status) # sanity check req = Request.blank( - self.container_path, method='HEAD', + container_path, method='HEAD', headers={'X-Storage-Policy': policy_name}) resp = req.get_response(app) self.assertEqual(policy_name, resp.headers['X-Storage-Policy']) @@ -92,25 +94,35 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.assertEqual('202 Accepted', resp.status) return resp - def _check_GET_and_HEAD(self, app): - req = Request.blank(self.object_path, method='GET') + def _copy_object(self, app, destination): + req = Request.blank(self.object_path, method='COPY', + headers={'Destination': destination}) + resp = req.get_response(app) + self.assertEqual('201 Created', resp.status) + self.assertEqual(self.plaintext_etag, resp.headers['Etag']) + return resp + + def _check_GET_and_HEAD(self, app, object_path=None): + object_path = object_path or self.object_path + req = Request.blank(object_path, method='GET') resp = req.get_response(app) self.assertEqual('200 OK', resp.status) self.assertEqual(self.plaintext, resp.body) self.assertEqual('Kiwi', resp.headers['X-Object-Meta-Fruit']) - req = Request.blank(self.object_path, method='HEAD') + req = Request.blank(object_path, method='HEAD') resp = req.get_response(app) self.assertEqual('200 OK', resp.status) self.assertEqual('', resp.body) self.assertEqual('Kiwi', resp.headers['X-Object-Meta-Fruit']) - def _check_match_requests(self, method, app): + def _check_match_requests(self, method, app, object_path=None): + object_path = object_path or self.object_path # verify conditional match requests expected_body = self.plaintext if method == 'GET' else '' # If-Match matches - req = Request.blank(self.object_path, method=method, + req = Request.blank(object_path, method=method, headers={'If-Match': '"%s"' % self.plaintext_etag}) resp = req.get_response(app) self.assertEqual('200 OK', resp.status) @@ -119,7 +131,7 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.assertEqual('Kiwi', resp.headers['X-Object-Meta-Fruit']) # If-Match wildcard - req = Request.blank(self.object_path, method=method, + req = Request.blank(object_path, method=method, headers={'If-Match': '*'}) resp = req.get_response(app) self.assertEqual('200 OK', resp.status) @@ -128,7 +140,7 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.assertEqual('Kiwi', resp.headers['X-Object-Meta-Fruit']) # If-Match does not match - req = Request.blank(self.object_path, method=method, + req = Request.blank(object_path, method=method, headers={'If-Match': '"not the etag"'}) resp = req.get_response(app) self.assertEqual('412 Precondition Failed', resp.status) @@ -137,7 +149,7 @@ class TestCryptoPipelineChanges(unittest.TestCase): # If-None-Match matches req = Request.blank( - self.object_path, method=method, + object_path, method=method, headers={'If-None-Match': '"%s"' % self.plaintext_etag}) resp = req.get_response(app) self.assertEqual('304 Not Modified', resp.status) @@ -145,7 +157,7 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.assertEqual(self.plaintext_etag, resp.headers['Etag']) # If-None-Match wildcard - req = Request.blank(self.object_path, method=method, + req = Request.blank(object_path, method=method, headers={'If-None-Match': '*'}) resp = req.get_response(app) self.assertEqual('304 Not Modified', resp.status) @@ -153,7 +165,7 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.assertEqual(self.plaintext_etag, resp.headers['Etag']) # If-None-Match does not match - req = Request.blank(self.object_path, method=method, + req = Request.blank(object_path, method=method, headers={'If-None-Match': '"not the etag"'}) resp = req.get_response(app) self.assertEqual('200 OK', resp.status) @@ -161,9 +173,10 @@ class TestCryptoPipelineChanges(unittest.TestCase): self.assertEqual(self.plaintext_etag, resp.headers['Etag']) self.assertEqual('Kiwi', resp.headers['X-Object-Meta-Fruit']) - def _check_listing(self, app, expect_mismatch=False): + def _check_listing(self, app, expect_mismatch=False, container_path=None): + container_path = container_path or self.container_path req = Request.blank( - self.container_path, method='GET', query_string='format=json') + container_path, method='GET', query_string='format=json') resp = req.get_response(app) self.assertEqual('200 OK', resp.status) listing = json.loads(resp.body) @@ -447,6 +460,102 @@ class TestCryptoPipelineChanges(unittest.TestCase): frags = [frag for node, frag in frag_selection] self.assertEqual(exp_body, policy.pyeclib_driver.decode(frags)) + def _test_copy_encrypted_to_encrypted( + self, src_policy_name, dest_policy_name): + self._create_container(self.proxy_app, policy_name=src_policy_name) + self._put_object(self.crypto_app, self.plaintext) + self._post_object(self.crypto_app) + + copy_crypto_app = copy.ServerSideCopyMiddleware(self.crypto_app, {}) + + dest_container = uuid.uuid4().hex + dest_container_path = 'http://localhost:8080/v1/a/' + dest_container + self._create_container(copy_crypto_app, policy_name=dest_policy_name, + container_path=dest_container_path) + dest_obj_path = dest_container_path + '/o' + dest = '/%s/%s' % (dest_container, 'o') + self._copy_object(copy_crypto_app, dest) + + self._check_GET_and_HEAD(copy_crypto_app, object_path=dest_obj_path) + self._check_listing( + copy_crypto_app, container_path=dest_container_path) + self._check_match_requests( + 'GET', copy_crypto_app, object_path=dest_obj_path) + self._check_match_requests( + 'HEAD', copy_crypto_app, object_path=dest_obj_path) + + def test_copy_encrypted_to_encrypted(self): + self._test_copy_encrypted_to_encrypted('ec', 'ec') + self._test_copy_encrypted_to_encrypted('one', 'ec') + self._test_copy_encrypted_to_encrypted('ec', 'one') + self._test_copy_encrypted_to_encrypted('one', 'one') + + def _test_copy_encrypted_to_unencrypted( + self, src_policy_name, dest_policy_name): + self._create_container(self.proxy_app, policy_name=src_policy_name) + self._put_object(self.crypto_app, self.plaintext) + self._post_object(self.crypto_app) + + # make a pipeline with encryption disabled, use it to copy object + enc = encrypter.Encrypter( + self.proxy_app, {'disable_encryption': 'true'}) + km = keymaster.KeyMaster(enc, TEST_KEYMASTER_CONF) + dec = decrypter.Decrypter(km, {}) + copy_app = copy.ServerSideCopyMiddleware(dec, {}) + + dest_container = uuid.uuid4().hex + dest_container_path = 'http://localhost:8080/v1/a/' + dest_container + self._create_container(self.crypto_app, policy_name=dest_policy_name, + container_path=dest_container_path) + dest_obj_path = dest_container_path + '/o' + dest = '/%s/%s' % (dest_container, 'o') + self._copy_object(copy_app, dest) + + self._check_GET_and_HEAD(copy_app, object_path=dest_obj_path) + self._check_GET_and_HEAD(self.proxy_app, object_path=dest_obj_path) + self._check_listing(copy_app, container_path=dest_container_path) + self._check_listing(self.proxy_app, container_path=dest_container_path) + self._check_match_requests( + 'GET', self.proxy_app, object_path=dest_obj_path) + self._check_match_requests( + 'HEAD', self.proxy_app, object_path=dest_obj_path) + + def test_copy_encrypted_to_unencrypted(self): + self._test_copy_encrypted_to_unencrypted('ec', 'ec') + self._test_copy_encrypted_to_unencrypted('one', 'ec') + self._test_copy_encrypted_to_unencrypted('ec', 'one') + self._test_copy_encrypted_to_unencrypted('one', 'one') + + def _test_copy_unencrypted_to_encrypted( + self, src_policy_name, dest_policy_name): + self._create_container(self.proxy_app, policy_name=src_policy_name) + self._put_object(self.proxy_app, self.plaintext) + self._post_object(self.proxy_app) + + copy_crypto_app = copy.ServerSideCopyMiddleware(self.crypto_app, {}) + + dest_container = uuid.uuid4().hex + dest_container_path = 'http://localhost:8080/v1/a/' + dest_container + self._create_container(copy_crypto_app, policy_name=dest_policy_name, + container_path=dest_container_path) + dest_obj_path = dest_container_path + '/o' + dest = '/%s/%s' % (dest_container, 'o') + self._copy_object(copy_crypto_app, dest) + + self._check_GET_and_HEAD(copy_crypto_app, object_path=dest_obj_path) + self._check_listing( + copy_crypto_app, container_path=dest_container_path) + self._check_match_requests( + 'GET', copy_crypto_app, object_path=dest_obj_path) + self._check_match_requests( + 'HEAD', copy_crypto_app, object_path=dest_obj_path) + + def test_copy_unencrypted_to_encrypted(self): + self._test_copy_unencrypted_to_encrypted('ec', 'ec') + self._test_copy_unencrypted_to_encrypted('one', 'ec') + self._test_copy_unencrypted_to_encrypted('ec', 'one') + self._test_copy_unencrypted_to_encrypted('one', 'one') + class TestCryptoPipelineChangesFastPost(TestCryptoPipelineChanges): @classmethod |