diff options
author | Jenkins <jenkins@review.openstack.org> | 2012-05-21 16:32:35 -0400 |
---|---|---|
committer | Monty Taylor <mordred@inaugust.com> | 2012-05-21 16:32:35 -0400 |
commit | 471704df644eced17026c280b0aab9e549718e14 (patch) | |
tree | c2d8d0ec74fa45e0b61ca4b2153fb5b0e7bf490d /tests | |
download | python-cinderclient-0.0.tar.gz |
Initial split from python-novaclient.0.0
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/fakes.py | 71 | ||||
-rw-r--r-- | tests/test_base.py | 48 | ||||
-rw-r--r-- | tests/test_client.py | 18 | ||||
-rw-r--r-- | tests/test_http.py | 74 | ||||
-rw-r--r-- | tests/test_service_catalog.py | 127 | ||||
-rw-r--r-- | tests/test_shell.py | 75 | ||||
-rw-r--r-- | tests/test_utils.py | 74 | ||||
-rw-r--r-- | tests/utils.py | 5 | ||||
-rw-r--r-- | tests/v1/__init__.py | 0 | ||||
-rw-r--r-- | tests/v1/fakes.py | 765 | ||||
-rw-r--r-- | tests/v1/test_auth.py | 297 | ||||
-rw-r--r-- | tests/v1/test_shell.py | 77 | ||||
-rw-r--r-- | tests/v1/testfile.txt | 1 | ||||
-rw-r--r-- | tests/v1/utils.py | 29 |
15 files changed, 1661 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/fakes.py b/tests/fakes.py new file mode 100644 index 0000000..248214f --- /dev/null +++ b/tests/fakes.py @@ -0,0 +1,71 @@ +""" +A fake server that "responds" to API methods with pre-canned responses. + +All of these responses come from the spec, so if for some reason the spec's +wrong the tests might raise AssertionError. I've indicated in comments the +places where actual behavior differs from the spec. +""" + + +def assert_has_keys(dict, required=[], optional=[]): + keys = dict.keys() + for k in required: + try: + assert k in keys + except AssertionError: + extra_keys = set(keys).difference(set(required + optional)) + raise AssertionError("found unexpected keys: %s" % + list(extra_keys)) + + +class FakeClient(object): + + def assert_called(self, method, url, body=None, pos=-1): + """ + Assert than an API method was just called. + """ + expected = (method, url) + called = self.client.callstack[pos][0:2] + + assert self.client.callstack, \ + "Expected %s %s but no calls were made." % expected + + assert expected == called, 'Expected %s %s; got %s %s' % \ + (expected + called) + + if body is not None: + assert self.client.callstack[pos][2] == body + + def assert_called_anytime(self, method, url, body=None): + """ + Assert than an API method was called anytime in the test. + """ + expected = (method, url) + + assert self.client.callstack, \ + "Expected %s %s but no calls were made." % expected + + found = False + for entry in self.client.callstack: + if expected == entry[0:2]: + found = True + break + + assert found, 'Expected %s %s; got %s' % \ + (expected, self.client.callstack) + if body is not None: + try: + assert entry[2] == body + except AssertionError: + print entry[2] + print "!=" + print body + raise + + self.client.callstack = [] + + def clear_callstack(self): + self.client.callstack = [] + + def authenticate(self): + pass diff --git a/tests/test_base.py b/tests/test_base.py new file mode 100644 index 0000000..7eba986 --- /dev/null +++ b/tests/test_base.py @@ -0,0 +1,48 @@ +from cinderclient import base +from cinderclient import exceptions +from cinderclient.v1 import volumes +from tests import utils +from tests.v1 import fakes + + +cs = fakes.FakeClient() + + +class BaseTest(utils.TestCase): + + def test_resource_repr(self): + r = base.Resource(None, dict(foo="bar", baz="spam")) + self.assertEqual(repr(r), "<Resource baz=spam, foo=bar>") + + def test_getid(self): + self.assertEqual(base.getid(4), 4) + + class TmpObject(object): + id = 4 + self.assertEqual(base.getid(TmpObject), 4) + + def test_eq(self): + # Two resources of the same type with the same id: equal + r1 = base.Resource(None, {'id': 1, 'name': 'hi'}) + r2 = base.Resource(None, {'id': 1, 'name': 'hello'}) + self.assertEqual(r1, r2) + + # Two resoruces of different types: never equal + r1 = base.Resource(None, {'id': 1}) + r2 = volumes.Volume(None, {'id': 1}) + self.assertNotEqual(r1, r2) + + # Two resources with no ID: equal if their info is equal + r1 = base.Resource(None, {'name': 'joe', 'age': 12}) + r2 = base.Resource(None, {'name': 'joe', 'age': 12}) + self.assertEqual(r1, r2) + + def test_findall_invalid_attribute(self): + # Make sure findall with an invalid attribute doesn't cause errors. + # The following should not raise an exception. + cs.volumes.findall(vegetable='carrot') + + # However, find() should raise an error + self.assertRaises(exceptions.NotFound, + cs.volumes.find, + vegetable='carrot') diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..f5e4bab --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,18 @@ + +import cinderclient.client +import cinderclient.v1.client +from tests import utils + + +class ClientTest(utils.TestCase): + + def setUp(self): + pass + + def test_get_client_class_v1(self): + output = cinderclient.client.get_client_class('1') + self.assertEqual(output, cinderclient.v1.client.Client) + + def test_get_client_class_unknown(self): + self.assertRaises(cinderclient.exceptions.UnsupportedVersion, + cinderclient.client.get_client_class, '0') diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..13d744e --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,74 @@ +import httplib2 +import mock + +from cinderclient import client +from cinderclient import exceptions +from tests import utils + + +fake_response = httplib2.Response({"status": 200}) +fake_body = '{"hi": "there"}' +mock_request = mock.Mock(return_value=(fake_response, fake_body)) + + +def get_client(): + cl = client.HTTPClient("username", "password", + "project_id", "auth_test") + return cl + + +def get_authed_client(): + cl = get_client() + cl.management_url = "http://example.com" + cl.auth_token = "token" + return cl + + +class ClientTest(utils.TestCase): + + def test_get(self): + cl = get_authed_client() + + @mock.patch.object(httplib2.Http, "request", mock_request) + @mock.patch('time.time', mock.Mock(return_value=1234)) + def test_get_call(): + resp, body = cl.get("/hi") + headers = {"X-Auth-Token": "token", + "X-Auth-Project-Id": "project_id", + "User-Agent": cl.USER_AGENT, + 'Accept': 'application/json', + } + mock_request.assert_called_with("http://example.com/hi", + "GET", headers=headers) + # Automatic JSON parsing + self.assertEqual(body, {"hi": "there"}) + + test_get_call() + + def test_post(self): + cl = get_authed_client() + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_post_call(): + cl.post("/hi", body=[1, 2, 3]) + headers = { + "X-Auth-Token": "token", + "X-Auth-Project-Id": "project_id", + "Content-Type": "application/json", + 'Accept': 'application/json', + "User-Agent": cl.USER_AGENT + } + mock_request.assert_called_with("http://example.com/hi", "POST", + headers=headers, body='[1, 2, 3]') + + test_post_call() + + def test_auth_failure(self): + cl = get_client() + + # response must not have x-server-management-url header + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + self.assertRaises(exceptions.AuthorizationFailure, cl.authenticate) + + test_auth_call() diff --git a/tests/test_service_catalog.py b/tests/test_service_catalog.py new file mode 100644 index 0000000..bb93dcf --- /dev/null +++ b/tests/test_service_catalog.py @@ -0,0 +1,127 @@ +from cinderclient import exceptions +from cinderclient import service_catalog +from tests import utils + + +# Taken directly from keystone/content/common/samples/auth.json +# Do not edit this structure. Instead, grab the latest from there. + +SERVICE_CATALOG = { + "access": { + "token": { + "id": "ab48a9efdfedb23ty3494", + "expires": "2010-11-01T03:32:15-05:00", + "tenant": { + "id": "345", + "name": "My Project" + } + }, + "user": { + "id": "123", + "name": "jqsmith", + "roles": [ + { + "id": "234", + "name": "compute:admin", + }, + { + "id": "235", + "name": "object-store:admin", + "tenantId": "1", + } + ], + "roles_links": [], + }, + "serviceCatalog": [ + { + "name": "Cloud Servers", + "type": "compute", + "endpoints": [ + { + "tenantId": "1", + "publicURL": "https://compute1.host/v1/1234", + "internalURL": "https://compute1.host/v1/1234", + "region": "North", + "versionId": "1.0", + "versionInfo": "https://compute1.host/v1/", + "versionList": "https://compute1.host/" + }, + { + "tenantId": "2", + "publicURL": "https://compute1.host/v1/3456", + "internalURL": "https://compute1.host/v1/3456", + "region": "North", + "versionId": "1.1", + "versionInfo": "https://compute1.host/v1/", + "versionList": "https://compute1.host/" + }, + ], + "endpoints_links": [], + }, + { + "name": "Nova Volumes", + "type": "volume", + "endpoints": [ + { + "tenantId": "1", + "publicURL": "https://volume1.host/v1/1234", + "internalURL": "https://volume1.host/v1/1234", + "region": "South", + "versionId": "1.0", + "versionInfo": "uri", + "versionList": "uri" + }, + { + "tenantId": "2", + "publicURL": "https://volume1.host/v1/3456", + "internalURL": "https://volume1.host/v1/3456", + "region": "South", + "versionId": "1.1", + "versionInfo": "https://volume1.host/v1/", + "versionList": "https://volume1.host/" + }, + ], + "endpoints_links": [ + { + "rel": "next", + "href": "https://identity1.host/v2.0/endpoints" + }, + ], + }, + ], + "serviceCatalog_links": [ + { + "rel": "next", + "href": "https://identity.host/v2.0/endpoints?session=2hfh8Ar", + }, + ], + }, +} + + +class ServiceCatalogTest(utils.TestCase): + def test_building_a_service_catalog(self): + sc = service_catalog.ServiceCatalog(SERVICE_CATALOG) + + self.assertRaises(exceptions.AmbiguousEndpoints, sc.url_for, + service_type='compute') + self.assertEquals(sc.url_for('tenantId', '1', service_type='compute'), + "https://compute1.host/v1/1234") + self.assertEquals(sc.url_for('tenantId', '2', service_type='compute'), + "https://compute1.host/v1/3456") + + self.assertRaises(exceptions.EndpointNotFound, sc.url_for, + "region", "South", service_type='compute') + + def test_alternate_service_type(self): + sc = service_catalog.ServiceCatalog(SERVICE_CATALOG) + + self.assertRaises(exceptions.AmbiguousEndpoints, sc.url_for, + service_type='volume') + self.assertEquals(sc.url_for('tenantId', '1', service_type='volume'), + "https://volume1.host/v1/1234") + self.assertEquals(sc.url_for('tenantId', '2', service_type='volume'), + "https://volume1.host/v1/3456") + + self.assertRaises(exceptions.EndpointNotFound, sc.url_for, + "region", "North", service_type='volume') diff --git a/tests/test_shell.py b/tests/test_shell.py new file mode 100644 index 0000000..902aec5 --- /dev/null +++ b/tests/test_shell.py @@ -0,0 +1,75 @@ +import cStringIO +import os +import httplib2 +import sys + +from cinderclient import exceptions +import cinderclient.shell +from tests import utils + + +class ShellTest(utils.TestCase): + + # Patch os.environ to avoid required auth info. + def setUp(self): + global _old_env + fake_env = { + 'OS_USERNAME': 'username', + 'OS_PASSWORD': 'password', + 'OS_TENANT_NAME': 'tenant_name', + 'OS_AUTH_URL': 'http://no.where', + } + _old_env, os.environ = os.environ, fake_env.copy() + + def shell(self, argstr): + orig = sys.stdout + try: + sys.stdout = cStringIO.StringIO() + _shell = cinderclient.shell.OpenStackCinderShell() + _shell.main(argstr.split()) + except SystemExit: + exc_type, exc_value, exc_traceback = sys.exc_info() + self.assertEqual(exc_value.code, 0) + finally: + out = sys.stdout.getvalue() + sys.stdout.close() + sys.stdout = orig + + return out + + def tearDown(self): + global _old_env + os.environ = _old_env + + def test_help_unknown_command(self): + self.assertRaises(exceptions.CommandError, self.shell, 'help foofoo') + + def test_debug(self): + httplib2.debuglevel = 0 + self.shell('--debug help') + assert httplib2.debuglevel == 1 + + def test_help(self): + required = [ + '^usage: ', + '(?m)^\s+create\s+Add a new volume.', + '(?m)^See "cinder help COMMAND" for help on a specific command', + ] + for argstr in ['--help', 'help']: + help_text = self.shell(argstr) + for r in required: + self.assertRegexpMatches(help_text, r) + + def test_help_on_subcommand(self): + required = [ + '^usage: cinder list', + '(?m)^List all the volumes.', + ] + argstrings = [ + 'list --help', + 'help list', + ] + for argstr in argstrings: + help_text = self.shell(argstr) + for r in required: + self.assertRegexpMatches(help_text, r) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..39fb2c9 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,74 @@ + +from cinderclient import exceptions +from cinderclient import utils +from cinderclient import base +from tests import utils as test_utils + +UUID = '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0' + + +class FakeResource(object): + + def __init__(self, _id, properties): + self.id = _id + try: + self.name = properties['name'] + except KeyError: + pass + try: + self.display_name = properties['display_name'] + except KeyError: + pass + + +class FakeManager(base.ManagerWithFind): + + resource_class = FakeResource + + resources = [ + FakeResource('1234', {'name': 'entity_one'}), + FakeResource(UUID, {'name': 'entity_two'}), + FakeResource('4242', {'display_name': 'entity_three'}), + FakeResource('5678', {'name': '9876'}) + ] + + def get(self, resource_id): + for resource in self.resources: + if resource.id == str(resource_id): + return resource + raise exceptions.NotFound(resource_id) + + def list(self): + return self.resources + + +class FindResourceTestCase(test_utils.TestCase): + + def setUp(self): + self.manager = FakeManager(None) + + def test_find_none(self): + self.assertRaises(exceptions.CommandError, + utils.find_resource, + self.manager, + 'asdf') + + def test_find_by_integer_id(self): + output = utils.find_resource(self.manager, 1234) + self.assertEqual(output, self.manager.get('1234')) + + def test_find_by_str_id(self): + output = utils.find_resource(self.manager, '1234') + self.assertEqual(output, self.manager.get('1234')) + + def test_find_by_uuid(self): + output = utils.find_resource(self.manager, UUID) + self.assertEqual(output, self.manager.get(UUID)) + + def test_find_by_str_name(self): + output = utils.find_resource(self.manager, 'entity_one') + self.assertEqual(output, self.manager.get('1234')) + + def test_find_by_str_displayname(self): + output = utils.find_resource(self.manager, 'entity_three') + self.assertEqual(output, self.manager.get('4242')) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..7f1c5dc --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,5 @@ +import unittest2 + + +class TestCase(unittest2.TestCase): + pass diff --git a/tests/v1/__init__.py b/tests/v1/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/v1/__init__.py diff --git a/tests/v1/fakes.py b/tests/v1/fakes.py new file mode 100644 index 0000000..e430970 --- /dev/null +++ b/tests/v1/fakes.py @@ -0,0 +1,765 @@ +# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. +# Copyright 2011 OpenStack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import httplib2 +import urlparse + +from cinderclient import client as base_client +from cinderclient.v1 import client +from tests import fakes + + +class FakeClient(fakes.FakeClient, client.Client): + + def __init__(self, *args, **kwargs): + client.Client.__init__(self, 'username', 'password', + 'project_id', 'auth_url') + self.client = FakeHTTPClient(**kwargs) + + +class FakeHTTPClient(base_client.HTTPClient): + + def __init__(self, **kwargs): + self.username = 'username' + self.password = 'password' + self.auth_url = 'auth_url' + self.callstack = [] + + def _cs_request(self, url, method, **kwargs): + # Check that certain things are called correctly + if method in ['GET', 'DELETE']: + assert 'body' not in kwargs + elif method == 'PUT': + assert 'body' in kwargs + + # Call the method + args = urlparse.parse_qsl(urlparse.urlparse(url)[4]) + kwargs.update(args) + munged_url = url.rsplit('?', 1)[0] + munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_') + munged_url = munged_url.replace('-', '_') + + callback = "%s_%s" % (method.lower(), munged_url) + + if not hasattr(self, callback): + raise AssertionError('Called unknown API method: %s %s, ' + 'expected fakes method name: %s' % + (method, url, callback)) + + # Note the call + self.callstack.append((method, url, kwargs.get('body', None))) + + status, body = getattr(self, callback)(**kwargs) + if hasattr(status, 'items'): + return httplib2.Response(status), body + else: + return httplib2.Response({"status": status}), body + + # + # Limits + # + + def get_limits(self, **kw): + return (200, {"limits": { + "rate": [ + { + "uri": "*", + "regex": ".*", + "limit": [ + { + "value": 10, + "verb": "POST", + "remaining": 2, + "unit": "MINUTE", + "next-available": "2011-12-15T22:42:45Z" + }, + { + "value": 10, + "verb": "PUT", + "remaining": 2, + "unit": "MINUTE", + "next-available": "2011-12-15T22:42:45Z" + }, + { + "value": 100, + "verb": "DELETE", + "remaining": 100, + "unit": "MINUTE", + "next-available": "2011-12-15T22:42:45Z" + } + ] + }, + { + "uri": "*/servers", + "regex": "^/servers", + "limit": [ + { + "verb": "POST", + "value": 25, + "remaining": 24, + "unit": "DAY", + "next-available": "2011-12-15T22:42:45Z" + } + ] + } + ], + "absolute": { + "maxTotalRAMSize": 51200, + "maxServerMeta": 5, + "maxImageMeta": 5, + "maxPersonality": 5, + "maxPersonalitySize": 10240 + }, + }, + }) + + # + # Servers + # + + def get_volumes(self, **kw): + return (200, {"volumes": [ + {'id': 1234, 'name': 'sample-volume'}, + {'id': 5678, 'name': 'sample-volume2'} + ]}) + + # TODO(jdg): This will need to change + # at the very least it's not complete + def get_volumes_detail(self, **kw): + return (200, {"volumes": [ + {'id': 1234, + 'name': 'sample-volume', + 'attachments': [{'server_id': 1234}] + }, + ]}) + + def get_volumes_1234(self, **kw): + r = {'volume': self.get_volumes_detail()[1]['volumes'][0]} + return (200, r) + + def post_servers(self, body, **kw): + assert set(body.keys()) <= set(['server', 'os:scheduler_hints']) + fakes.assert_has_keys(body['server'], + required=['name', 'imageRef', 'flavorRef'], + optional=['metadata', 'personality']) + if 'personality' in body['server']: + for pfile in body['server']['personality']: + fakes.assert_has_keys(pfile, required=['path', 'contents']) + return (202, self.get_servers_1234()[1]) + + def get_servers_1234(self, **kw): + r = {'server': self.get_servers_detail()[1]['servers'][0]} + return (200, r) + + def get_servers_5678(self, **kw): + r = {'server': self.get_servers_detail()[1]['servers'][1]} + return (200, r) + + def put_servers_1234(self, body, **kw): + assert body.keys() == ['server'] + fakes.assert_has_keys(body['server'], optional=['name', 'adminPass']) + return (204, None) + + def delete_servers_1234(self, **kw): + return (202, None) + + def delete_volumes_1234(self, **kw): + return (202, None) + + def delete_servers_1234_metadata_test_key(self, **kw): + return (204, None) + + def delete_servers_1234_metadata_key1(self, **kw): + return (204, None) + + def delete_servers_1234_metadata_key2(self, **kw): + return (204, None) + + def post_servers_1234_metadata(self, **kw): + return (204, {'metadata': {'test_key': 'test_value'}}) + + def get_servers_1234_diagnostics(self, **kw): + return (200, {'data': 'Fake diagnostics'}) + + def get_servers_1234_actions(self, **kw): + return (200, {'actions': [ + { + 'action': 'rebuild', + 'error': None, + 'created_at': '2011-12-30 11:45:36' + }, + { + 'action': 'reboot', + 'error': 'Failed!', + 'created_at': '2011-12-30 11:40:29' + }, + ]}) + + # + # Server Addresses + # + + def get_servers_1234_ips(self, **kw): + return (200, {'addresses': + self.get_servers_1234()[1]['server']['addresses']}) + + def get_servers_1234_ips_public(self, **kw): + return (200, {'public': + self.get_servers_1234_ips()[1]['addresses']['public']}) + + def get_servers_1234_ips_private(self, **kw): + return (200, {'private': + self.get_servers_1234_ips()[1]['addresses']['private']}) + + def delete_servers_1234_ips_public_1_2_3_4(self, **kw): + return (202, None) + + # + # Server actions + # + + def post_servers_1234_action(self, body, **kw): + _body = None + resp = 202 + assert len(body.keys()) == 1 + action = body.keys()[0] + if action == 'reboot': + assert body[action].keys() == ['type'] + assert body[action]['type'] in ['HARD', 'SOFT'] + elif action == 'rebuild': + keys = body[action].keys() + if 'adminPass' in keys: + keys.remove('adminPass') + assert keys == ['imageRef'] + _body = self.get_servers_1234()[1] + elif action == 'resize': + assert body[action].keys() == ['flavorRef'] + elif action == 'confirmResize': + assert body[action] is None + # This one method returns a different response code + return (204, None) + elif action == 'revertResize': + assert body[action] is None + elif action == 'migrate': + assert body[action] is None + elif action == 'rescue': + assert body[action] is None + elif action == 'unrescue': + assert body[action] is None + elif action == 'lock': + assert body[action] is None + elif action == 'unlock': + assert body[action] is None + elif action == 'addFixedIp': + assert body[action].keys() == ['networkId'] + elif action == 'removeFixedIp': + assert body[action].keys() == ['address'] + elif action == 'addFloatingIp': + assert body[action].keys() == ['address'] + elif action == 'removeFloatingIp': + assert body[action].keys() == ['address'] + elif action == 'createImage': + assert set(body[action].keys()) == set(['name', 'metadata']) + resp = dict(status=202, location="http://blah/images/456") + elif action == 'changePassword': + assert body[action].keys() == ['adminPass'] + elif action == 'os-getConsoleOutput': + assert body[action].keys() == ['length'] + return (202, {'output': 'foo'}) + elif action == 'os-getVNCConsole': + assert body[action].keys() == ['type'] + elif action == 'os-migrateLive': + assert set(body[action].keys()) == set(['host', + 'block_migration', + 'disk_over_commit']) + else: + raise AssertionError("Unexpected server action: %s" % action) + return (resp, _body) + + # + # Cloudpipe + # + + def get_os_cloudpipe(self, **kw): + return (200, {'cloudpipes': [ + {'project_id':1} + ]}) + + def post_os_cloudpipe(self, **ks): + return (202, {'instance_id': '9d5824aa-20e6-4b9f-b967-76a699fc51fd'}) + + # + # Flavors + # + + def get_flavors(self, **kw): + return (200, {'flavors': [ + {'id': 1, 'name': '256 MB Server'}, + {'id': 2, 'name': '512 MB Server'} + ]}) + + def get_flavors_detail(self, **kw): + return (200, {'flavors': [ + {'id': 1, 'name': '256 MB Server', 'ram': 256, 'disk': 10, + 'OS-FLV-EXT-DATA:ephemeral': 10}, + {'id': 2, 'name': '512 MB Server', 'ram': 512, 'disk': 20, + 'OS-FLV-EXT-DATA:ephemeral': 20} + ]}) + + def get_flavors_1(self, **kw): + return (200, {'flavor': self.get_flavors_detail()[1]['flavors'][0]}) + + def get_flavors_2(self, **kw): + return (200, {'flavor': self.get_flavors_detail()[1]['flavors'][1]}) + + def get_flavors_3(self, **kw): + # Diablo has no ephemeral + return (200, {'flavor': {'id': 3, 'name': '256 MB Server', + 'ram': 256, 'disk': 10}}) + + def delete_flavors_flavordelete(self, **kw): + return (202, None) + + def post_flavors(self, body, **kw): + return (202, {'flavor': self.get_flavors_detail()[1]['flavors'][0]}) + + # + # Floating ips + # + + def get_os_floating_ip_pools(self): + return (200, {'floating_ip_pools': [{'name': 'foo', 'name': 'bar'}]}) + + def get_os_floating_ips(self, **kw): + return (200, {'floating_ips': [ + {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1'}, + {'id': 2, 'fixed_ip': '10.0.0.2', 'ip': '11.0.0.2'}, + ]}) + + def get_os_floating_ips_1(self, **kw): + return (200, {'floating_ip': + {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1'} + }) + + def post_os_floating_ips(self, body, **kw): + return (202, self.get_os_floating_ips_1()[1]) + + def post_os_floating_ips(self, body): + if body.get('pool'): + return (200, {'floating_ip': + {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1', + 'pool': 'cinder'}}) + else: + return (200, {'floating_ip': + {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1', + 'pool': None}}) + + def delete_os_floating_ips_1(self, **kw): + return (204, None) + + def get_os_floating_ip_dns(self, **kw): + return (205, {'domain_entries': + [{'domain': 'example.org'}, + {'domain': 'example.com'}]}) + + def get_os_floating_ip_dns_testdomain_entries(self, **kw): + if kw.get('ip'): + return (205, {'dns_entries': + [{'dns_entry': + {'ip': kw.get('ip'), + 'name': "host1", + 'type': "A", + 'domain': 'testdomain'}}, + {'dns_entry': + {'ip': kw.get('ip'), + 'name': "host2", + 'type': "A", + 'domain': 'testdomain'}}]}) + else: + return (404, None) + + def get_os_floating_ip_dns_testdomain_entries_testname(self, **kw): + return (205, {'dns_entry': + {'ip': "10.10.10.10", + 'name': 'testname', + 'type': "A", + 'domain': 'testdomain'}}) + + def put_os_floating_ip_dns_testdomain(self, body, **kw): + if body['domain_entry']['scope'] == 'private': + fakes.assert_has_keys(body['domain_entry'], + required=['availability_zone', 'scope']) + elif body['domain_entry']['scope'] == 'public': + fakes.assert_has_keys(body['domain_entry'], + required=['project', 'scope']) + + else: + fakes.assert_has_keys(body['domain_entry'], + required=['project', 'scope']) + return (205, None) + + def put_os_floating_ip_dns_testdomain_entries_testname(self, body, **kw): + fakes.assert_has_keys(body['dns_entry'], + required=['ip', 'dns_type']) + return (205, None) + + def delete_os_floating_ip_dns_testdomain(self, **kw): + return (200, None) + + def delete_os_floating_ip_dns_testdomain_entries_testname(self, **kw): + return (200, None) + + # + # Images + # + def get_images(self, **kw): + return (200, {'images': [ + {'id': 1, 'name': 'CentOS 5.2'}, + {'id': 2, 'name': 'My Server Backup'} + ]}) + + def get_images_detail(self, **kw): + return (200, {'images': [ + { + 'id': 1, + 'name': 'CentOS 5.2', + "updated": "2010-10-10T12:00:00Z", + "created": "2010-08-10T12:00:00Z", + "status": "ACTIVE", + "metadata": { + "test_key": "test_value", + }, + "links": {}, + }, + { + "id": 743, + "name": "My Server Backup", + "serverId": 1234, + "updated": "2010-10-10T12:00:00Z", + "created": "2010-08-10T12:00:00Z", + "status": "SAVING", + "progress": 80, + "links": {}, + } + ]}) + + def get_images_1(self, **kw): + return (200, {'image': self.get_images_detail()[1]['images'][0]}) + + def get_images_2(self, **kw): + return (200, {'image': self.get_images_detail()[1]['images'][1]}) + + def post_images(self, body, **kw): + assert body.keys() == ['image'] + fakes.assert_has_keys(body['image'], required=['serverId', 'name']) + return (202, self.get_images_1()[1]) + + def post_images_1_metadata(self, body, **kw): + assert body.keys() == ['metadata'] + fakes.assert_has_keys(body['metadata'], + required=['test_key']) + return (200, + {'metadata': self.get_images_1()[1]['image']['metadata']}) + + def delete_images_1(self, **kw): + return (204, None) + + def delete_images_1_metadata_test_key(self, **kw): + return (204, None) + + # + # Keypairs + # + def get_os_keypairs(self, *kw): + return (200, {"keypairs": [ + {'fingerprint': 'FAKE_KEYPAIR', 'name': 'test'} + ]}) + + def delete_os_keypairs_test(self, **kw): + return (202, None) + + def post_os_keypairs(self, body, **kw): + assert body.keys() == ['keypair'] + fakes.assert_has_keys(body['keypair'], + required=['name']) + r = {'keypair': self.get_os_keypairs()[1]['keypairs'][0]} + return (202, r) + + # + # Virtual Interfaces + # + def get_servers_1234_os_virtual_interfaces(self, **kw): + return (200, {"virtual_interfaces": [ + {'id': 'fakeid', 'mac_address': 'fakemac'} + ]}) + + # + # Quotas + # + + def get_os_quota_sets_test(self, **kw): + return (200, {'quota_set': { + 'tenant_id': 'test', + 'metadata_items': [], + 'injected_file_content_bytes': 1, + 'volumes': 1, + 'gigabytes': 1, + 'ram': 1, + 'floating_ips': 1, + 'instances': 1, + 'injected_files': 1, + 'cores': 1}}) + + def get_os_quota_sets_test_defaults(self): + return (200, {'quota_set': { + 'tenant_id': 'test', + 'metadata_items': [], + 'injected_file_content_bytes': 1, + 'volumes': 1, + 'gigabytes': 1, + 'ram': 1, + 'floating_ips': 1, + 'instances': 1, + 'injected_files': 1, + 'cores': 1}}) + + def put_os_quota_sets_test(self, body, **kw): + assert body.keys() == ['quota_set'] + fakes.assert_has_keys(body['quota_set'], + required=['tenant_id']) + return (200, {'quota_set': { + 'tenant_id': 'test', + 'metadata_items': [], + 'injected_file_content_bytes': 1, + 'volumes': 2, + 'gigabytes': 1, + 'ram': 1, + 'floating_ips': 1, + 'instances': 1, + 'injected_files': 1, + 'cores': 1}}) + + # + # Quota Classes + # + + def get_os_quota_class_sets_test(self, **kw): + return (200, {'quota_class_set': { + 'class_name': 'test', + 'metadata_items': [], + 'injected_file_content_bytes': 1, + 'volumes': 1, + 'gigabytes': 1, + 'ram': 1, + 'floating_ips': 1, + 'instances': 1, + 'injected_files': 1, + 'cores': 1}}) + + def put_os_quota_class_sets_test(self, body, **kw): + assert body.keys() == ['quota_class_set'] + fakes.assert_has_keys(body['quota_class_set'], + required=['class_name']) + return (200, {'quota_class_set': { + 'class_name': 'test', + 'metadata_items': [], + 'injected_file_content_bytes': 1, + 'volumes': 2, + 'gigabytes': 1, + 'ram': 1, + 'floating_ips': 1, + 'instances': 1, + 'injected_files': 1, + 'cores': 1}}) + + # + # Security Groups + # + def get_os_security_groups(self, **kw): + return (200, {"security_groups": [ + {'id': 1, 'name': 'test', 'description': 'FAKE_SECURITY_GROUP'} + ]}) + + def get_os_security_groups_1(self, **kw): + return (200, {"security_group": + {'id': 1, 'name': 'test', 'description': 'FAKE_SECURITY_GROUP'} + }) + + def delete_os_security_groups_1(self, **kw): + return (202, None) + + def post_os_security_groups(self, body, **kw): + assert body.keys() == ['security_group'] + fakes.assert_has_keys(body['security_group'], + required=['name', 'description']) + r = {'security_group': + self.get_os_security_groups()[1]['security_groups'][0]} + return (202, r) + + # + # Security Group Rules + # + def get_os_security_group_rules(self, **kw): + return (200, {"security_group_rules": [ + {'id': 1, 'parent_group_id': 1, 'group_id': 2, + 'ip_protocol': 'TCP', 'from_port': '22', 'to_port': 22, + 'cidr': '10.0.0.0/8'} + ]}) + + def delete_os_security_group_rules_1(self, **kw): + return (202, None) + + def post_os_security_group_rules(self, body, **kw): + assert body.keys() == ['security_group_rule'] + fakes.assert_has_keys(body['security_group_rule'], + required=['parent_group_id'], + optional=['group_id', 'ip_protocol', 'from_port', + 'to_port', 'cidr']) + r = {'security_group_rule': + self.get_os_security_group_rules()[1]['security_group_rules'][0]} + return (202, r) + + # + # Tenant Usage + # + def get_os_simple_tenant_usage(self, **kw): + return (200, {u'tenant_usages': [{ + u'total_memory_mb_usage': 25451.762807466665, + u'total_vcpus_usage': 49.71047423333333, + u'total_hours': 49.71047423333333, + u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869', + u'stop': u'2012-01-22 19:48:41.750722', + u'server_usages': [{ + u'hours': 49.71047423333333, + u'uptime': 27035, u'local_gb': 0, u'ended_at': None, + u'name': u'f15image1', + u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869', + u'vcpus': 1, u'memory_mb': 512, u'state': u'active', + u'flavor': u'm1.tiny', + u'started_at': u'2012-01-20 18:06:06.479998'}], + u'start': u'2011-12-25 19:48:41.750687', + u'total_local_gb_usage': 0.0}]}) + + def get_os_simple_tenant_usage_tenantfoo(self, **kw): + return (200, {u'tenant_usage': { + u'total_memory_mb_usage': 25451.762807466665, + u'total_vcpus_usage': 49.71047423333333, + u'total_hours': 49.71047423333333, + u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869', + u'stop': u'2012-01-22 19:48:41.750722', + u'server_usages': [{ + u'hours': 49.71047423333333, + u'uptime': 27035, u'local_gb': 0, u'ended_at': None, + u'name': u'f15image1', + u'tenant_id': u'7b0a1d73f8fb41718f3343c207597869', + u'vcpus': 1, u'memory_mb': 512, u'state': u'active', + u'flavor': u'm1.tiny', + u'started_at': u'2012-01-20 18:06:06.479998'}], + u'start': u'2011-12-25 19:48:41.750687', + u'total_local_gb_usage': 0.0}}) + + # + # Certificates + # + def get_os_certificates_root(self, **kw): + return (200, {'certificate': {'private_key': None, 'data': 'foo'}}) + + def post_os_certificates(self, **kw): + return (200, {'certificate': {'private_key': 'foo', 'data': 'bar'}}) + + # + # Aggregates + # + def get_os_aggregates(self, *kw): + return (200, {"aggregates": [ + {'id':'1', + 'name': 'test', + 'availability_zone': 'cinder1'}, + {'id':'2', + 'name': 'test2', + 'availability_zone': 'cinder1'}, + ]}) + + def _return_aggregate(self): + r = {'aggregate': self.get_os_aggregates()[1]['aggregates'][0]} + return (200, r) + + def get_os_aggregates_1(self, **kw): + return self._return_aggregate() + + def post_os_aggregates(self, body, **kw): + return self._return_aggregate() + + def put_os_aggregates_1(self, body, **kw): + return self._return_aggregate() + + def put_os_aggregates_2(self, body, **kw): + return self._return_aggregate() + + def post_os_aggregates_1_action(self, body, **kw): + return self._return_aggregate() + + def post_os_aggregates_2_action(self, body, **kw): + return self._return_aggregate() + + def delete_os_aggregates_1(self, **kw): + return (202, None) + + # + # Hosts + # + def get_os_hosts_host(self, *kw): + return (200, {'host': + [{'resource': {'project': '(total)', 'host': 'dummy', + 'cpu': 16, 'memory_mb': 32234, 'disk_gb': 128}}, + {'resource': {'project': '(used_now)', 'host': 'dummy', + 'cpu': 1, 'memory_mb': 2075, 'disk_gb': 45}}, + {'resource': {'project': '(used_max)', 'host': 'dummy', + 'cpu': 1, 'memory_mb': 2048, 'disk_gb': 30}}, + {'resource': {'project': 'admin', 'host': 'dummy', + 'cpu': 1, 'memory_mb': 2048, 'disk_gb': 30}}]}) + + def get_os_hosts_sample_host(self, *kw): + return (200, {'host': [{'resource': {'host': 'sample_host'}}], }) + + def put_os_hosts_sample_host_1(self, body, **kw): + return (200, {'host': 'sample-host_1', + 'status': 'enabled'}) + + def put_os_hosts_sample_host_2(self, body, **kw): + return (200, {'host': 'sample-host_2', + 'maintenance_mode': 'on_maintenance'}) + + def put_os_hosts_sample_host_3(self, body, **kw): + return (200, {'host': 'sample-host_3', + 'status': 'enabled', + 'maintenance_mode': 'on_maintenance'}) + + def get_os_hosts_sample_host_startup(self, **kw): + return (200, {'host': 'sample_host', + 'power_action': 'startup'}) + + def get_os_hosts_sample_host_reboot(self, **kw): + return (200, {'host': 'sample_host', + 'power_action': 'reboot'}) + + def get_os_hosts_sample_host_shutdown(self, **kw): + return (200, {'host': 'sample_host', + 'power_action': 'shutdown'}) + + def put_os_hosts_sample_host(self, body, **kw): + result = {'host': 'dummy'} + result.update(body) + return (200, result) diff --git a/tests/v1/test_auth.py b/tests/v1/test_auth.py new file mode 100644 index 0000000..6aeb5fc --- /dev/null +++ b/tests/v1/test_auth.py @@ -0,0 +1,297 @@ +import httplib2 +import json +import mock + +from cinderclient.v1 import client +from cinderclient import exceptions +from tests import utils + + +def to_http_response(resp_dict): + """Converts dict of response attributes to httplib response.""" + resp = httplib2.Response(resp_dict) + for k, v in resp_dict['headers'].items(): + resp[k] = v + return resp + + +class AuthenticateAgainstKeystoneTests(utils.TestCase): + def test_authenticate_success(self): + cs = client.Client("username", "password", "project_id", + "auth_url/v2.0", service_type='compute') + resp = { + "access": { + "token": { + "expires": "12345", + "id": "FAKE_ID", + }, + "serviceCatalog": [ + { + "type": "compute", + "endpoints": [ + { + "region": "RegionOne", + "adminURL": "http://localhost:8774/v1", + "internalURL": "http://localhost:8774/v1", + "publicURL": "http://localhost:8774/v1/", + }, + ], + }, + ], + }, + } + auth_response = httplib2.Response({ + "status": 200, + "body": json.dumps(resp), + }) + + mock_request = mock.Mock(return_value=(auth_response, + json.dumps(resp))) + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + cs.client.authenticate() + headers = { + 'User-Agent': cs.client.USER_AGENT, + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + body = { + 'auth': { + 'passwordCredentials': { + 'username': cs.client.user, + 'password': cs.client.password, + }, + 'tenantName': cs.client.projectid, + }, + } + + token_url = cs.client.auth_url + "/tokens" + mock_request.assert_called_with(token_url, "POST", + headers=headers, + body=json.dumps(body)) + + endpoints = resp["access"]["serviceCatalog"][0]['endpoints'] + public_url = endpoints[0]["publicURL"].rstrip('/') + self.assertEqual(cs.client.management_url, public_url) + token_id = resp["access"]["token"]["id"] + self.assertEqual(cs.client.auth_token, token_id) + + test_auth_call() + + def test_authenticate_failure(self): + cs = client.Client("username", "password", "project_id", + "auth_url/v2.0") + resp = {"unauthorized": {"message": "Unauthorized", "code": "401"}} + auth_response = httplib2.Response({ + "status": 401, + "body": json.dumps(resp), + }) + + mock_request = mock.Mock(return_value=(auth_response, + json.dumps(resp))) + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + self.assertRaises(exceptions.Unauthorized, cs.client.authenticate) + + test_auth_call() + + def test_auth_redirect(self): + cs = client.Client("username", "password", "project_id", + "auth_url/v1", service_type='compute') + dict_correct_response = { + "access": { + "token": { + "expires": "12345", + "id": "FAKE_ID", + }, + "serviceCatalog": [ + { + "type": "compute", + "endpoints": [ + { + "adminURL": "http://localhost:8774/v1", + "region": "RegionOne", + "internalURL": "http://localhost:8774/v1", + "publicURL": "http://localhost:8774/v1/", + }, + ], + }, + ], + }, + } + correct_response = json.dumps(dict_correct_response) + dict_responses = [ + {"headers": {'location':'http://127.0.0.1:5001'}, + "status": 305, + "body": "Use proxy"}, + # Configured on admin port, cinder redirects to v2.0 port. + # When trying to connect on it, keystone auth succeed by v1.0 + # protocol (through headers) but tokens are being returned in + # body (looks like keystone bug). Leaved for compatibility. + {"headers": {}, + "status": 200, + "body": correct_response}, + {"headers": {}, + "status": 200, + "body": correct_response} + ] + + responses = [(to_http_response(resp), resp['body']) \ + for resp in dict_responses] + + def side_effect(*args, **kwargs): + return responses.pop(0) + + mock_request = mock.Mock(side_effect=side_effect) + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + cs.client.authenticate() + headers = { + 'User-Agent': cs.client.USER_AGENT, + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + body = { + 'auth': { + 'passwordCredentials': { + 'username': cs.client.user, + 'password': cs.client.password, + }, + 'tenantName': cs.client.projectid, + }, + } + + token_url = cs.client.auth_url + "/tokens" + mock_request.assert_called_with(token_url, "POST", + headers=headers, + body=json.dumps(body)) + + resp = dict_correct_response + endpoints = resp["access"]["serviceCatalog"][0]['endpoints'] + public_url = endpoints[0]["publicURL"].rstrip('/') + self.assertEqual(cs.client.management_url, public_url) + token_id = resp["access"]["token"]["id"] + self.assertEqual(cs.client.auth_token, token_id) + + test_auth_call() + + def test_ambiguous_endpoints(self): + cs = client.Client("username", "password", "project_id", + "auth_url/v2.0", service_type='compute') + resp = { + "access": { + "token": { + "expires": "12345", + "id": "FAKE_ID", + }, + "serviceCatalog": [ + { + "adminURL": "http://localhost:8774/v1", + "type": "compute", + "name": "Compute CLoud", + "endpoints": [ + { + "region": "RegionOne", + "internalURL": "http://localhost:8774/v1", + "publicURL": "http://localhost:8774/v1/", + }, + ], + }, + { + "adminURL": "http://localhost:8774/v1", + "type": "compute", + "name": "Hyper-compute Cloud", + "endpoints": [ + { + "internalURL": "http://localhost:8774/v1", + "publicURL": "http://localhost:8774/v1/", + }, + ], + }, + ], + }, + } + auth_response = httplib2.Response({ + "status": 200, + "body": json.dumps(resp), + }) + + mock_request = mock.Mock(return_value=(auth_response, + json.dumps(resp))) + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + self.assertRaises(exceptions.AmbiguousEndpoints, + cs.client.authenticate) + + test_auth_call() + + +class AuthenticationTests(utils.TestCase): + def test_authenticate_success(self): + cs = client.Client("username", "password", "project_id", "auth_url") + management_url = 'https://servers.api.rackspacecloud.com/v1.1/443470' + auth_response = httplib2.Response({ + 'status': 204, + 'x-server-management-url': management_url, + 'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1', + }) + mock_request = mock.Mock(return_value=(auth_response, None)) + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + cs.client.authenticate() + headers = { + 'Accept': 'application/json', + 'X-Auth-User': 'username', + 'X-Auth-Key': 'password', + 'X-Auth-Project-Id': 'project_id', + 'User-Agent': cs.client.USER_AGENT + } + mock_request.assert_called_with(cs.client.auth_url, 'GET', + headers=headers) + self.assertEqual(cs.client.management_url, + auth_response['x-server-management-url']) + self.assertEqual(cs.client.auth_token, + auth_response['x-auth-token']) + + test_auth_call() + + def test_authenticate_failure(self): + cs = client.Client("username", "password", "project_id", "auth_url") + auth_response = httplib2.Response({'status': 401}) + mock_request = mock.Mock(return_value=(auth_response, None)) + + @mock.patch.object(httplib2.Http, "request", mock_request) + def test_auth_call(): + self.assertRaises(exceptions.Unauthorized, cs.client.authenticate) + + test_auth_call() + + def test_auth_automatic(self): + cs = client.Client("username", "password", "project_id", "auth_url") + http_client = cs.client + http_client.management_url = '' + mock_request = mock.Mock(return_value=(None, None)) + + @mock.patch.object(http_client, 'request', mock_request) + @mock.patch.object(http_client, 'authenticate') + def test_auth_call(m): + http_client.get('/') + m.assert_called() + mock_request.assert_called() + + test_auth_call() + + def test_auth_manual(self): + cs = client.Client("username", "password", "project_id", "auth_url") + + @mock.patch.object(cs.client, 'authenticate') + def test_auth_call(m): + cs.authenticate() + m.assert_called() + + test_auth_call() diff --git a/tests/v1/test_shell.py b/tests/v1/test_shell.py new file mode 100644 index 0000000..7efad0e --- /dev/null +++ b/tests/v1/test_shell.py @@ -0,0 +1,77 @@ +# Copyright 2010 Jacob Kaplan-Moss + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from cinderclient import client +from cinderclient import shell +from tests.v1 import fakes +from tests import utils + + +class ShellTest(utils.TestCase): + + # Patch os.environ to avoid required auth info. + def setUp(self): + """Run before each test.""" + self.old_environment = os.environ.copy() + os.environ = { + 'CINDER_USERNAME': 'username', + 'CINDER_PASSWORD': 'password', + 'CINDER_PROJECT_ID': 'project_id', + 'OS_COMPUTE_API_VERSION': '1.1', + 'CINDER_URL': 'http://no.where', + } + + self.shell = shell.OpenStackCinderShell() + + #HACK(bcwaldon): replace this when we start using stubs + self.old_get_client_class = client.get_client_class + client.get_client_class = lambda *_: fakes.FakeClient + + def tearDown(self): + os.environ = self.old_environment + # For some method like test_image_meta_bad_action we are + # testing a SystemExit to be thrown and object self.shell has + # no time to get instantatiated which is OK in this case, so + # we make sure the method is there before launching it. + if hasattr(self.shell, 'cs'): + self.shell.cs.clear_callstack() + + #HACK(bcwaldon): replace this when we start using stubs + client.get_client_class = self.old_get_client_class + + def run_command(self, cmd): + self.shell.main(cmd.split()) + + def assert_called(self, method, url, body=None, **kwargs): + return self.shell.cs.assert_called(method, url, body, **kwargs) + + def assert_called_anytime(self, method, url, body=None): + return self.shell.cs.assert_called_anytime(method, url, body) + + def test_list(self): + self.run_command('list') + # NOTE(jdg): we default to detail currently + self.assert_called('GET', '/volumes/detail') + + def test_show(self): + self.run_command('show 1234') + self.assert_called('GET', '/volumes/1234') + + def test_delete(self): + self.run_command('delete 1234') diff --git a/tests/v1/testfile.txt b/tests/v1/testfile.txt new file mode 100644 index 0000000..e4e860f --- /dev/null +++ b/tests/v1/testfile.txt @@ -0,0 +1 @@ +BLAH diff --git a/tests/v1/utils.py b/tests/v1/utils.py new file mode 100644 index 0000000..f878a5e --- /dev/null +++ b/tests/v1/utils.py @@ -0,0 +1,29 @@ +from nose.tools import ok_ + + +def fail(msg): + raise AssertionError(msg) + + +def assert_in(thing, seq, msg=None): + msg = msg or "'%s' not found in %s" % (thing, seq) + ok_(thing in seq, msg) + + +def assert_not_in(thing, seq, msg=None): + msg = msg or "unexpected '%s' found in %s" % (thing, seq) + ok_(thing not in seq, msg) + + +def assert_has_keys(dict, required=[], optional=[]): + keys = dict.keys() + for k in required: + assert_in(k, keys, "required key %s missing from %s" % (k, dict)) + allowed_keys = set(required) | set(optional) + extra_keys = set(keys).difference(set(required + optional)) + if extra_keys: + fail("found unexpected keys: %s" % list(extra_keys)) + + +def assert_isinstance(thing, kls): + ok_(isinstance(thing, kls), "%s is not an instance of %s" % (thing, kls)) |