summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/sample.conf10
-rw-r--r--test/unit/test_multithreading.py2
-rw-r--r--test/unit/test_shell.py46
-rw-r--r--test/unit/test_swiftclient.py106
-rw-r--r--test/unit/test_utils.py52
5 files changed, 181 insertions, 35 deletions
diff --git a/test/sample.conf b/test/sample.conf
index 2b19de4..04dfdd9 100644
--- a/test/sample.conf
+++ b/test/sample.conf
@@ -1,16 +1,10 @@
[func_test]
# sample config
-auth_host = 127.0.0.1
-auth_port = 8080
-auth_ssl = no
-auth_prefix = /auth/
+auth_uri = http://127.0.0.1:8080/auth/v1.0/
## sample config for Swift with Keystone v2 API
# For keystone v3 change auth_version to 3 and auth_prefix to /v3/
#auth_version = 2
-#auth_host = localhost
-#auth_port = 5000
-#auth_ssl = no
-#auth_prefix = /v2.0/
+#auth_uri = http://localhost:5000/v2.0/
# You may want to run tests against endpoints that use development certs
# without installing the CA system-wide. Use this to trust an extra set
diff --git a/test/unit/test_multithreading.py b/test/unit/test_multithreading.py
index 8944d48..e9732cd 100644
--- a/test/unit/test_multithreading.py
+++ b/test/unit/test_multithreading.py
@@ -37,7 +37,7 @@ class ThreadTestCase(unittest.TestCase):
self.got_args_kwargs.put((args, kwargs))
if item == 'sleep':
- sleep(1)
+ sleep(.1)
if item == 'go boom':
raise Exception('I went boom!')
diff --git a/test/unit/test_shell.py b/test/unit/test_shell.py
index a63d16b..f94e5e2 100644
--- a/test/unit/test_shell.py
+++ b/test/unit/test_shell.py
@@ -2395,6 +2395,8 @@ class TestParsing(TestBase):
'object_storage_url', 'project_domain_id',
'user_id', 'user_domain_id', 'tenant_id',
'service_type', 'project_id', 'auth_token',
+ 'auth_type', 'application_credential_id',
+ 'application_credential_secret',
'project_domain_name']
for key in expected_os_opts_keys:
self.assertIn(key, actual_os_opts_dict)
@@ -2686,6 +2688,50 @@ class TestParsing(TestBase):
swiftclient.shell.main(args)
self.assertIn('Auth version 3 requires OS_AUTH_URL', str(cm.exception))
+ def test_command_args_v3applicationcredential(self):
+ result = [None, None]
+ fake_command = self._make_fake_command(result)
+ opts = {"auth_version": "3"}
+ os_opts = {
+ "auth_type": "v3applicationcredential",
+ "application_credential_id": "proejct_id",
+ "application_credential_secret": "secret",
+ "auth_url": "http://example.com:5000/v3"}
+
+ args = _make_args("stat", opts, os_opts)
+ with mock.patch('swiftclient.shell.st_stat', fake_command):
+ swiftclient.shell.main(args)
+ self.assertEqual(['stat'], result[1])
+ with mock.patch('swiftclient.shell.st_stat', fake_command):
+ args = args + ["container_name"]
+ swiftclient.shell.main(args)
+ self.assertEqual(["stat", "container_name"], result[1])
+
+ def test_insufficient_args_v3applicationcredential(self):
+ opts = {"auth_version": "3"}
+ os_opts = {
+ "auth_type": "v3applicationcredential",
+ "application_credential_secret": "secret",
+ "auth_url": "http://example.com:5000/v3"}
+
+ args = _make_args("stat", opts, os_opts)
+ with self.assertRaises(SystemExit) as cm:
+ swiftclient.shell.main(args)
+ self.assertIn('Auth version 3 (application credential) requires',
+ str(cm.exception))
+
+ os_opts = {
+ "auth_type": "v3oidcpassword",
+ "application_credential_id": "proejct_id",
+ "application_credential_secret": "secret",
+ "auth_url": "http://example.com:5000/v3"}
+
+ args = _make_args("stat", opts, os_opts)
+ with self.assertRaises(SystemExit) as cm:
+ swiftclient.shell.main(args)
+ self.assertIn('Only "v3applicationcredential" is supported for',
+ str(cm.exception))
+
def test_password_prompt(self):
def do_test(opts, os_opts, auth_version):
args = _make_args("stat", opts, os_opts)
diff --git a/test/unit/test_swiftclient.py b/test/unit/test_swiftclient.py
index 2d45deb..2644e33 100644
--- a/test/unit/test_swiftclient.py
+++ b/test/unit/test_swiftclient.py
@@ -84,6 +84,23 @@ class TestClientException(unittest.TestCase):
self.assertIs(True, hasattr(exc, key))
self.assertEqual(getattr(exc, key), value)
+ def test_transaction_id_from_headers(self):
+ exc = c.ClientException('test')
+ self.assertIsNone(exc.transaction_id)
+
+ exc = c.ClientException('test', http_response_headers={})
+ self.assertIsNone(exc.transaction_id)
+
+ exc = c.ClientException('test', http_response_headers={
+ 'X-Trans-Id': 'some-id'})
+ self.assertEqual(exc.transaction_id, 'some-id')
+ self.assertIn('(txn: some-id)', str(exc))
+
+ exc = c.ClientException('test', http_response_headers={
+ 'X-Openstack-Request-Id': 'some-other-id'})
+ self.assertEqual(exc.transaction_id, 'some-other-id')
+ self.assertIn('(txn: some-other-id)', str(exc))
+
class MockHttpResponse(object):
def __init__(self, status=0, headers=None, verify=False):
@@ -562,6 +579,63 @@ class TestGetAuth(MockHttpTest):
self.assertTrue(url.startswith("http"))
self.assertTrue(token)
+ def test_auth_v3applicationcredential(self):
+ from keystoneauth1 import exceptions as ksauthexceptions
+
+ os_options = {
+ "auth_type": "v3applicationcredential",
+ "application_credential_id": "proejct_id",
+ "application_credential_secret": "secret"}
+
+ class FakeEndpointData(object):
+ catalog_url = 'http://swift.cluster/v1/KEY_project_id'
+
+ class FakeKeystoneuth1v3Session(object):
+
+ def __init__(self, auth):
+ self.auth = auth
+ self.token = 'token'
+
+ def get_token(self):
+ if self.auth.auth_url == 'http://keystone:5000/v3':
+ return self.token
+ elif self.auth.auth_url == 'http://keystone:9000/v3':
+ raise ksauthexceptions.AuthorizationFailure
+ else:
+ raise ksauthexceptions.Unauthorized
+
+ def get_endpoint_data(self, service_type, endpoint_type, **kwargs):
+ return FakeEndpointData()
+
+ mock_sess = FakeKeystoneuth1v3Session
+ with mock.patch('keystoneauth1.session.Session', mock_sess):
+ url, token = c.get_auth('http://keystone:5000', '', '',
+ os_options=os_options,
+ auth_version="3")
+
+ self.assertTrue(url.startswith("http"))
+ self.assertEqual(url, 'http://swift.cluster/v1/KEY_project_id')
+ self.assertEqual(token, 'token')
+
+ with mock.patch('keystoneauth1.session.Session', mock_sess):
+ with self.assertRaises(c.ClientException) as exc_mgr:
+ url, token = c.get_auth('http://keystone:9000', '', '',
+ os_options=os_options,
+ auth_version="3")
+
+ body = 'Unauthorized. Check application credential id and secret.'
+ body = 'Authorization Failure. Cannot authorize API client.'
+ self.assertEqual(exc_mgr.exception.__str__()[-89:], body)
+
+ with mock.patch('keystoneauth1.session.Session', mock_sess):
+ with self.assertRaises(c.ClientException) as exc_mgr:
+ url, token = c.get_auth('http://keystone:5000', '', '',
+ os_options=os_options,
+ auth_version="2")
+
+ body = 'Unauthorized. Check application credential id and secret.'
+ self.assertEqual(exc_mgr.exception.__str__()[-89:], body)
+
def test_get_keystone_client_2_0(self):
# check the correct auth version is passed to get_auth_keystone
os_options = {'tenant_name': 'asdf'}
@@ -2035,6 +2109,38 @@ class TestConnection(MockHttpTest):
self.assertEqual(request['headers']['x-auth-token'],
'tToken')
+ def test_url_mapping(self):
+ conn = c.Connection()
+ uri_versions = {
+ 'http://storage.test.com':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/v1':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/v1/':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/swift':
+ 'http://storage.test.com/swift/info',
+ 'http://storage.test.com/swift/':
+ 'http://storage.test.com/swift/info',
+ 'http://storage.test.com/v1.0':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/swift/v1.0':
+ 'http://storage.test.com/swift/info',
+ 'http://storage.test.com/v111':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/v111/test':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/v1/test':
+ 'http://storage.test.com/info',
+ 'http://storage.test.com/swift/v1.0/test':
+ 'http://storage.test.com/swift/info',
+ 'http://storage.test.com/v1.0/test':
+ 'http://storage.test.com/info'}
+ for uri_k, uri_v in uri_versions.items():
+ self.assertEqual(conn._map_url(uri_k), uri_v)
+
def test_get_capabilities(self):
conn = c.Connection()
with mock.patch('swiftclient.client.get_capabilities') as get_cap:
diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py
index 97abc44..cbee82b 100644
--- a/test/unit/test_utils.py
+++ b/test/unit/test_utils.py
@@ -521,15 +521,15 @@ class TestLengthWrapper(unittest.TestCase):
with tempfile.NamedTemporaryFile(mode='wb') as f:
f.write(b'a' * 100)
f.flush()
- contents = open(f.name, 'rb')
- data = u.LengthWrapper(contents, 42, True)
- s = b'a' * 42
- read_data = b''.join(iter(data.read, ''))
+ with open(f.name, 'rb') as contents:
+ data = u.LengthWrapper(contents, 42, True)
+ s = b'a' * 42
+ read_data = b''.join(iter(data.read, ''))
- self.assertEqual(42, len(data))
- self.assertEqual(42, len(read_data))
- self.assertEqual(s, read_data)
- self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
+ self.assertEqual(42, len(data))
+ self.assertEqual(42, len(read_data))
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
def test_segmented_file(self):
with tempfile.NamedTemporaryFile(mode='wb') as f:
@@ -539,24 +539,24 @@ class TestLengthWrapper(unittest.TestCase):
f.write((c * segment_length).encode())
f.flush()
for i, c in enumerate(segments):
- contents = open(f.name, 'rb')
- contents.seek(i * segment_length)
- data = u.LengthWrapper(contents, segment_length, True)
- read_data = b''.join(iter(data.read, ''))
- s = (c * segment_length).encode()
-
- self.assertEqual(segment_length, len(data))
- self.assertEqual(segment_length, len(read_data))
- self.assertEqual(s, read_data)
- self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
-
- data.reset()
- self.assertEqual(md5().hexdigest(), data.get_md5sum())
- read_data = b''.join(iter(data.read, ''))
- self.assertEqual(segment_length, len(data))
- self.assertEqual(segment_length, len(read_data))
- self.assertEqual(s, read_data)
- self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
+ with open(f.name, 'rb') as contents:
+ contents.seek(i * segment_length)
+ data = u.LengthWrapper(contents, segment_length, True)
+ read_data = b''.join(iter(data.read, ''))
+ s = (c * segment_length).encode()
+
+ self.assertEqual(segment_length, len(data))
+ self.assertEqual(segment_length, len(read_data))
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
+
+ data.reset()
+ self.assertEqual(md5().hexdigest(), data.get_md5sum())
+ read_data = b''.join(iter(data.read, ''))
+ self.assertEqual(segment_length, len(data))
+ self.assertEqual(segment_length, len(read_data))
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
class TestGroupers(unittest.TestCase):