summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/fakes.py71
-rw-r--r--tests/test_base.py48
-rw-r--r--tests/test_client.py18
-rw-r--r--tests/test_http.py74
-rw-r--r--tests/test_service_catalog.py127
-rw-r--r--tests/test_shell.py75
-rw-r--r--tests/test_utils.py74
-rw-r--r--tests/utils.py5
-rw-r--r--tests/v1/__init__.py0
-rw-r--r--tests/v1/fakes.py765
-rw-r--r--tests/v1/test_auth.py297
-rw-r--r--tests/v1/test_shell.py77
-rw-r--r--tests/v1/testfile.txt1
-rw-r--r--tests/v1/utils.py29
15 files changed, 1661 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/fakes.py b/tests/fakes.py
new file mode 100644
index 0000000..248214f
--- /dev/null
+++ b/tests/fakes.py
@@ -0,0 +1,71 @@
+"""
+A fake server that "responds" to API methods with pre-canned responses.
+
+All of these responses come from the spec, so if for some reason the spec's
+wrong the tests might raise AssertionError. I've indicated in comments the
+places where actual behavior differs from the spec.
+"""
+
+
+def assert_has_keys(dict, required=[], optional=[]):
+ keys = dict.keys()
+ for k in required:
+ try:
+ assert k in keys
+ except AssertionError:
+ extra_keys = set(keys).difference(set(required + optional))
+ raise AssertionError("found unexpected keys: %s" %
+ list(extra_keys))
+
+
+class FakeClient(object):
+
+ def assert_called(self, method, url, body=None, pos=-1):
+ """
+ Assert than an API method was just called.
+ """
+ expected = (method, url)
+ called = self.client.callstack[pos][0:2]
+
+ assert self.client.callstack, \
+ "Expected %s %s but no calls were made." % expected
+
+ assert expected == called, 'Expected %s %s; got %s %s' % \
+ (expected + called)
+
+ if body is not None:
+ assert self.client.callstack[pos][2] == body
+
+ def assert_called_anytime(self, method, url, body=None):
+ """
+ Assert than an API method was called anytime in the test.
+ """
+ expected = (method, url)
+
+ assert self.client.callstack, \
+ "Expected %s %s but no calls were made." % expected
+
+ found = False
+ for entry in self.client.callstack:
+ if expected == entry[0:2]:
+ found = True
+ break
+
+ assert found, 'Expected %s %s; got %s' % \
+ (expected, self.client.callstack)
+ if body is not None:
+ try:
+ assert entry[2] == body
+ except AssertionError:
+ print entry[2]
+ print "!="
+ print body
+ raise
+
+ self.client.callstack = []
+
+ def clear_callstack(self):
+ self.client.callstack = []
+
+ def authenticate(self):
+ pass
diff --git a/tests/test_base.py b/tests/test_base.py
new file mode 100644
index 0000000..7eba986
--- /dev/null
+++ b/tests/test_base.py
@@ -0,0 +1,48 @@
+from cinderclient import base
+from cinderclient import exceptions
+from cinderclient.v1 import volumes
+from tests import utils
+from tests.v1 import fakes
+
+
+cs = fakes.FakeClient()
+
+
+class BaseTest(utils.TestCase):
+
+ def test_resource_repr(self):
+ r = base.Resource(None, dict(foo="bar", baz="spam"))
+ self.assertEqual(repr(r), "<Resource baz=spam, foo=bar>")
+
+ def test_getid(self):
+ self.assertEqual(base.getid(4), 4)
+
+ class TmpObject(object):
+ id = 4
+ self.assertEqual(base.getid(TmpObject), 4)
+
+ def test_eq(self):
+ # Two resources of the same type with the same id: equal
+ r1 = base.Resource(None, {'id': 1, 'name': 'hi'})
+ r2 = base.Resource(None, {'id': 1, 'name': 'hello'})
+ self.assertEqual(r1, r2)
+
+ # Two resoruces of different types: never equal
+ r1 = base.Resource(None, {'id': 1})
+ r2 = volumes.Volume(None, {'id': 1})
+ self.assertNotEqual(r1, r2)
+
+ # Two resources with no ID: equal if their info is equal
+ r1 = base.Resource(None, {'name': 'joe', 'age': 12})
+ r2 = base.Resource(None, {'name': 'joe', 'age': 12})
+ self.assertEqual(r1, r2)
+
+ def test_findall_invalid_attribute(self):
+ # Make sure findall with an invalid attribute doesn't cause errors.
+ # The following should not raise an exception.
+ cs.volumes.findall(vegetable='carrot')
+
+ # However, find() should raise an error
+ self.assertRaises(exceptions.NotFound,
+ cs.volumes.find,
+ vegetable='carrot')
diff --git a/tests/test_client.py b/tests/test_client.py
new file mode 100644
index 0000000..f5e4bab
--- /dev/null
+++ b/tests/test_client.py
@@ -0,0 +1,18 @@
+
+import cinderclient.client
+import cinderclient.v1.client
+from tests import utils
+
+
+class ClientTest(utils.TestCase):
+
+ def setUp(self):
+ pass
+
+ def test_get_client_class_v1(self):
+ output = cinderclient.client.get_client_class('1')
+ self.assertEqual(output, cinderclient.v1.client.Client)
+
+ def test_get_client_class_unknown(self):
+ self.assertRaises(cinderclient.exceptions.UnsupportedVersion,
+ cinderclient.client.get_client_class, '0')
diff --git a/tests/test_http.py b/tests/test_http.py
new file mode 100644
index 0000000..13d744e
--- /dev/null
+++ b/tests/test_http.py
@@ -0,0 +1,74 @@
+import httplib2
+import mock
+
+from cinderclient import client
+from cinderclient import exceptions
+from tests import utils
+
+
+fake_response = httplib2.Response({"status": 200})
+fake_body = '{"hi": "there"}'
+mock_request = mock.Mock(return_value=(fake_response, fake_body))
+
+
+def get_client():
+ cl = client.HTTPClient("username", "password",
+ "project_id", "auth_test")
+ return cl
+
+
+def get_authed_client():
+ cl = get_client()
+ cl.management_url = "http://example.com"
+ cl.auth_token = "token"
+ return cl
+
+
+class ClientTest(utils.TestCase):
+
+ def test_get(self):
+ cl = get_authed_client()
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ @mock.patch('time.time', mock.Mock(return_value=1234))
+ def test_get_call():
+ resp, body = cl.get("/hi")
+ headers = {"X-Auth-Token": "token",
+ "X-Auth-Project-Id": "project_id",
+ "User-Agent": cl.USER_AGENT,
+ 'Accept': 'application/json',
+ }
+ mock_request.assert_called_with("http://example.com/hi",
+ "GET", headers=headers)
+ # Automatic JSON parsing
+ self.assertEqual(body, {"hi": "there"})
+
+ test_get_call()
+
+ def test_post(self):
+ cl = get_authed_client()
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_post_call():
+ cl.post("/hi", body=[1, 2, 3])
+ headers = {
+ "X-Auth-Token": "token",
+ "X-Auth-Project-Id": "project_id",
+ "Content-Type": "application/json",
+ 'Accept': 'application/json',
+ "User-Agent": cl.USER_AGENT
+ }
+ mock_request.assert_called_with("http://example.com/hi", "POST",
+ headers=headers, body='[1, 2, 3]')
+
+ test_post_call()
+
+ def test_auth_failure(self):
+ cl = get_client()
+
+ # response must not have x-server-management-url header
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ self.assertRaises(exceptions.AuthorizationFailure, cl.authenticate)
+
+ test_auth_call()
diff --git a/tests/test_service_catalog.py b/tests/test_service_catalog.py
new file mode 100644
index 0000000..bb93dcf
--- /dev/null
+++ b/tests/test_service_catalog.py
@@ -0,0 +1,127 @@
+from cinderclient import exceptions
+from cinderclient import service_catalog
+from tests import utils
+
+
+# Taken directly from keystone/content/common/samples/auth.json
+# Do not edit this structure. Instead, grab the latest from there.
+
+SERVICE_CATALOG = {
+ "access": {
+ "token": {
+ "id": "ab48a9efdfedb23ty3494",
+ "expires": "2010-11-01T03:32:15-05:00",
+ "tenant": {
+ "id": "345",
+ "name": "My Project"
+ }
+ },
+ "user": {
+ "id": "123",
+ "name": "jqsmith",
+ "roles": [
+ {
+ "id": "234",
+ "name": "compute:admin",
+ },
+ {
+ "id": "235",
+ "name": "object-store:admin",
+ "tenantId": "1",
+ }
+ ],
+ "roles_links": [],
+ },
+ "serviceCatalog": [
+ {
+ "name": "Cloud Servers",
+ "type": "compute",
+ "endpoints": [
+ {
+ "tenantId": "1",
+ "publicURL": "https://compute1.host/v1/1234",
+ "internalURL": "https://compute1.host/v1/1234",
+ "region": "North",
+ "versionId": "1.0",
+ "versionInfo": "https://compute1.host/v1/",
+ "versionList": "https://compute1.host/"
+ },
+ {
+ "tenantId": "2",
+ "publicURL": "https://compute1.host/v1/3456",
+ "internalURL": "https://compute1.host/v1/3456",
+ "region": "North",
+ "versionId": "1.1",
+ "versionInfo": "https://compute1.host/v1/",
+ "versionList": "https://compute1.host/"
+ },
+ ],
+ "endpoints_links": [],
+ },
+ {
+ "name": "Nova Volumes",
+ "type": "volume",
+ "endpoints": [
+ {
+ "tenantId": "1",
+ "publicURL": "https://volume1.host/v1/1234",
+ "internalURL": "https://volume1.host/v1/1234",
+ "region": "South",
+ "versionId": "1.0",
+ "versionInfo": "uri",
+ "versionList": "uri"
+ },
+ {
+ "tenantId": "2",
+ "publicURL": "https://volume1.host/v1/3456",
+ "internalURL": "https://volume1.host/v1/3456",
+ "region": "South",
+ "versionId": "1.1",
+ "versionInfo": "https://volume1.host/v1/",
+ "versionList": "https://volume1.host/"
+ },
+ ],
+ "endpoints_links": [
+ {
+ "rel": "next",
+ "href": "https://identity1.host/v2.0/endpoints"
+ },
+ ],
+ },
+ ],
+ "serviceCatalog_links": [
+ {
+ "rel": "next",
+ "href": "https://identity.host/v2.0/endpoints?session=2hfh8Ar",
+ },
+ ],
+ },
+}
+
+
+class ServiceCatalogTest(utils.TestCase):
+ def test_building_a_service_catalog(self):
+ sc = service_catalog.ServiceCatalog(SERVICE_CATALOG)
+
+ self.assertRaises(exceptions.AmbiguousEndpoints, sc.url_for,
+ service_type='compute')
+ self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'),
+ "https://compute1.host/v1/1234")
+ self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'),
+ "https://compute1.host/v1/3456")
+
+ self.assertRaises(exceptions.EndpointNotFound, sc.url_for,
+ "region", "South", service_type='compute')
+
+ def test_alternate_service_type(self):
+ sc = service_catalog.ServiceCatalog(SERVICE_CATALOG)
+
+ self.assertRaises(exceptions.AmbiguousEndpoints, sc.url_for,
+ service_type='volume')
+ self.assertEquals(sc.url_for('tenantId', '1', service_type='volume'),
+ "https://volume1.host/v1/1234")
+ self.assertEquals(sc.url_for('tenantId', '2', service_type='volume'),
+ "https://volume1.host/v1/3456")
+
+ self.assertRaises(exceptions.EndpointNotFound, sc.url_for,
+ "region", "North", service_type='volume')
diff --git a/tests/test_shell.py b/tests/test_shell.py
new file mode 100644
index 0000000..902aec5
--- /dev/null
+++ b/tests/test_shell.py
@@ -0,0 +1,75 @@
+import cStringIO
+import os
+import httplib2
+import sys
+
+from cinderclient import exceptions
+import cinderclient.shell
+from tests import utils
+
+
+class ShellTest(utils.TestCase):
+
+ # Patch os.environ to avoid required auth info.
+ def setUp(self):
+ global _old_env
+ fake_env = {
+ 'OS_USERNAME': 'username',
+ 'OS_PASSWORD': 'password',
+ 'OS_TENANT_NAME': 'tenant_name',
+ 'OS_AUTH_URL': 'http://no.where',
+ }
+ _old_env, os.environ = os.environ, fake_env.copy()
+
+ def shell(self, argstr):
+ orig = sys.stdout
+ try:
+ sys.stdout = cStringIO.StringIO()
+ _shell = cinderclient.shell.OpenStackCinderShell()
+ _shell.main(argstr.split())
+ except SystemExit:
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ self.assertEqual(exc_value.code, 0)
+ finally:
+ out = sys.stdout.getvalue()
+ sys.stdout.close()
+ sys.stdout = orig
+
+ return out
+
+ def tearDown(self):
+ global _old_env
+ os.environ = _old_env
+
+ def test_help_unknown_command(self):
+ self.assertRaises(exceptions.CommandError, self.shell, 'help foofoo')
+
+ def test_debug(self):
+ httplib2.debuglevel = 0
+ self.shell('--debug help')
+ assert httplib2.debuglevel == 1
+
+ def test_help(self):
+ required = [
+ '^usage: ',
+ '(?m)^\s+create\s+Add a new volume.',
+ '(?m)^See "cinder help COMMAND" for help on a specific command',
+ ]
+ for argstr in ['--help', 'help']:
+ help_text = self.shell(argstr)
+ for r in required:
+ self.assertRegexpMatches(help_text, r)
+
+ def test_help_on_subcommand(self):
+ required = [
+ '^usage: cinder list',
+ '(?m)^List all the volumes.',
+ ]
+ argstrings = [
+ 'list --help',
+ 'help list',
+ ]
+ for argstr in argstrings:
+ help_text = self.shell(argstr)
+ for r in required:
+ self.assertRegexpMatches(help_text, r)
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..39fb2c9
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,74 @@
+
+from cinderclient import exceptions
+from cinderclient import utils
+from cinderclient import base
+from tests import utils as test_utils
+
+UUID = '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0'
+
+
+class FakeResource(object):
+
+ def __init__(self, _id, properties):
+ self.id = _id
+ try:
+ self.name = properties['name']
+ except KeyError:
+ pass
+ try:
+ self.display_name = properties['display_name']
+ except KeyError:
+ pass
+
+
+class FakeManager(base.ManagerWithFind):
+
+ resource_class = FakeResource
+
+ resources = [
+ FakeResource('1234', {'name': 'entity_one'}),
+ FakeResource(UUID, {'name': 'entity_two'}),
+ FakeResource('4242', {'display_name': 'entity_three'}),
+ FakeResource('5678', {'name': '9876'})
+ ]
+
+ def get(self, resource_id):
+ for resource in self.resources:
+ if resource.id == str(resource_id):
+ return resource
+ raise exceptions.NotFound(resource_id)
+
+ def list(self):
+ return self.resources
+
+
+class FindResourceTestCase(test_utils.TestCase):
+
+ def setUp(self):
+ self.manager = FakeManager(None)
+
+ def test_find_none(self):
+ self.assertRaises(exceptions.CommandError,
+ utils.find_resource,
+ self.manager,
+ 'asdf')
+
+ def test_find_by_integer_id(self):
+ output = utils.find_resource(self.manager, 1234)
+ self.assertEqual(output, self.manager.get('1234'))
+
+ def test_find_by_str_id(self):
+ output = utils.find_resource(self.manager, '1234')
+ self.assertEqual(output, self.manager.get('1234'))
+
+ def test_find_by_uuid(self):
+ output = utils.find_resource(self.manager, UUID)
+ self.assertEqual(output, self.manager.get(UUID))
+
+ def test_find_by_str_name(self):
+ output = utils.find_resource(self.manager, 'entity_one')
+ self.assertEqual(output, self.manager.get('1234'))
+
+ def test_find_by_str_displayname(self):
+ output = utils.find_resource(self.manager, 'entity_three')
+ self.assertEqual(output, self.manager.get('4242'))
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 0000000..7f1c5dc
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,5 @@
+import unittest2
+
+
+class TestCase(unittest2.TestCase):
+ pass
diff --git a/tests/v1/__init__.py b/tests/v1/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/v1/__init__.py
diff --git a/tests/v1/fakes.py b/tests/v1/fakes.py
new file mode 100644
index 0000000..e430970
--- /dev/null
+++ b/tests/v1/fakes.py
@@ -0,0 +1,765 @@
+# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
+# Copyright 2011 OpenStack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import httplib2
+import urlparse
+
+from cinderclient import client as base_client
+from cinderclient.v1 import client
+from tests import fakes
+
+
+class FakeClient(fakes.FakeClient, client.Client):
+
+ def __init__(self, *args, **kwargs):
+ client.Client.__init__(self, 'username', 'password',
+ 'project_id', 'auth_url')
+ self.client = FakeHTTPClient(**kwargs)
+
+
+class FakeHTTPClient(base_client.HTTPClient):
+
+ def __init__(self, **kwargs):
+ self.username = 'username'
+ self.password = 'password'
+ self.auth_url = 'auth_url'
+ self.callstack = []
+
+ def _cs_request(self, url, method, **kwargs):
+ # Check that certain things are called correctly
+ if method in ['GET', 'DELETE']:
+ assert 'body' not in kwargs
+ elif method == 'PUT':
+ assert 'body' in kwargs
+
+ # Call the method
+ args = urlparse.parse_qsl(urlparse.urlparse(url)[4])
+ kwargs.update(args)
+ munged_url = url.rsplit('?', 1)[0]
+ munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
+ munged_url = munged_url.replace('-', '_')
+
+ callback = "%s_%s" % (method.lower(), munged_url)
+
+ if not hasattr(self, callback):
+ raise AssertionError('Called unknown API method: %s %s, '
+ 'expected fakes method name: %s' %
+ (method, url, callback))
+
+ # Note the call
+ self.callstack.append((method, url, kwargs.get('body', None)))
+
+ status, body = getattr(self, callback)(**kwargs)
+ if hasattr(status, 'items'):
+ return httplib2.Response(status), body
+ else:
+ return httplib2.Response({"status": status}), body
+
+ #
+ # Limits
+ #
+
+ def get_limits(self, **kw):
+ return (200, {"limits": {
+ "rate": [
+ {
+ "uri": "*",
+ "regex": ".*",
+ "limit": [
+ {
+ "value": 10,
+ "verb": "POST",
+ "remaining": 2,
+ "unit": "MINUTE",
+ "next-available": "2011-12-15T22:42:45Z"
+ },
+ {
+ "value": 10,
+ "verb": "PUT",
+ "remaining": 2,
+ "unit": "MINUTE",
+ "next-available": "2011-12-15T22:42:45Z"
+ },
+ {
+ "value": 100,
+ "verb": "DELETE",
+ "remaining": 100,
+ "unit": "MINUTE",
+ "next-available": "2011-12-15T22:42:45Z"
+ }
+ ]
+ },
+ {
+ "uri": "*/servers",
+ "regex": "^/servers",
+ "limit": [
+ {
+ "verb": "POST",
+ "value": 25,
+ "remaining": 24,
+ "unit": "DAY",
+ "next-available": "2011-12-15T22:42:45Z"
+ }
+ ]
+ }
+ ],
+ "absolute": {
+ "maxTotalRAMSize": 51200,
+ "maxServerMeta": 5,
+ "maxImageMeta": 5,
+ "maxPersonality": 5,
+ "maxPersonalitySize": 10240
+ },
+ },
+ })
+
+ #
+ # Servers
+ #
+
+ def get_volumes(self, **kw):
+ return (200, {"volumes": [
+ {'id': 1234, 'name': 'sample-volume'},
+ {'id': 5678, 'name': 'sample-volume2'}
+ ]})
+
+ # TODO(jdg): This will need to change
+ # at the very least it's not complete
+ def get_volumes_detail(self, **kw):
+ return (200, {"volumes": [
+ {'id': 1234,
+ 'name': 'sample-volume',
+ 'attachments': [{'server_id': 1234}]
+ },
+ ]})
+
+ def get_volumes_1234(self, **kw):
+ r = {'volume': self.get_volumes_detail()[1]['volumes'][0]}
+ return (200, r)
+
+ def post_servers(self, body, **kw):
+ assert set(body.keys()) <= set(['server', 'os:scheduler_hints'])
+ fakes.assert_has_keys(body['server'],
+ required=['name', 'imageRef', 'flavorRef'],
+ optional=['metadata', 'personality'])
+ if 'personality' in body['server']:
+ for pfile in body['server']['personality']:
+ fakes.assert_has_keys(pfile, required=['path', 'contents'])
+ return (202, self.get_servers_1234()[1])
+
+ def get_servers_1234(self, **kw):
+ r = {'server': self.get_servers_detail()[1]['servers'][0]}
+ return (200, r)
+
+ def get_servers_5678(self, **kw):
+ r = {'server': self.get_servers_detail()[1]['servers'][1]}
+ return (200, r)
+
+ def put_servers_1234(self, body, **kw):
+ assert body.keys() == ['server']
+ fakes.assert_has_keys(body['server'], optional=['name', 'adminPass'])
+ return (204, None)
+
+ def delete_servers_1234(self, **kw):
+ return (202, None)
+
+ def delete_volumes_1234(self, **kw):
+ return (202, None)
+
+ def delete_servers_1234_metadata_test_key(self, **kw):
+ return (204, None)
+
+ def delete_servers_1234_metadata_key1(self, **kw):
+ return (204, None)
+
+ def delete_servers_1234_metadata_key2(self, **kw):
+ return (204, None)
+
+ def post_servers_1234_metadata(self, **kw):
+ return (204, {'metadata': {'test_key': 'test_value'}})
+
+ def get_servers_1234_diagnostics(self, **kw):
+ return (200, {'data': 'Fake diagnostics'})
+
+ def get_servers_1234_actions(self, **kw):
+ return (200, {'actions': [
+ {
+ 'action': 'rebuild',
+ 'error': None,
+ 'created_at': '2011-12-30 11:45:36'
+ },
+ {
+ 'action': 'reboot',
+ 'error': 'Failed!',
+ 'created_at': '2011-12-30 11:40:29'
+ },
+ ]})
+
+ #
+ # Server Addresses
+ #
+
+ def get_servers_1234_ips(self, **kw):
+ return (200, {'addresses':
+ self.get_servers_1234()[1]['server']['addresses']})
+
+ def get_servers_1234_ips_public(self, **kw):
+ return (200, {'public':
+ self.get_servers_1234_ips()[1]['addresses']['public']})
+
+ def get_servers_1234_ips_private(self, **kw):
+ return (200, {'private':
+ self.get_servers_1234_ips()[1]['addresses']['private']})
+
+ def delete_servers_1234_ips_public_1_2_3_4(self, **kw):
+ return (202, None)
+
+ #
+ # Server actions
+ #
+
+ def post_servers_1234_action(self, body, **kw):
+ _body = None
+ resp = 202
+ assert len(body.keys()) == 1
+ action = body.keys()[0]
+ if action == 'reboot':
+ assert body[action].keys() == ['type']
+ assert body[action]['type'] in ['HARD', 'SOFT']
+ elif action == 'rebuild':
+ keys = body[action].keys()
+ if 'adminPass' in keys:
+ keys.remove('adminPass')
+ assert keys == ['imageRef']
+ _body = self.get_servers_1234()[1]
+ elif action == 'resize':
+ assert body[action].keys() == ['flavorRef']
+ elif action == 'confirmResize':
+ assert body[action] is None
+ # This one method returns a different response code
+ return (204, None)
+ elif action == 'revertResize':
+ assert body[action] is None
+ elif action == 'migrate':
+ assert body[action] is None
+ elif action == 'rescue':
+ assert body[action] is None
+ elif action == 'unrescue':
+ assert body[action] is None
+ elif action == 'lock':
+ assert body[action] is None
+ elif action == 'unlock':
+ assert body[action] is None
+ elif action == 'addFixedIp':
+ assert body[action].keys() == ['networkId']
+ elif action == 'removeFixedIp':
+ assert body[action].keys() == ['address']
+ elif action == 'addFloatingIp':
+ assert body[action].keys() == ['address']
+ elif action == 'removeFloatingIp':
+ assert body[action].keys() == ['address']
+ elif action == 'createImage':
+ assert set(body[action].keys()) == set(['name', 'metadata'])
+ resp = dict(status=202, location="http://blah/images/456")
+ elif action == 'changePassword':
+ assert body[action].keys() == ['adminPass']
+ elif action == 'os-getConsoleOutput':
+ assert body[action].keys() == ['length']
+ return (202, {'output': 'foo'})
+ elif action == 'os-getVNCConsole':
+ assert body[action].keys() == ['type']
+ elif action == 'os-migrateLive':
+ assert set(body[action].keys()) == set(['host',
+ 'block_migration',
+ 'disk_over_commit'])
+ else:
+ raise AssertionError("Unexpected server action: %s" % action)
+ return (resp, _body)
+
+ #
+ # Cloudpipe
+ #
+
+ def get_os_cloudpipe(self, **kw):
+ return (200, {'cloudpipes': [
+ {'project_id':1}
+ ]})
+
+ def post_os_cloudpipe(self, **ks):
+ return (202, {'instance_id': '9d5824aa-20e6-4b9f-b967-76a699fc51fd'})
+
+ #
+ # Flavors
+ #
+
+ def get_flavors(self, **kw):
+ return (200, {'flavors': [
+ {'id': 1, 'name': '256 MB Server'},
+ {'id': 2, 'name': '512 MB Server'}
+ ]})
+
+ def get_flavors_detail(self, **kw):
+ return (200, {'flavors': [
+ {'id': 1, 'name': '256 MB Server', 'ram': 256, 'disk': 10,
+ 'OS-FLV-EXT-DATA:ephemeral': 10},
+ {'id': 2, 'name': '512 MB Server', 'ram': 512, 'disk': 20,
+ 'OS-FLV-EXT-DATA:ephemeral': 20}
+ ]})
+
+ def get_flavors_1(self, **kw):
+ return (200, {'flavor': self.get_flavors_detail()[1]['flavors'][0]})
+
+ def get_flavors_2(self, **kw):
+ return (200, {'flavor': self.get_flavors_detail()[1]['flavors'][1]})
+
+ def get_flavors_3(self, **kw):
+ # Diablo has no ephemeral
+ return (200, {'flavor': {'id': 3, 'name': '256 MB Server',
+ 'ram': 256, 'disk': 10}})
+
+ def delete_flavors_flavordelete(self, **kw):
+ return (202, None)
+
+ def post_flavors(self, body, **kw):
+ return (202, {'flavor': self.get_flavors_detail()[1]['flavors'][0]})
+
+ #
+ # Floating ips
+ #
+
+ def get_os_floating_ip_pools(self):
+ return (200, {'floating_ip_pools': [{'name': 'foo', 'name': 'bar'}]})
+
+ def get_os_floating_ips(self, **kw):
+ return (200, {'floating_ips': [
+ {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1'},
+ {'id': 2, 'fixed_ip': '10.0.0.2', 'ip': '11.0.0.2'},
+ ]})
+
+ def get_os_floating_ips_1(self, **kw):
+ return (200, {'floating_ip':
+ {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1'}
+ })
+
+ def post_os_floating_ips(self, body, **kw):
+ return (202, self.get_os_floating_ips_1()[1])
+
+ def post_os_floating_ips(self, body):
+ if body.get('pool'):
+ return (200, {'floating_ip':
+ {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1',
+ 'pool': 'cinder'}})
+ else:
+ return (200, {'floating_ip':
+ {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1',
+ 'pool': None}})
+
+ def delete_os_floating_ips_1(self, **kw):
+ return (204, None)
+
+ def get_os_floating_ip_dns(self, **kw):
+ return (205, {'domain_entries':
+ [{'domain': 'example.org'},
+ {'domain': 'example.com'}]})
+
+ def get_os_floating_ip_dns_testdomain_entries(self, **kw):
+ if kw.get('ip'):
+ return (205, {'dns_entries':
+ [{'dns_entry':
+ {'ip': kw.get('ip'),
+ 'name': "host1",
+ 'type': "A",
+ 'domain': 'testdomain'}},
+ {'dns_entry':
+ {'ip': kw.get('ip'),
+ 'name': "host2",
+ 'type': "A",
+ 'domain': 'testdomain'}}]})
+ else:
+ return (404, None)
+
+ def get_os_floating_ip_dns_testdomain_entries_testname(self, **kw):
+ return (205, {'dns_entry':
+ {'ip': "10.10.10.10",
+ 'name': 'testname',
+ 'type': "A",
+ 'domain': 'testdomain'}})
+
+ def put_os_floating_ip_dns_testdomain(self, body, **kw):
+ if body['domain_entry']['scope'] == 'private':
+ fakes.assert_has_keys(body['domain_entry'],
+ required=['availability_zone', 'scope'])
+ elif body['domain_entry']['scope'] == 'public':
+ fakes.assert_has_keys(body['domain_entry'],
+ required=['project', 'scope'])
+
+ else:
+ fakes.assert_has_keys(body['domain_entry'],
+ required=['project', 'scope'])
+ return (205, None)
+
+ def put_os_floating_ip_dns_testdomain_entries_testname(self, body, **kw):
+ fakes.assert_has_keys(body['dns_entry'],
+ required=['ip', 'dns_type'])
+ return (205, None)
+
+ def delete_os_floating_ip_dns_testdomain(self, **kw):
+ return (200, None)
+
+ def delete_os_floating_ip_dns_testdomain_entries_testname(self, **kw):
+ return (200, None)
+
+ #
+ # Images
+ #
+ def get_images(self, **kw):
+ return (200, {'images': [
+ {'id': 1, 'name': 'CentOS 5.2'},
+ {'id': 2, 'name': 'My Server Backup'}
+ ]})
+
+ def get_images_detail(self, **kw):
+ return (200, {'images': [
+ {
+ 'id': 1,
+ 'name': 'CentOS 5.2',
+ "updated": "2010-10-10T12:00:00Z",
+ "created": "2010-08-10T12:00:00Z",
+ "status": "ACTIVE",
+ "metadata": {
+ "test_key": "test_value",
+ },
+ "links": {},
+ },
+ {
+ "id": 743,
+ "name": "My Server Backup",
+ "serverId": 1234,
+ "updated": "2010-10-10T12:00:00Z",
+ "created": "2010-08-10T12:00:00Z",
+ "status": "SAVING",
+ "progress": 80,
+ "links": {},
+ }
+ ]})
+
+ def get_images_1(self, **kw):
+ return (200, {'image': self.get_images_detail()[1]['images'][0]})
+
+ def get_images_2(self, **kw):
+ return (200, {'image': self.get_images_detail()[1]['images'][1]})
+
+ def post_images(self, body, **kw):
+ assert body.keys() == ['image']
+ fakes.assert_has_keys(body['image'], required=['serverId', 'name'])
+ return (202, self.get_images_1()[1])
+
+ def post_images_1_metadata(self, body, **kw):
+ assert body.keys() == ['metadata']
+ fakes.assert_has_keys(body['metadata'],
+ required=['test_key'])
+ return (200,
+ {'metadata': self.get_images_1()[1]['image']['metadata']})
+
+ def delete_images_1(self, **kw):
+ return (204, None)
+
+ def delete_images_1_metadata_test_key(self, **kw):
+ return (204, None)
+
+ #
+ # Keypairs
+ #
+ def get_os_keypairs(self, *kw):
+ return (200, {"keypairs": [
+ {'fingerprint': 'FAKE_KEYPAIR', 'name': 'test'}
+ ]})
+
+ def delete_os_keypairs_test(self, **kw):
+ return (202, None)
+
+ def post_os_keypairs(self, body, **kw):
+ assert body.keys() == ['keypair']
+ fakes.assert_has_keys(body['keypair'],
+ required=['name'])
+ r = {'keypair': self.get_os_keypairs()[1]['keypairs'][0]}
+ return (202, r)
+
+ #
+ # Virtual Interfaces
+ #
+ def get_servers_1234_os_virtual_interfaces(self, **kw):
+ return (200, {"virtual_interfaces": [
+ {'id': 'fakeid', 'mac_address': 'fakemac'}
+ ]})
+
+ #
+ # Quotas
+ #
+
+ def get_os_quota_sets_test(self, **kw):
+ return (200, {'quota_set': {
+ 'tenant_id': 'test',
+ 'metadata_items': [],
+ 'injected_file_content_bytes': 1,
+ 'volumes': 1,
+ 'gigabytes': 1,
+ 'ram': 1,
+ 'floating_ips': 1,
+ 'instances': 1,
+ 'injected_files': 1,
+ 'cores': 1}})
+
+ def get_os_quota_sets_test_defaults(self):
+ return (200, {'quota_set': {
+ 'tenant_id': 'test',
+ 'metadata_items': [],
+ 'injected_file_content_bytes': 1,
+ 'volumes': 1,
+ 'gigabytes': 1,
+ 'ram': 1,
+ 'floating_ips': 1,
+ 'instances': 1,
+ 'injected_files': 1,
+ 'cores': 1}})
+
+ def put_os_quota_sets_test(self, body, **kw):
+ assert body.keys() == ['quota_set']
+ fakes.assert_has_keys(body['quota_set'],
+ required=['tenant_id'])
+ return (200, {'quota_set': {
+ 'tenant_id': 'test',
+ 'metadata_items': [],
+ 'injected_file_content_bytes': 1,
+ 'volumes': 2,
+ 'gigabytes': 1,
+ 'ram': 1,
+ 'floating_ips': 1,
+ 'instances': 1,
+ 'injected_files': 1,
+ 'cores': 1}})
+
+ #
+ # Quota Classes
+ #
+
+ def get_os_quota_class_sets_test(self, **kw):
+ return (200, {'quota_class_set': {
+ 'class_name': 'test',
+ 'metadata_items': [],
+ 'injected_file_content_bytes': 1,
+ 'volumes': 1,
+ 'gigabytes': 1,
+ 'ram': 1,
+ 'floating_ips': 1,
+ 'instances': 1,
+ 'injected_files': 1,
+ 'cores': 1}})
+
+ def put_os_quota_class_sets_test(self, body, **kw):
+ assert body.keys() == ['quota_class_set']
+ fakes.assert_has_keys(body['quota_class_set'],
+ required=['class_name'])
+ return (200, {'quota_class_set': {
+ 'class_name': 'test',
+ 'metadata_items': [],
+ 'injected_file_content_bytes': 1,
+ 'volumes': 2,
+ 'gigabytes': 1,
+ 'ram': 1,
+ 'floating_ips': 1,
+ 'instances': 1,
+ 'injected_files': 1,
+ 'cores': 1}})
+
+ #
+ # Security Groups
+ #
+ def get_os_security_groups(self, **kw):
+ return (200, {"security_groups": [
+ {'id': 1, 'name': 'test', 'description': 'FAKE_SECURITY_GROUP'}
+ ]})
+
+ def get_os_security_groups_1(self, **kw):
+ return (200, {"security_group":
+ {'id': 1, 'name': 'test', 'description': 'FAKE_SECURITY_GROUP'}
+ })
+
+ def delete_os_security_groups_1(self, **kw):
+ return (202, None)
+
+ def post_os_security_groups(self, body, **kw):
+ assert body.keys() == ['security_group']
+ fakes.assert_has_keys(body['security_group'],
+ required=['name', 'description'])
+ r = {'security_group':
+ self.get_os_security_groups()[1]['security_groups'][0]}
+ return (202, r)
+
+ #
+ # Security Group Rules
+ #
+ def get_os_security_group_rules(self, **kw):
+ return (200, {"security_group_rules": [
+ {'id': 1, 'parent_group_id': 1, 'group_id': 2,
+ 'ip_protocol': 'TCP', 'from_port': '22', 'to_port': 22,
+ 'cidr': '10.0.0.0/8'}
+ ]})
+
+ def delete_os_security_group_rules_1(self, **kw):
+ return (202, None)
+
+ def post_os_security_group_rules(self, body, **kw):
+ assert body.keys() == ['security_group_rule']
+ fakes.assert_has_keys(body['security_group_rule'],
+ required=['parent_group_id'],
+ optional=['group_id', 'ip_protocol', 'from_port',
+ 'to_port', 'cidr'])
+ r = {'security_group_rule':
+ self.get_os_security_group_rules()[1]['security_group_rules'][0]}
+ return (202, r)
+
+ #
+ # Tenant Usage
+ #
+ def get_os_simple_tenant_usage(self, **kw):
+ return (200, {u'tenant_usages': [{
+ u'total_memory_mb_usage': 25451.762807466665,
+ u'total_vcpus_usage': 49.71047423333333,
+ u'total_hours': 49.71047423333333,
+ u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869',
+ u'stop': u'2012-01-22 19:48:41.750722',
+ u'server_usages': [{
+ u'hours': 49.71047423333333,
+ u'uptime': 27035, u'local_gb': 0, u'ended_at': None,
+ u'name': u'f15image1',
+ u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869',
+ u'vcpus': 1, u'memory_mb': 512, u'state': u'active',
+ u'flavor': u'm1.tiny',
+ u'started_at': u'2012-01-20 18:06:06.479998'}],
+ u'start': u'2011-12-25 19:48:41.750687',
+ u'total_local_gb_usage': 0.0}]})
+
+ def get_os_simple_tenant_usage_tenantfoo(self, **kw):
+ return (200, {u'tenant_usage': {
+ u'total_memory_mb_usage': 25451.762807466665,
+ u'total_vcpus_usage': 49.71047423333333,
+ u'total_hours': 49.71047423333333,
+ u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869',
+ u'stop': u'2012-01-22 19:48:41.750722',
+ u'server_usages': [{
+ u'hours': 49.71047423333333,
+ u'uptime': 27035, u'local_gb': 0, u'ended_at': None,
+ u'name': u'f15image1',
+ u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869',
+ u'vcpus': 1, u'memory_mb': 512, u'state': u'active',
+ u'flavor': u'm1.tiny',
+ u'started_at': u'2012-01-20 18:06:06.479998'}],
+ u'start': u'2011-12-25 19:48:41.750687',
+ u'total_local_gb_usage': 0.0}})
+
+ #
+ # Certificates
+ #
+ def get_os_certificates_root(self, **kw):
+ return (200, {'certificate': {'private_key': None, 'data': 'foo'}})
+
+ def post_os_certificates(self, **kw):
+ return (200, {'certificate': {'private_key': 'foo', 'data': 'bar'}})
+
+ #
+ # Aggregates
+ #
+ def get_os_aggregates(self, *kw):
+ return (200, {"aggregates": [
+ {'id':'1',
+ 'name': 'test',
+ 'availability_zone': 'cinder1'},
+ {'id':'2',
+ 'name': 'test2',
+ 'availability_zone': 'cinder1'},
+ ]})
+
+ def _return_aggregate(self):
+ r = {'aggregate': self.get_os_aggregates()[1]['aggregates'][0]}
+ return (200, r)
+
+ def get_os_aggregates_1(self, **kw):
+ return self._return_aggregate()
+
+ def post_os_aggregates(self, body, **kw):
+ return self._return_aggregate()
+
+ def put_os_aggregates_1(self, body, **kw):
+ return self._return_aggregate()
+
+ def put_os_aggregates_2(self, body, **kw):
+ return self._return_aggregate()
+
+ def post_os_aggregates_1_action(self, body, **kw):
+ return self._return_aggregate()
+
+ def post_os_aggregates_2_action(self, body, **kw):
+ return self._return_aggregate()
+
+ def delete_os_aggregates_1(self, **kw):
+ return (202, None)
+
+ #
+ # Hosts
+ #
+ def get_os_hosts_host(self, *kw):
+ return (200, {'host':
+ [{'resource': {'project': '(total)', 'host': 'dummy',
+ 'cpu': 16, 'memory_mb': 32234, 'disk_gb': 128}},
+ {'resource': {'project': '(used_now)', 'host': 'dummy',
+ 'cpu': 1, 'memory_mb': 2075, 'disk_gb': 45}},
+ {'resource': {'project': '(used_max)', 'host': 'dummy',
+ 'cpu': 1, 'memory_mb': 2048, 'disk_gb': 30}},
+ {'resource': {'project': 'admin', 'host': 'dummy',
+ 'cpu': 1, 'memory_mb': 2048, 'disk_gb': 30}}]})
+
+ def get_os_hosts_sample_host(self, *kw):
+ return (200, {'host': [{'resource': {'host': 'sample_host'}}], })
+
+ def put_os_hosts_sample_host_1(self, body, **kw):
+ return (200, {'host': 'sample-host_1',
+ 'status': 'enabled'})
+
+ def put_os_hosts_sample_host_2(self, body, **kw):
+ return (200, {'host': 'sample-host_2',
+ 'maintenance_mode': 'on_maintenance'})
+
+ def put_os_hosts_sample_host_3(self, body, **kw):
+ return (200, {'host': 'sample-host_3',
+ 'status': 'enabled',
+ 'maintenance_mode': 'on_maintenance'})
+
+ def get_os_hosts_sample_host_startup(self, **kw):
+ return (200, {'host': 'sample_host',
+ 'power_action': 'startup'})
+
+ def get_os_hosts_sample_host_reboot(self, **kw):
+ return (200, {'host': 'sample_host',
+ 'power_action': 'reboot'})
+
+ def get_os_hosts_sample_host_shutdown(self, **kw):
+ return (200, {'host': 'sample_host',
+ 'power_action': 'shutdown'})
+
+ def put_os_hosts_sample_host(self, body, **kw):
+ result = {'host': 'dummy'}
+ result.update(body)
+ return (200, result)
diff --git a/tests/v1/test_auth.py b/tests/v1/test_auth.py
new file mode 100644
index 0000000..6aeb5fc
--- /dev/null
+++ b/tests/v1/test_auth.py
@@ -0,0 +1,297 @@
+import httplib2
+import json
+import mock
+
+from cinderclient.v1 import client
+from cinderclient import exceptions
+from tests import utils
+
+
+def to_http_response(resp_dict):
+ """Converts dict of response attributes to httplib response."""
+ resp = httplib2.Response(resp_dict)
+ for k, v in resp_dict['headers'].items():
+ resp[k] = v
+ return resp
+
+
+class AuthenticateAgainstKeystoneTests(utils.TestCase):
+ def test_authenticate_success(self):
+ cs = client.Client("username", "password", "project_id",
+ "auth_url/v2.0", service_type='compute')
+ resp = {
+ "access": {
+ "token": {
+ "expires": "12345",
+ "id": "FAKE_ID",
+ },
+ "serviceCatalog": [
+ {
+ "type": "compute",
+ "endpoints": [
+ {
+ "region": "RegionOne",
+ "adminURL": "http://localhost:8774/v1",
+ "internalURL": "http://localhost:8774/v1",
+ "publicURL": "http://localhost:8774/v1/",
+ },
+ ],
+ },
+ ],
+ },
+ }
+ auth_response = httplib2.Response({
+ "status": 200,
+ "body": json.dumps(resp),
+ })
+
+ mock_request = mock.Mock(return_value=(auth_response,
+ json.dumps(resp)))
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ cs.client.authenticate()
+ headers = {
+ 'User-Agent': cs.client.USER_AGENT,
+ 'Content-Type': 'application/json',
+ 'Accept': 'application/json',
+ }
+ body = {
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': cs.client.user,
+ 'password': cs.client.password,
+ },
+ 'tenantName': cs.client.projectid,
+ },
+ }
+
+ token_url = cs.client.auth_url + "/tokens"
+ mock_request.assert_called_with(token_url, "POST",
+ headers=headers,
+ body=json.dumps(body))
+
+ endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
+ public_url = endpoints[0]["publicURL"].rstrip('/')
+ self.assertEqual(cs.client.management_url, public_url)
+ token_id = resp["access"]["token"]["id"]
+ self.assertEqual(cs.client.auth_token, token_id)
+
+ test_auth_call()
+
+ def test_authenticate_failure(self):
+ cs = client.Client("username", "password", "project_id",
+ "auth_url/v2.0")
+ resp = {"unauthorized": {"message": "Unauthorized", "code": "401"}}
+ auth_response = httplib2.Response({
+ "status": 401,
+ "body": json.dumps(resp),
+ })
+
+ mock_request = mock.Mock(return_value=(auth_response,
+ json.dumps(resp)))
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
+
+ test_auth_call()
+
+ def test_auth_redirect(self):
+ cs = client.Client("username", "password", "project_id",
+ "auth_url/v1", service_type='compute')
+ dict_correct_response = {
+ "access": {
+ "token": {
+ "expires": "12345",
+ "id": "FAKE_ID",
+ },
+ "serviceCatalog": [
+ {
+ "type": "compute",
+ "endpoints": [
+ {
+ "adminURL": "http://localhost:8774/v1",
+ "region": "RegionOne",
+ "internalURL": "http://localhost:8774/v1",
+ "publicURL": "http://localhost:8774/v1/",
+ },
+ ],
+ },
+ ],
+ },
+ }
+ correct_response = json.dumps(dict_correct_response)
+ dict_responses = [
+ {"headers": {'location':'http://127.0.0.1:5001'},
+ "status": 305,
+ "body": "Use proxy"},
+ # Configured on admin port, cinder redirects to v2.0 port.
+ # When trying to connect on it, keystone auth succeed by v1.0
+ # protocol (through headers) but tokens are being returned in
+ # body (looks like keystone bug). Leaved for compatibility.
+ {"headers": {},
+ "status": 200,
+ "body": correct_response},
+ {"headers": {},
+ "status": 200,
+ "body": correct_response}
+ ]
+
+ responses = [(to_http_response(resp), resp['body']) \
+ for resp in dict_responses]
+
+ def side_effect(*args, **kwargs):
+ return responses.pop(0)
+
+ mock_request = mock.Mock(side_effect=side_effect)
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ cs.client.authenticate()
+ headers = {
+ 'User-Agent': cs.client.USER_AGENT,
+ 'Content-Type': 'application/json',
+ 'Accept': 'application/json',
+ }
+ body = {
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': cs.client.user,
+ 'password': cs.client.password,
+ },
+ 'tenantName': cs.client.projectid,
+ },
+ }
+
+ token_url = cs.client.auth_url + "/tokens"
+ mock_request.assert_called_with(token_url, "POST",
+ headers=headers,
+ body=json.dumps(body))
+
+ resp = dict_correct_response
+ endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
+ public_url = endpoints[0]["publicURL"].rstrip('/')
+ self.assertEqual(cs.client.management_url, public_url)
+ token_id = resp["access"]["token"]["id"]
+ self.assertEqual(cs.client.auth_token, token_id)
+
+ test_auth_call()
+
+ def test_ambiguous_endpoints(self):
+ cs = client.Client("username", "password", "project_id",
+ "auth_url/v2.0", service_type='compute')
+ resp = {
+ "access": {
+ "token": {
+ "expires": "12345",
+ "id": "FAKE_ID",
+ },
+ "serviceCatalog": [
+ {
+ "adminURL": "http://localhost:8774/v1",
+ "type": "compute",
+ "name": "Compute CLoud",
+ "endpoints": [
+ {
+ "region": "RegionOne",
+ "internalURL": "http://localhost:8774/v1",
+ "publicURL": "http://localhost:8774/v1/",
+ },
+ ],
+ },
+ {
+ "adminURL": "http://localhost:8774/v1",
+ "type": "compute",
+ "name": "Hyper-compute Cloud",
+ "endpoints": [
+ {
+ "internalURL": "http://localhost:8774/v1",
+ "publicURL": "http://localhost:8774/v1/",
+ },
+ ],
+ },
+ ],
+ },
+ }
+ auth_response = httplib2.Response({
+ "status": 200,
+ "body": json.dumps(resp),
+ })
+
+ mock_request = mock.Mock(return_value=(auth_response,
+ json.dumps(resp)))
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ self.assertRaises(exceptions.AmbiguousEndpoints,
+ cs.client.authenticate)
+
+ test_auth_call()
+
+
+class AuthenticationTests(utils.TestCase):
+ def test_authenticate_success(self):
+ cs = client.Client("username", "password", "project_id", "auth_url")
+ management_url = 'https://servers.api.rackspacecloud.com/v1.1/443470'
+ auth_response = httplib2.Response({
+ 'status': 204,
+ 'x-server-management-url': management_url,
+ 'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
+ })
+ mock_request = mock.Mock(return_value=(auth_response, None))
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ cs.client.authenticate()
+ headers = {
+ 'Accept': 'application/json',
+ 'X-Auth-User': 'username',
+ 'X-Auth-Key': 'password',
+ 'X-Auth-Project-Id': 'project_id',
+ 'User-Agent': cs.client.USER_AGENT
+ }
+ mock_request.assert_called_with(cs.client.auth_url, 'GET',
+ headers=headers)
+ self.assertEqual(cs.client.management_url,
+ auth_response['x-server-management-url'])
+ self.assertEqual(cs.client.auth_token,
+ auth_response['x-auth-token'])
+
+ test_auth_call()
+
+ def test_authenticate_failure(self):
+ cs = client.Client("username", "password", "project_id", "auth_url")
+ auth_response = httplib2.Response({'status': 401})
+ mock_request = mock.Mock(return_value=(auth_response, None))
+
+ @mock.patch.object(httplib2.Http, "request", mock_request)
+ def test_auth_call():
+ self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
+
+ test_auth_call()
+
+ def test_auth_automatic(self):
+ cs = client.Client("username", "password", "project_id", "auth_url")
+ http_client = cs.client
+ http_client.management_url = ''
+ mock_request = mock.Mock(return_value=(None, None))
+
+ @mock.patch.object(http_client, 'request', mock_request)
+ @mock.patch.object(http_client, 'authenticate')
+ def test_auth_call(m):
+ http_client.get('/')
+ m.assert_called()
+ mock_request.assert_called()
+
+ test_auth_call()
+
+ def test_auth_manual(self):
+ cs = client.Client("username", "password", "project_id", "auth_url")
+
+ @mock.patch.object(cs.client, 'authenticate')
+ def test_auth_call(m):
+ cs.authenticate()
+ m.assert_called()
+
+ test_auth_call()
diff --git a/tests/v1/test_shell.py b/tests/v1/test_shell.py
new file mode 100644
index 0000000..7efad0e
--- /dev/null
+++ b/tests/v1/test_shell.py
@@ -0,0 +1,77 @@
+# Copyright 2010 Jacob Kaplan-Moss
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from cinderclient import client
+from cinderclient import shell
+from tests.v1 import fakes
+from tests import utils
+
+
+class ShellTest(utils.TestCase):
+
+ # Patch os.environ to avoid required auth info.
+ def setUp(self):
+ """Run before each test."""
+ self.old_environment = os.environ.copy()
+ os.environ = {
+ 'CINDER_USERNAME': 'username',
+ 'CINDER_PASSWORD': 'password',
+ 'CINDER_PROJECT_ID': 'project_id',
+ 'OS_COMPUTE_API_VERSION': '1.1',
+ 'CINDER_URL': 'http://no.where',
+ }
+
+ self.shell = shell.OpenStackCinderShell()
+
+ #HACK(bcwaldon): replace this when we start using stubs
+ self.old_get_client_class = client.get_client_class
+ client.get_client_class = lambda *_: fakes.FakeClient
+
+ def tearDown(self):
+ os.environ = self.old_environment
+ # For some method like test_image_meta_bad_action we are
+ # testing a SystemExit to be thrown and object self.shell has
+ # no time to get instantatiated which is OK in this case, so
+ # we make sure the method is there before launching it.
+ if hasattr(self.shell, 'cs'):
+ self.shell.cs.clear_callstack()
+
+ #HACK(bcwaldon): replace this when we start using stubs
+ client.get_client_class = self.old_get_client_class
+
+ def run_command(self, cmd):
+ self.shell.main(cmd.split())
+
+ def assert_called(self, method, url, body=None, **kwargs):
+ return self.shell.cs.assert_called(method, url, body, **kwargs)
+
+ def assert_called_anytime(self, method, url, body=None):
+ return self.shell.cs.assert_called_anytime(method, url, body)
+
+ def test_list(self):
+ self.run_command('list')
+ # NOTE(jdg): we default to detail currently
+ self.assert_called('GET', '/volumes/detail')
+
+ def test_show(self):
+ self.run_command('show 1234')
+ self.assert_called('GET', '/volumes/1234')
+
+ def test_delete(self):
+ self.run_command('delete 1234')
diff --git a/tests/v1/testfile.txt b/tests/v1/testfile.txt
new file mode 100644
index 0000000..e4e860f
--- /dev/null
+++ b/tests/v1/testfile.txt
@@ -0,0 +1 @@
+BLAH
diff --git a/tests/v1/utils.py b/tests/v1/utils.py
new file mode 100644
index 0000000..f878a5e
--- /dev/null
+++ b/tests/v1/utils.py
@@ -0,0 +1,29 @@
+from nose.tools import ok_
+
+
+def fail(msg):
+ raise AssertionError(msg)
+
+
+def assert_in(thing, seq, msg=None):
+ msg = msg or "'%s' not found in %s" % (thing, seq)
+ ok_(thing in seq, msg)
+
+
+def assert_not_in(thing, seq, msg=None):
+ msg = msg or "unexpected '%s' found in %s" % (thing, seq)
+ ok_(thing not in seq, msg)
+
+
+def assert_has_keys(dict, required=[], optional=[]):
+ keys = dict.keys()
+ for k in required:
+ assert_in(k, keys, "required key %s missing from %s" % (k, dict))
+ allowed_keys = set(required) | set(optional)
+ extra_keys = set(keys).difference(set(required + optional))
+ if extra_keys:
+ fail("found unexpected keys: %s" % list(extra_keys))
+
+
+def assert_isinstance(thing, kls):
+ ok_(isinstance(thing, kls), "%s is not an instance of %s" % (thing, kls))