summaryrefslogtreecommitdiff
path: root/test/functional/test_swiftclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_swiftclient.py')
-rw-r--r--test/functional/test_swiftclient.py557
1 files changed, 557 insertions, 0 deletions
diff --git a/test/functional/test_swiftclient.py b/test/functional/test_swiftclient.py
new file mode 100644
index 0000000..9a74c63
--- /dev/null
+++ b/test/functional/test_swiftclient.py
@@ -0,0 +1,557 @@
+# Copyright (c) 2014 Christian Schwede <christian.schwede@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+import time
+from io import BytesIO
+
+import six
+from six.moves import configparser
+
+import swiftclient
+
+
+class TestFunctional(unittest.TestCase):
+
+ def __init__(self, *args, **kwargs):
+ super(TestFunctional, self).__init__(*args, **kwargs)
+ self.skip_tests = False
+ self._get_config()
+
+ self.test_data = b'42' * 10
+ self.etag = '2704306ec982238d85d4b235c925d58e'
+
+ self.containername = "functional-tests-container-%s" % int(time.time())
+ self.containername_2 = self.containername + '_second'
+ self.containername_3 = self.containername + '_third'
+ self.objectname = "functional-tests-object-%s" % int(time.time())
+ self.objectname_2 = self.objectname + '_second'
+
+ def _get_config(self):
+ config_file = os.environ.get('SWIFT_TEST_CONFIG_FILE',
+ '/etc/swift/test.conf')
+ config = configparser.ConfigParser({'auth_version': '1'})
+ config.read(config_file)
+ self.config = config
+ if config.has_section('func_test'):
+ if config.has_option('func_test', 'auth_uri'):
+ self.auth_url = config.get('func_test', 'auth_uri')
+ try:
+ self.auth_version = config.get('func_test', 'auth_version')
+ except configparser.NoOptionError:
+ last_piece = self.auth_url.rstrip('/').rsplit('/', 1)[1]
+ if last_piece.endswith('.0'):
+ last_piece = last_piece[:-2]
+ if last_piece in ('1', '2', '3'):
+ self.auth_version = last_piece
+ else:
+ raise
+ else:
+ auth_host = config.get('func_test', 'auth_host')
+ auth_port = config.getint('func_test', 'auth_port')
+ auth_ssl = config.getboolean('func_test', 'auth_ssl')
+ auth_prefix = config.get('func_test', 'auth_prefix')
+ self.auth_version = config.get('func_test', 'auth_version')
+ self.auth_url = ""
+ if auth_ssl:
+ self.auth_url += "https://"
+ else:
+ self.auth_url += "http://"
+ self.auth_url += "%s:%s%s" % (
+ auth_host, auth_port, auth_prefix)
+ if self.auth_version == "1":
+ self.auth_url += 'v1.0'
+
+ try:
+ self.account_username = config.get('func_test',
+ 'account_username')
+ except configparser.NoOptionError:
+ account = config.get('func_test', 'account')
+ username = config.get('func_test', 'username')
+ self.account_username = "%s:%s" % (account, username)
+ self.password = config.get('func_test', 'password')
+ else:
+ self.skip_tests = True
+
+ def _get_connection(self):
+ """
+ Subclasses may override to use different connection setup
+ """
+ return swiftclient.Connection(
+ self.auth_url, self.account_username, self.password,
+ auth_version=self.auth_version)
+
+ def setUp(self):
+ super(TestFunctional, self).setUp()
+ if self.skip_tests:
+ self.skipTest('SKIPPING FUNCTIONAL TESTS DUE TO NO CONFIG')
+
+ self.conn = self._get_connection()
+ self.conn.put_container(self.containername)
+ self.conn.put_container(self.containername_2)
+ self.conn.put_object(
+ self.containername, self.objectname, self.test_data)
+ self.conn.put_object(
+ self.containername, self.objectname_2, self.test_data)
+
+ def tearDown(self):
+ super(TestFunctional, self).tearDown()
+ for obj in [self.objectname, self.objectname_2]:
+ try:
+ self.conn.delete_object(self.containername, obj)
+ except swiftclient.ClientException:
+ pass
+
+ for container in [self.containername,
+ self.containername_2,
+ self.containername_3,
+ self.containername + '_segments']:
+ try:
+ self.conn.delete_container(container)
+ except swiftclient.ClientException:
+ pass
+ self.conn.close()
+
+ def _check_account_headers(self, headers):
+ headers_to_check = [
+ 'content-length',
+ 'x-account-object-count',
+ 'x-timestamp',
+ 'x-trans-id',
+ 'date',
+ 'x-account-bytes-used',
+ 'x-account-container-count',
+ 'content-type',
+ 'accept-ranges',
+ ]
+ for h in headers_to_check:
+ self.assertIn(h, headers)
+ self.assertTrue(headers[h])
+
+ def test_stat_account(self):
+ headers = self.conn.head_account()
+ self._check_account_headers(headers)
+
+ def test_list_account(self):
+ headers, containers = self.conn.get_account()
+ self._check_account_headers(headers)
+
+ self.assertTrue(len(containers))
+ test_container = [c
+ for c in containers
+ if c.get('name') == self.containername][0]
+ self.assertTrue(test_container.get('bytes') >= 0)
+ self.assertTrue(test_container.get('count') >= 0)
+
+ # Check if list limit is working
+ headers, containers = self.conn.get_account(limit=1)
+ self.assertEqual(1, len(containers))
+
+ # Check full listing
+ headers, containers = self.conn.get_account(limit=1, full_listing=True)
+ self.assertTrue(len(containers) >= 2) # there might be more containers
+
+ # Test marker
+ headers, containers = self.conn.get_account(marker=self.containername)
+ self.assertTrue(len(containers) >= 1)
+ self.assertEqual(self.containername_2, containers[0].get('name'))
+
+ # Test prefix
+ _, containers = self.conn.get_account(prefix='dne')
+ self.assertEqual(0, len(containers))
+
+ # Test delimiter
+ _, containers = self.conn.get_account(
+ prefix=self.containername, delimiter='_')
+ self.assertEqual(2, len(containers))
+ self.assertEqual(self.containername, containers[0].get('name'))
+ self.assertTrue(
+ self.containername_2.startswith(containers[1].get('subdir')))
+
+ def _check_container_headers(self, headers):
+ self.assertTrue(headers.get('content-length'))
+ self.assertTrue(headers.get('x-container-object-count'))
+ self.assertTrue(headers.get('x-timestamp'))
+ self.assertTrue(headers.get('x-trans-id'))
+ self.assertTrue(headers.get('date'))
+ self.assertTrue(headers.get('x-container-bytes-used'))
+ self.assertTrue(headers.get('x-container-object-count'))
+ self.assertTrue(headers.get('content-type'))
+ self.assertTrue(headers.get('accept-ranges'))
+
+ def test_stat_container(self):
+ headers = self.conn.head_container(self.containername)
+ self._check_container_headers(headers)
+
+ def test_list_container(self):
+ headers, objects = self.conn.get_container(self.containername)
+ self._check_container_headers(headers)
+ self.assertTrue(len(objects))
+ test_object = [o
+ for o in objects
+ if o.get('name') == self.objectname][0]
+ self.assertEqual(len(self.test_data), test_object.get('bytes'))
+ self.assertEqual(self.etag, test_object.get('hash'))
+ self.assertEqual('application/octet-stream',
+ test_object.get('content_type'))
+
+ # Check if list limit is working
+ headers, objects = self.conn.get_container(self.containername, limit=1)
+ self.assertEqual(1, len(objects))
+
+ # Check full listing
+ headers, objects = self.conn.get_container(
+ self.containername, limit=1, full_listing=True)
+ self.assertEqual(2, len(objects))
+
+ # Test marker
+ headers, objects = self.conn.get_container(
+ self.containername, marker=self.objectname)
+ self.assertEqual(1, len(objects))
+ self.assertEqual(self.objectname_2, objects[0].get('name'))
+
+ def test_create_container(self):
+ self.conn.put_container(self.containername_3)
+ self.assertTrue(self.conn.head_container(self.containername_3))
+
+ def test_delete(self):
+ self.conn.delete_object(self.containername, self.objectname)
+ self.conn.delete_object(self.containername, self.objectname_2)
+ self.conn.delete_container(self.containername)
+
+ # Container HEAD will raise an exception if container doesn't exist
+ # which is only possible if previous requests succeeded
+ self.assertRaises(
+ swiftclient.ClientException,
+ self.conn.head_container,
+ self.containername)
+
+ def test_upload_object(self):
+ # Object with content from string
+ self.conn.put_object(
+ self.containername, self.objectname, contents=self.test_data)
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('application/octet-stream',
+ hdrs.get('content-type'))
+
+ # Same but with content_type
+ self.conn.put_object(
+ self.containername, self.objectname,
+ content_type='text/plain', contents=self.test_data)
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('text/plain',
+ hdrs.get('content-type'))
+
+ # Same but with content-type in headers
+ self.conn.put_object(
+ self.containername, self.objectname,
+ headers={'Content-Type': 'text/plain'}, contents=self.test_data)
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('text/plain',
+ hdrs.get('content-type'))
+
+ # content_type rewrites content-type in headers
+ self.conn.put_object(
+ self.containername, self.objectname,
+ content_type='image/jpeg',
+ headers={'Content-Type': 'text/plain'}, contents=self.test_data)
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('image/jpeg',
+ hdrs.get('content-type'))
+
+ # Same but with content-length
+ self.conn.put_object(
+ self.containername, self.objectname,
+ contents=self.test_data, content_length=len(self.test_data))
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('application/octet-stream', hdrs.get('content-type'))
+
+ # Content from File-like object
+ fileobj = BytesIO(self.test_data)
+ self.conn.put_object(
+ self.containername, self.objectname, contents=fileobj)
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('application/octet-stream', hdrs.get('content-type'))
+
+ # Content from File-like object, but read in chunks
+ fileobj = BytesIO(self.test_data)
+ self.conn.put_object(
+ self.containername, self.objectname,
+ contents=fileobj, content_length=len(self.test_data),
+ chunk_size=10)
+ hdrs = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual(str(len(self.test_data)),
+ hdrs.get('content-length'))
+ self.assertEqual(self.etag, hdrs.get('etag'))
+ self.assertEqual('application/octet-stream', hdrs.get('content-type'))
+
+ # Wrong etag arg, should raise an exception
+ self.assertRaises(
+ swiftclient.ClientException,
+ self.conn.put_object,
+ self.containername, self.objectname,
+ contents=self.test_data, etag='invalid')
+
+ def test_download_object(self):
+ # Download whole object
+ hdrs, body = self.conn.get_object(self.containername, self.objectname)
+ self.assertEqual(self.test_data, body)
+
+ # Download in chunks, should return a generator
+ hdrs, body = self.conn.get_object(
+ self.containername, self.objectname,
+ resp_chunk_size=10)
+ downloaded_contents = b''
+ while True:
+ try:
+ chunk = next(body)
+ except StopIteration:
+ break
+ downloaded_contents += chunk
+ self.assertEqual(self.test_data, downloaded_contents)
+
+ # Download in chunks, should also work with read
+ hdrs, body = self.conn.get_object(
+ self.containername, self.objectname,
+ resp_chunk_size=10)
+ num_bytes = 5
+ downloaded_contents = body.read(num_bytes)
+ self.assertEqual(num_bytes, len(downloaded_contents))
+ downloaded_contents += body.read()
+ self.assertEqual(self.test_data, downloaded_contents)
+
+ def test_put_object_using_generator(self):
+ # verify that put using a generator yielding empty strings does not
+ # cause connection to be closed
+ def data():
+ yield b"should"
+ yield b""
+ yield b" tolerate"
+ yield b""
+ yield b" empty chunks"
+
+ self.conn.put_object(
+ self.containername, self.objectname, data())
+ hdrs, body = self.conn.get_object(self.containername, self.objectname)
+ self.assertEqual(b"should tolerate empty chunks", body)
+
+ def test_download_object_retry_chunked(self):
+ resp_chunk_size = 2
+ hdrs, body = self.conn.get_object(self.containername,
+ self.objectname,
+ resp_chunk_size=resp_chunk_size)
+ data = next(body)
+ self.assertEqual(self.test_data[:resp_chunk_size], data)
+ self.assertTrue(1, self.conn.attempts)
+ for chunk in body.resp:
+ # Flush remaining data from underlying response
+ # (simulate a dropped connection)
+ pass
+ # Trigger the retry
+ for chunk in body:
+ data += chunk
+ self.assertEqual(self.test_data, data)
+ self.assertEqual(2, self.conn.attempts)
+
+ def test_download_object_retry_chunked_auth_failure(self):
+ resp_chunk_size = 2
+ self.conn.token = 'invalid'
+ hdrs, body = self.conn.get_object(self.containername,
+ self.objectname,
+ resp_chunk_size=resp_chunk_size)
+ self.assertEqual(2, self.conn.attempts)
+ for chunk in body.resp:
+ # Flush remaining data from underlying response
+ # (simulate a dropped connection)
+ pass
+
+ self.conn.token = 'invalid'
+ data = next(body)
+ self.assertEqual(4, self.conn.attempts)
+
+ for chunk in body:
+ data += chunk
+
+ self.assertEqual(self.test_data, data)
+ self.assertEqual(4, self.conn.attempts)
+
+ def test_download_object_non_chunked(self):
+ hdrs, body = self.conn.get_object(self.containername, self.objectname)
+ data = body
+ self.assertEqual(self.test_data, data)
+ self.assertTrue(1, self.conn.attempts)
+
+ hdrs, body = self.conn.get_object(self.containername, self.objectname,
+ resp_chunk_size=0)
+ data = body
+ self.assertEqual(self.test_data, data)
+ self.assertTrue(1, self.conn.attempts)
+
+ def test_post_account(self):
+ self.conn.post_account({'x-account-meta-data': 'Something'})
+ headers = self.conn.head_account()
+ self.assertEqual('Something', headers.get('x-account-meta-data'))
+
+ def test_post_container(self):
+ self.conn.post_container(
+ self.containername, {'x-container-meta-color': 'Something'})
+
+ headers = self.conn.head_container(self.containername)
+ self.assertEqual('Something', headers.get('x-container-meta-color'))
+
+ def test_post_object(self):
+ self.conn.post_object(self.containername,
+ self.objectname,
+ {'x-object-meta-color': 'Something',
+ 'x-object-meta-uni': b'\xd8\xaa'.decode('utf8'),
+ 'x-object-meta-int': 123,
+ 'x-object-meta-float': 45.67,
+ 'x-object-meta-bool': False})
+
+ headers = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual('Something', headers.get('x-object-meta-color'))
+ self.assertEqual(b'\xd8\xaa'.decode('utf-8'),
+ headers.get('x-object-meta-uni'))
+ self.assertEqual('123', headers.get('x-object-meta-int'))
+ self.assertEqual('45.67', headers.get('x-object-meta-float'))
+ self.assertEqual('False', headers.get('x-object-meta-bool'))
+
+ def test_post_object_unicode_header_name(self):
+ self.conn.post_object(self.containername,
+ self.objectname,
+ {u'x-object-meta-\U0001f44d': u'\U0001f44d'})
+
+ # Note that we can't actually read this header back on py3; see
+ # https://bugs.python.org/issue37093
+ # We'll have to settle for just testing that the POST doesn't blow up
+ # with a UnicodeDecodeError
+ if six.PY2:
+ headers = self.conn.head_object(
+ self.containername, self.objectname)
+ self.assertIn(u'x-object-meta-\U0001f44d', headers)
+ self.assertEqual(u'\U0001f44d',
+ headers.get(u'x-object-meta-\U0001f44d'))
+
+ def test_copy_object(self):
+ self.conn.put_object(
+ self.containername, self.objectname, self.test_data)
+ self.conn.copy_object(self.containername,
+ self.objectname,
+ headers={'x-object-meta-color': 'Something'})
+
+ headers = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual('Something', headers.get('x-object-meta-color'))
+
+ self.conn.copy_object(self.containername,
+ self.objectname,
+ headers={'x-object-meta-taste': 'Second'})
+
+ headers = self.conn.head_object(self.containername, self.objectname)
+ self.assertEqual('Something', headers.get('x-object-meta-color'))
+ self.assertEqual('Second', headers.get('x-object-meta-taste'))
+
+ destination = "/%s/%s" % (self.containername, self.objectname_2)
+ self.conn.copy_object(self.containername,
+ self.objectname,
+ destination,
+ headers={'x-object-meta-taste': 'Second'})
+ headers, data = self.conn.get_object(self.containername,
+ self.objectname_2)
+ self.assertEqual(self.test_data, data)
+ self.assertEqual('Something', headers.get('x-object-meta-color'))
+ self.assertEqual('Second', headers.get('x-object-meta-taste'))
+
+ destination = "/%s/%s" % (self.containername, self.objectname_2)
+ self.conn.copy_object(self.containername,
+ self.objectname,
+ destination,
+ headers={'x-object-meta-color': 'Else'},
+ fresh_metadata=True)
+
+ headers = self.conn.head_object(self.containername, self.objectname_2)
+ self.assertEqual('Else', headers.get('x-object-meta-color'))
+ self.assertIsNone(headers.get('x-object-meta-taste'))
+
+ def test_get_capabilities(self):
+ resp = self.conn.get_capabilities()
+ self.assertTrue(resp.get('swift'))
+
+
+class TestUsingKeystone(TestFunctional):
+ """
+ Repeat tests using os_options parameter to Connection.
+ """
+
+ def _get_connection(self):
+ account = username = password = None
+ if self.auth_version not in ('2', '3'):
+ self.skipTest('SKIPPING KEYSTONE-SPECIFIC FUNCTIONAL TESTS')
+ try:
+ account = self.config.get('func_test', 'account')
+ username = self.config.get('func_test', 'username')
+ password = self.config.get('func_test', 'password')
+ except Exception:
+ self.skipTest('SKIPPING KEYSTONE-SPECIFIC FUNCTIONAL TESTS' +
+ ' - NO CONFIG')
+ os_options = {'tenant_name': account}
+ return swiftclient.Connection(
+ self.auth_url, username, password, auth_version=self.auth_version,
+ os_options=os_options)
+
+
+class TestUsingKeystoneV3(TestFunctional):
+ """
+ Repeat tests using a keystone user with domain specified.
+ """
+
+ def _get_connection(self):
+ account = username = password = project_domain = user_domain = None
+ if self.auth_version != '3':
+ self.skipTest('SKIPPING KEYSTONE-V3-SPECIFIC FUNCTIONAL TESTS')
+ try:
+ account = self.config.get('func_test', 'account4')
+ username = self.config.get('func_test', 'username4')
+ user_domain = self.config.get('func_test', 'domain4')
+ project_domain = self.config.get('func_test', 'domain4')
+ password = self.config.get('func_test', 'password4')
+ except Exception:
+ self.skipTest('SKIPPING KEYSTONE-V3-SPECIFIC FUNCTIONAL TESTS' +
+ ' - NO CONFIG')
+
+ os_options = {'project_name': account,
+ 'project_domain_name': project_domain,
+ 'user_domain_name': user_domain}
+ return swiftclient.Connection(self.auth_url, username, password,
+ auth_version=self.auth_version,
+ os_options=os_options)