diff options
author | Tim Burke <tim.burke@gmail.com> | 2016-04-01 15:12:20 -0700 |
---|---|---|
committer | Tim Burke <tim.burke@gmail.com> | 2016-10-24 01:52:37 +0200 |
commit | a38efb6031efda7a076886066b6993cdb144f6a3 (patch) | |
tree | 2ace5bec3fa5e2585d53c1b929c5e3e3aa42cd43 /tests | |
parent | 73e4296a389893c750f7c70a477ec828e4360197 (diff) | |
download | python-swiftclient-a38efb6031efda7a076886066b6993cdb144f6a3.tar.gz |
Add v1password keystoneauth plugin
This lets us use Keystone sessions against endpoints like swauth and
tempauth with code like:
import keystoneauth1.loading
import keystoneauth1.session
import swiftclient
loader = keystoneauth1.loading.get_plugin_loader('v1password')
auth_plugin = loader.load_from_options(
auth_url='http://saio:8080/auth/v1.0',
username='test:tester',
password='testing')
keystone_session = keystoneauth1.session.Session(auth_plugin)
conn = swiftclient.Connection(session=keystone_session)
The plugin includes an optional project_name option, which may be used
to override the swift account from the storage url that was returned.
Additionally, it includes enough infrastructure to support some commands
in python-openstackclient>=3.0:
export OS_AUTH_TYPE=v1password
export OS_AUTH_URL=http://saio:8080/auth/v1.0
export OS_PROJECT_NAME=AUTH_test2
export OS_USERNAME=test:tester
export OS_PASSWORD=testing
openstack token issue
openstack catalog list
openstack catalog show object-store
openstack object store account show
openstack container list
openstack container create <container>
openstack container save <container>
openstack container show <container>
openstack container delete <container>
openstack object list <container>
openstack object create <container> <file>
openstack object save <container> <object>
opsentack object show <container> <object>
openstack object delete <container> <object>
Change-Id: Ia963dc44415f72a6518227e86d9528a987e07491
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/test_authv1.py | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/tests/unit/test_authv1.py b/tests/unit/test_authv1.py new file mode 100644 index 0000000..968109a --- /dev/null +++ b/tests/unit/test_authv1.py @@ -0,0 +1,246 @@ +# Copyright 2016 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +import datetime +import json +import mock +import unittest +from keystoneauth1 import plugin +from keystoneauth1 import loading +from keystoneauth1 import exceptions +from swiftclient import authv1 + + +class TestDataNoAccount(object): + options = dict( + auth_url='http://saio:8080/auth/v1.0', + username='test:tester', + password='testing') + storage_url = 'http://saio:8080/v1/AUTH_test' + expected_endpoint = storage_url + token = 'token' + + +class TestDataWithAccount(object): + options = dict( + auth_url='http://saio:8080/auth/v1.0', + username='test2:tester2', + project_name='SOME_other_account', + password='testing2') + storage_url = 'http://saio:8080/v1/AUTH_test2' + expected_endpoint = 'http://saio:8080/v1/SOME_other_account' + token = 'other_token' + + +class TestPluginLoading(TestDataNoAccount, unittest.TestCase): + def test_can_load(self): + loader = loading.get_plugin_loader('v1password') + self.assertIsInstance(loader, authv1.PasswordLoader) + + auth_plugin = loader.load_from_options(**self.options) + self.assertIsInstance(auth_plugin, authv1.PasswordPlugin) + + self.assertEqual(self.options['auth_url'], auth_plugin.auth_url) + self.assertEqual(self.options['username'], auth_plugin.user) + self.assertEqual(self.options.get('project_name'), auth_plugin.account) + self.assertEqual(self.options['password'], auth_plugin.key) + + def test_get_state(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.assertIsNone(auth_plugin.get_auth_state()) + + with mock.patch('swiftclient.authv1.time.time', return_value=1234.56): + auth_plugin.auth_ref = authv1.AccessInfoV1( + self.options['auth_url'], + self.storage_url, + self.options.get('project_name'), + self.options['username'], + self.token, + 60) + + expected = json.dumps({ + 'auth_url': self.options['auth_url'], + 'username': self.options['username'], + 'account': self.options.get('project_name'), + 'issued': 1234.56, + 'storage_url': self.storage_url, + 'auth_token': self.token, + 'expires': 1234.56 + 60, + }, sort_keys=True) + self.assertEqual(expected, auth_plugin.auth_ref.get_state()) + self.assertEqual(expected, auth_plugin.get_auth_state()) + + def test_set_state(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.assertIsNone(auth_plugin.auth_ref) + + auth_plugin.auth_ref = object() + auth_plugin.set_auth_state(None) + self.assertIsNone(auth_plugin.get_auth_state()) + + state = json.dumps({ + 'auth_url': self.options['auth_url'], + 'username': self.options['username'], + 'account': self.options.get('project_name'), + 'issued': 1234.56, + 'storage_url': self.storage_url, + 'auth_token': self.token, + 'expires': None, + }, sort_keys=True) + auth_plugin.set_auth_state(state) + self.assertIsInstance(auth_plugin.auth_ref, authv1.AccessInfoV1) + + self.assertEqual(self.options['username'], + auth_plugin.auth_ref.username) + self.assertEqual(self.options['auth_url'], + auth_plugin.auth_ref.auth_url) + self.assertEqual(self.storage_url, auth_plugin.auth_ref.storage_url) + self.assertEqual(self.options.get('project_name'), auth_plugin.account) + self.assertEqual(self.token, auth_plugin.auth_ref.auth_token) + self.assertEqual(1234.56, auth_plugin.auth_ref._issued) + self.assertIs(datetime.datetime, type(auth_plugin.auth_ref.issued)) + self.assertIsNone(auth_plugin.auth_ref._expires) + self.assertIsNone(auth_plugin.auth_ref.expires) + + +class TestPluginLoadingWithAccount(TestDataWithAccount, TestPluginLoading): + pass + + +class TestPlugin(TestDataNoAccount, unittest.TestCase): + def setUp(self): + self.mock_session = mock.MagicMock() + self.mock_response = self.mock_session.get.return_value + self.mock_response.status_code = 200 + self.mock_response.headers = { + 'X-Auth-Token': self.token, + 'X-Storage-Url': self.storage_url, + } + + def test_get_access(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + with mock.patch('swiftclient.authv1.time.time', return_value=1234.56): + access = auth_plugin.get_access(self.mock_session) + + self.assertEqual(self.mock_session.get.mock_calls, [mock.call( + self.options['auth_url'], authenticated=False, log=False, headers={ + 'X-Auth-User': self.options['username'], + 'X-Auth-Key': self.options['password'], + })]) + + self.assertEqual(self.options['username'], access.username) + # `openstack token issue` requires a user_id property + self.assertEqual(self.options['username'], access.user_id) + self.assertEqual(self.storage_url, access.storage_url) + self.assertEqual(self.token, access.auth_token) + self.assertEqual(1234.56, access._issued) + self.assertIs(datetime.datetime, type(auth_plugin.auth_ref.issued)) + self.assertIsNone(access.expires) + + # `openstack catalog list/show` require a catalog property + catalog = access.service_catalog.catalog + self.assertEqual('swift', catalog[0].get('name')) + self.assertEqual('object-store', catalog[0].get('type')) + self.assertIn('endpoints', catalog[0]) + self.assertIn(self.storage_url, [ + e.get('publicURL') for e in catalog[0]['endpoints']]) + + def test_get_access_with_expiry(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.mock_response.headers['X-Auth-Token-Expires'] = '78.9' + with mock.patch('swiftclient.authv1.time.time', + return_value=1234.56) as mock_time: + access = auth_plugin.get_access(self.mock_session) + self.assertEqual(1234.56 + 78.9, access._expires) + self.assertIs(datetime.datetime, + type(auth_plugin.auth_ref.expires)) + + self.assertIs(True, access.will_expire_soon(90)) + self.assertIs(False, access.will_expire_soon(60)) + self.assertEqual(3, len(mock_time.mock_calls)) + + def test_get_access_bad_expiry(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.mock_response.headers['X-Auth-Token-Expires'] = 'foo' + access = auth_plugin.get_access(self.mock_session) + self.assertEqual(None, access.expires) + + self.assertIs(False, access.will_expire_soon(60)) + self.assertIs(False, access.will_expire_soon(1e20)) + + def test_get_access_bad_status(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.mock_response.status_code = 401 + self.assertRaises(exceptions.InvalidResponse, + auth_plugin.get_access, self.mock_session) + + def test_get_access_missing_token(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.mock_response.headers.pop('X-Auth-Token') + self.assertRaises(exceptions.InvalidResponse, + auth_plugin.get_access, self.mock_session) + + def test_get_access_accepts_storage_token(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.mock_response.headers.pop('X-Auth-Token') + self.mock_response.headers['X-Storage-Token'] = 'yet another token' + access = auth_plugin.get_access(self.mock_session) + self.assertEqual('yet another token', access.auth_token) + + def test_get_access_missing_url(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + self.mock_response.headers.pop('X-Storage-Url') + self.assertRaises(exceptions.InvalidResponse, + auth_plugin.get_access, self.mock_session) + + def test_get_endpoint(self): + auth_plugin = authv1.PasswordPlugin(**self.options) + + object_store_endpoint = auth_plugin.get_endpoint( + self.mock_session, service_type='object-store') + self.assertEqual(object_store_endpoint, self.expected_endpoint) + + auth_endpoint = auth_plugin.get_endpoint( + self.mock_session, interface=plugin.AUTH_INTERFACE) + self.assertEqual(auth_endpoint, self.options['auth_url']) + + with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr: + auth_plugin.get_endpoint(self.mock_session) + self.assertEqual('public endpoint for None service not found', + str(exc_mgr.exception)) + + with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr: + auth_plugin.get_endpoint( + self.mock_session, service_type='identity', region_name='DFW') + self.assertEqual( + 'public endpoint for identity service in DFW region not found', + str(exc_mgr.exception)) + + with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr: + auth_plugin.get_endpoint( + self.mock_session, service_type='image', service_name='glance') + self.assertEqual( + 'public endpoint for image service named glance not found', + str(exc_mgr.exception)) + + with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr: + auth_plugin.get_endpoint( + self.mock_session, service_type='compute', service_name='nova', + region_name='IAD') + self.assertEqual('public endpoint for compute service named nova in ' + 'IAD region not found', str(exc_mgr.exception)) + + +class TestPluginWithAccount(TestDataWithAccount, TestPlugin): + pass |