summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Simpson <tim.simpson@rackspace.com>2014-11-17 13:51:24 -0600
committerTim Simpson <tim.simpson@rackspace.com>2014-11-17 13:51:24 -0600
commit52fc67e51ebcc10404e43205c48080e594886898 (patch)
treedd2fca0f45e1081b24d2cb0901b321635385f1ab
parentfc80208a46323669dc7d7541c8c6100f1bb8f739 (diff)
downloadtrove-52fc67e51ebcc10404e43205c48080e594886898.tar.gz
Create example generator
This code adds a feature to the tests where all of the example snippets are generated and then validated. Tests fail if the new examples don't match the old ones, meaning a dev changing the API must update the snippets, triggering a conversation about the changes during the pull request. Implements: blueprint example-snippet-generator Change-Id: I5f1bfd47558a646a56e519614ae76a55759a4422
-rw-r--r--etc/tests/localhost.test.conf11
-rw-r--r--generate_examples.py10
-rw-r--r--tox.ini1
-rw-r--r--trove/tests/examples/__init__.py0
-rw-r--r--trove/tests/examples/client.py266
-rw-r--r--trove/tests/examples/snippets.py1310
6 files changed, 1598 insertions, 0 deletions
diff --git a/etc/tests/localhost.test.conf b/etc/tests/localhost.test.conf
index 72eb8b57..701fa954 100644
--- a/etc/tests/localhost.test.conf
+++ b/etc/tests/localhost.test.conf
@@ -117,5 +117,16 @@
}
],
+
+ "examples": {
+ "directory":"apidocs/src/samples",
+ "normal_user_name":"hub_cap",
+ "normal_user_tenant":"3000",
+ "admin_user_name":"admin",
+ "admin_user_tenant":"admin-1000",
+ "replace_host":"https://troveapi.org",
+ "replace_dns_hostname": "e09ad9a3f73309469cf1f43d11e79549caf9acf2.troveexampledb.com"
+ },
+
"sentinel": null
}
diff --git a/generate_examples.py b/generate_examples.py
new file mode 100644
index 00000000..ff9bcd08
--- /dev/null
+++ b/generate_examples.py
@@ -0,0 +1,10 @@
+import run_tests
+
+
+def import_tests():
+ from trove.tests.examples import snippets
+ snippets.monkey_patch_uuid_and_date()
+
+
+if __name__ == "__main__":
+ run_tests.main(import_tests)
diff --git a/tox.ini b/tox.ini
index d2e89698..a8278389 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,6 +11,7 @@ deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = {envpython} run_tests.py
python setup.py testr --slowest
+ {envpython} generate_examples.py
whitelist_externals = bash
[tox:jenkins]
diff --git a/trove/tests/examples/__init__.py b/trove/tests/examples/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/trove/tests/examples/__init__.py
diff --git a/trove/tests/examples/client.py b/trove/tests/examples/client.py
new file mode 100644
index 00000000..208f9728
--- /dev/null
+++ b/trove/tests/examples/client.py
@@ -0,0 +1,266 @@
+# Copyright 2014 Rackspace
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+import re
+import time
+from urlparse import urlparse
+
+from proboscis.asserts import fail
+from troveclient.compat.client import TroveHTTPClient
+from trove.tests.config import CONFIG
+
+print_req = True
+
+
+def shorten_url(url):
+ parsed = urlparse(url)
+ if parsed.query:
+ method_url = parsed.path + '?' + parsed.query
+ else:
+ method_url = parsed.path
+ return method_url
+
+
+class SnippetWriter(object):
+
+ def __init__(self, conf, get_replace_list):
+ self.conf = conf
+ self.get_replace_list = get_replace_list
+
+ def output_request(self, user_details, name, url, output_headers, body,
+ content_type, method, static_auth_token=True):
+ headers = []
+ parsed = urlparse(url)
+ method_url = shorten_url(url)
+ headers.append("%s %s HTTP/1.1" % (method, method_url))
+ headers.append("User-Agent: %s" % output_headers['User-Agent'])
+ headers.append("Host: %s" % parsed.netloc)
+ # static_auth_token option for documentation purposes
+ if static_auth_token:
+ output_token = '87c6033c-9ff6-405f-943e-2deb73f278b7'
+ else:
+ output_token = output_headers['X-Auth-Token']
+ headers.append("X-Auth-Token: %s" % output_token)
+ headers.append("Accept: %s" % output_headers['Accept'])
+ print("OUTPUT HEADERS: %s" % output_headers)
+ headers.append("Content-Type: %s" % output_headers['Content-Type'])
+ self.write_file(user_details, name, "-%s.txt" % content_type, url,
+ method, "request", output='\n'.join(headers))
+
+ pretty_body = self.format_body(body, content_type)
+ self.write_file(user_details, name, ".%s" % content_type, url,
+ method, "request", output=pretty_body)
+
+ def output_response(self, user_details, name, content_type, url, method,
+ resp, body):
+ version = "1.1" # if resp.version == 11 else "1.0"
+ lines = [
+ ["HTTP/%s %s %s" % (version, resp.status, resp.reason)],
+ ["Content-Type: %s" % resp['content-type']],
+ ]
+ if 'via' in resp:
+ lines.append(["Via: %s" % resp['via']])
+ lines.append(["Content-Length: %s" % resp['content-length']])
+ lines.append(["Date: Mon, 18 Mar 2013 19:09:17 GMT"])
+ if 'server' in resp:
+ lines.append(["Server: %s" % resp["server"]])
+
+ new_lines = [x[0] for x in lines]
+ joined_lines = '\n'.join(new_lines)
+
+ self.write_file(user_details, name, "-%s.txt" % content_type, url,
+ method, "response", output=joined_lines)
+
+ if body:
+ pretty_body = self.format_body(body, content_type)
+ self.write_file(user_details, name, ".%s" % content_type, url,
+ method, "response", output=pretty_body)
+
+ def format_body(self, body, content_type):
+ assert content_type == 'json'
+ try:
+ if self.conf['replace_dns_hostname']:
+ before = r'\"hostname\": \"[a-zA-Z0-9-_\.]*\"'
+ after = '\"hostname\": \"%s\"' % self.conf[
+ 'replace_dns_hostname']
+ body = re.sub(before, after, body)
+ return json.dumps(json.loads(body), sort_keys=True, indent=4)
+ except Exception:
+ return body or ''
+
+ def write_request_file(self, user_details, name, content_type, url, method,
+ req_headers, request_body):
+ if print_req:
+ print("\t%s req url:%s" % (content_type, url))
+ print("\t%s req method:%s" % (content_type, method))
+ print("\t%s req headers:%s" % (content_type, req_headers))
+ print("\t%s req body:%s" % (content_type, request_body))
+ self.output_request(user_details, name, url, req_headers, request_body,
+ content_type, method)
+
+ def write_response_file(self, user_details, name, content_type, url,
+ method, resp, resp_content):
+ if print_req:
+ print("\t%s resp:%s" % (content_type, resp))
+ print("\t%s resp content:%s" % (content_type, resp_content))
+ self.output_response(user_details, name, content_type, url, method,
+ resp, resp_content)
+
+ def write_file(self, user_details, name, content_type, url, method,
+ in_or_out, output):
+ output = output.replace(user_details['tenant'], '1234')
+ if self.conf['replace_host']:
+ output = output.replace(user_details['api_url'],
+ self.conf['replace_host'])
+ pre_host_port = urlparse(user_details['service_url']).netloc
+ post_host = urlparse(self.conf['replace_host']).netloc
+ output = output.replace(pre_host_port, post_host)
+ output = output.replace("fake_host", "hostname")
+ output = output.replace("FAKE_", "")
+ for resource in self.get_replace_list():
+ output = output.replace(str(resource[0]), str(resource[1]))
+ filename = "%s/db-%s-%s%s" % (self.conf['directory'],
+ name.replace('_', '-'), in_or_out,
+ content_type)
+ self._write_file(filename, output)
+
+ def _write_file(self, filename, output):
+ empty = len(output.strip()) == 0
+ # Manipulate actual data to appease doc niceness checks
+ actual = [line.rstrip() for line in output.split("\n")]
+ if not empty and actual[len(actual) - 1] != '':
+ actual.append("")
+
+ def goofy_diff(a, b):
+ diff = []
+ for i in range(len(a)):
+ if i < len(b):
+ if a[i].rstrip() != b[i].rstrip():
+ diff.append('Expected line %d :%s\n'
+ ' Actual line %d :%s'
+ % (i + 1, a[i], i + 1, b[i]))
+ else:
+ diff.append("Expected line %d :%s" % (i + 1, a[i]))
+ for j in range(len(b) - len(a)):
+ i2 = len(a) + j
+ diff.append(" Actual line %d :%s" % (i2 + 1, b[i2]))
+ return diff
+
+ def write_actual_file():
+ # Always write the file.
+ with open(filename, "w") as file:
+ for line in actual:
+ file.write("%s\n" % line)
+
+ def assert_output_matches():
+ # If this test is failing for you, comment out this next
+ if os.path.isfile(filename):
+ with open(filename, 'r') as original_file:
+ original = original_file.read()
+ if empty:
+ fail('Error: output missing in new snippet generation '
+ 'for %s. Old content follows:\n"""%s"""'
+ % (filename, original))
+ expected = original.split('\n')
+ # Remove the last item which will look like a duplicated
+ # file ending newline
+ expected.pop()
+ diff = '\n'.join(goofy_diff(expected, actual))
+ if diff:
+ fail('Error: output files differ for %s:\n%s'
+ % (filename, diff))
+ elif not empty:
+ fail('Error: new file necessary where there was no file '
+ 'before. Filename=%s\nContent follows:\n"""%s"""'
+ % (filename, output))
+
+ # If this test is failing for you, comment out this line, generate
+ # the files, and then commit the changed files as part of your review.
+ #assert_output_matches()
+
+ if not empty:
+ write_actual_file()
+
+
+# This method is mixed into the client class.
+# It requires the following fields: snippet_writer, content_type, and
+# "name," the last of which must be set before each call.
+def write_to_snippet(self, args, kwargs, resp, body):
+ if self.name is None:
+ raise RuntimeError("'name' not set before call.")
+ url = args[0]
+ method = args[1]
+ request_headers = kwargs['headers']
+ request_body = kwargs.get('body', None)
+ response_headers = resp
+ response_body = body
+
+ # Log request
+ user_details = {
+ 'api_url': self.service_url,
+ 'service_url': self.service_url,
+ 'tenant': self.tenant,
+ }
+ self.snippet_writer.write_request_file(user_details, self.name,
+ self.content_type, url, method,
+ request_headers, request_body)
+ self.snippet_writer.write_response_file(user_details, self.name,
+ self.content_type, url, method,
+ response_headers, response_body)
+
+ # Create a short url to assert against.
+ short_url = url
+ base_url = self.service_url
+ for prefix in (base_url):
+ if short_url.startswith(prefix):
+ short_url = short_url[len(prefix):]
+ self.old_info = {
+ 'url': shorten_url(short_url),
+ 'method': method,
+ 'request_headers': request_headers,
+ 'request_body': request_body,
+ 'response_headers': response_headers,
+ 'response_body': response_body
+ }
+
+
+def add_fake_response_headers(headers):
+ """
+ Fakes other items that would appear if you were using, just to make up
+ an example, a proxy.
+ """
+ conf = CONFIG.examples
+ if 'via' in conf and 'via' not in headers:
+ headers['via'] = conf['via']
+ if 'server' in conf and 'server' not in headers:
+ headers['server'] = conf['server']
+ if 'date' not in headers:
+ date_string = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
+ headers['date'] = date_string
+
+
+class JsonClient(TroveHTTPClient):
+
+ content_type = 'json'
+
+ def http_log(self, args, kwargs, resp, body):
+ add_fake_response_headers(resp)
+ self.pretty_log(args, kwargs, resp, body)
+
+ def write_snippet():
+ return write_to_snippet(self, args, kwargs, resp, body)
+
+ self.write_snippet = write_snippet
diff --git a/trove/tests/examples/snippets.py b/trove/tests/examples/snippets.py
new file mode 100644
index 00000000..d373a113
--- /dev/null
+++ b/trove/tests/examples/snippets.py
@@ -0,0 +1,1310 @@
+# Copyright 2014 Rackspace
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import time
+import logging
+
+from proboscis import before_class
+from proboscis import test
+from proboscis import SkipTest
+from proboscis import TestProgram
+from proboscis.asserts import assert_equal
+from proboscis.asserts import assert_true
+from proboscis.asserts import Check
+
+from troveclient.compat import TroveHTTPClient
+from troveclient.compat import client as trove_client
+from trove.tests.config import CONFIG
+
+from troveclient.compat import Dbaas
+
+from trove.tests.examples.client import SnippetWriter
+from trove.tests.examples.client import JsonClient
+
+trove_client._logger.setLevel(logging.CRITICAL)
+
+FAKE_INFO = {'m': 30, 's': 0, 'uuid': 'abcdef00-aaaa-aaaa-aaaa-bbbbbbbbbbbb'}
+EXAMPLE_BACKUP_ID = "a9832168-7541-4536-b8d9-a8a9b79cf1b4"
+EXAMPLE_CONFIG_ID = "43a6ea86-e959-4735-9e46-a6a5d4a2d80f"
+EXAMPLE_INSTANCE_ID = "44b277eb-39be-4921-be31-3d61b43651d7"
+EXAMPLE_INSTANCE_ID_2 = "d5a9db64-7ef7-41c5-8e1e-4013166874bc"
+EXAMPLE_CONFIG_SERVER_ID = "271898715"
+
+
+def get_now():
+ from datetime import datetime
+ return datetime(2014, 10, 30, hour=12, minute=FAKE_INFO['m'],
+ second=FAKE_INFO['s'])
+
+
+def get_uuid():
+ return FAKE_INFO['uuid']
+
+
+def set_fake_stuff(uuid=None, minute=None, unique_id=None):
+ if uuid:
+ FAKE_INFO['uuid'] = uuid
+ if minute:
+ FAKE_INFO['minute'] = minute
+ if unique_id:
+ from trove.common.template import SingleInstanceConfigTemplate
+
+ def fake_calc_id(self):
+ return unique_id
+
+ SingleInstanceConfigTemplate._calculate_unique_id = fake_calc_id
+
+
+def monkey_patch_uuid_and_date():
+ import uuid
+ uuid.uuid4 = get_uuid
+ from trove.common import utils
+ utils.utcnow = get_now
+ utils.generate_uuid = get_uuid
+
+
+@test
+def load_config_file():
+ global conf
+ if CONFIG.get("examples", None) is None:
+ fail("Missing 'examples' config in test config.")
+ conf = CONFIG.examples
+ global normal_user
+ normal_user = CONFIG.users.find_user_by_name(conf['normal_user_name'])
+ global admin_user
+ admin_user = CONFIG.users.find_user_by_name(conf['admin_user_name'])
+
+
+def create_client_args(user):
+
+ auth_strategy = None
+
+ kwargs = {
+ 'service_type': 'trove',
+ 'insecure': CONFIG.values['trove_client_insecure'],
+ }
+
+ def set_optional(kwargs_name, test_conf_name):
+ value = CONFIG.values.get(test_conf_name, None)
+ if value is not None:
+ kwargs[kwargs_name] = value
+
+ service_url = CONFIG.get('override_trove_api_url', None)
+ if user.requirements.is_admin:
+ service_url = CONFIG.get('override_admin_trove_api_url',
+ service_url)
+ if service_url:
+ kwargs['service_url'] = service_url
+
+ auth_strategy = None
+ if user.requirements.is_admin:
+ auth_strategy = CONFIG.get('admin_auth_strategy',
+ CONFIG.auth_strategy)
+ else:
+ auth_strategy = CONFIG.auth_strategy
+ set_optional('region_name', 'trove_client_region_name')
+ if CONFIG.values.get('override_trove_api_url_append_tenant',
+ False):
+ kwargs['service_url'] += "/" + user.tenant
+
+ if auth_strategy == 'fake':
+ from troveclient.compat import auth
+
+ class FakeAuth(auth.Authenticator):
+
+ def authenticate(self):
+ class FakeCatalog(object):
+ def __init__(self, auth):
+ self.auth = auth
+
+ def get_public_url(self):
+ return "%s/%s" % (CONFIG.dbaas_url,
+ self.auth.tenant)
+
+ def get_token(self):
+ return self.auth.tenant
+
+ return FakeCatalog(self)
+
+ auth_strategy = FakeAuth
+
+ if auth_strategy:
+ kwargs['auth_strategy'] = auth_strategy
+
+ if not user.requirements.is_admin:
+ auth_url = CONFIG.trove_auth_url
+ else:
+ auth_url = CONFIG.values.get('trove_admin_auth_url',
+ CONFIG.trove_auth_url)
+
+ if CONFIG.values.get('trove_client_cls'):
+ cls_name = CONFIG.trove_client_cls
+ kwargs['client_cls'] = import_class(cls_name)
+
+ kwargs['tenant'] = user.tenant
+ kwargs['auth_url'] = auth_url
+ return (user.auth_user, user.auth_key), kwargs
+
+
+def create_client(cls, user):
+ args, kwargs = create_client_args(user)
+ kwargs['client_cls'] = cls
+ client = Dbaas(*args, **kwargs)
+ return client
+
+
+def make_client(user):
+ #snippet_writer = SnippetWriter(conf)
+ args, kwargs = create_client_args(user)
+ kwargs['client_cls'] = JsonClient
+ client = Dbaas(*args, **kwargs)
+ client.client.name = "auth"
+ #client.client.snippet_writer = snippet_writer
+ client.authenticate()
+ return client
+
+
+def write_snippet(get_replace_list, client, name, url, method, status, reason,
+ func, *func_args):
+ """
+ 'name' is the name of the file, while 'url,' 'method,' 'status,'
+ and 'reason' are expected values that are asserted against.
+ If func_args is present, it is a list of lists, each one of which
+ is passed as the *args to the two invocations of "func".
+ """
+ func_args = func_args or []
+ snippet_writer = SnippetWriter(conf, get_replace_list)
+ results = []
+ client.client.snippet_writer = snippet_writer
+ client.client.name = name
+ args = func_args
+ result = func(client, *args)
+
+ # Now write the snippet (if this happens earlier we can't replace
+ # data such as the instance ID).
+ client.client.write_snippet()
+ with Check() as check:
+ check.equal(client.client.old_info['url'], url)
+ check.equal(client.client.old_info['method'], method)
+ check.equal(client.client.old_info['response_headers'].status,
+ status)
+ check.equal(client.client.old_info['response_headers'].reason,
+ reason)
+ results.append(result)
+ # To prevent this from writing a snippet somewhere else...
+ client.client.name = "junk"
+
+ return results
+
+
+JSON_INDEX = 0
+
+
+class Example(object):
+
+ @classmethod
+ def get_replace_list(cls):
+ return []
+
+ def snippet(self, *args, **kwargs):
+ return write_snippet(self.get_replace_list, self.client,
+ *args, **kwargs)
+
+
+@test(depends_on=[load_config_file], enabled=False)
+class Versions(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def get_versions(self):
+ self.snippet(
+ "versions",
+ "", "GET", 200, "OK",
+ lambda client: client.versions.index(conf['version_url']))
+
+ @test
+ def get_version(self):
+ def version_call(client):
+ return client.versions.index(conf['version_url'] + "/v1.0/")
+
+ self.snippet("versions", "/v1.0", "GET", 200, "OK", get_version)
+
+
+@test(depends_on=[load_config_file])
+class Flavors(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def get_flavors(self):
+ self.snippet(
+ "flavors",
+ "/flavors", "GET", 200, "OK",
+ lambda client: client.flavors.list())
+
+ @test
+ def get_flavor_by_id(self):
+ self.snippet(
+ "flavors_by_id",
+ "/flavors/1", "GET", 200, "OK",
+ lambda client: client.flavors.get(1))
+
+
+@test(depends_on=[load_config_file])
+def clean_slate():
+ client = create_client(TroveHTTPClient, admin_user)
+ client.client.name = "list"
+ instances = client.instances.list()
+ assert_equal(0, len(instances), "Instance count must be zero.")
+
+
+@test(depends_on=[clean_slate])
+class CreateInstance(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def post_create_instance(self):
+ set_fake_stuff(uuid=EXAMPLE_INSTANCE_ID)
+
+ def create_instance(client, name):
+ instance = client.instances.create(
+ name, 1,
+ volume={'size': 2},
+ databases=[
+ {
+ "name": "sampledb",
+ "character_set": "utf8",
+ "collate": "utf8_general_ci"
+ },
+ {
+ "name": "nextround"
+ }
+ ],
+ users=[
+ {
+ "databases": [{"name": "sampledb"}],
+ "name": "demouser",
+ "password": "demopassword"
+ }
+ ])
+ assert_equal(instance.status, "BUILD")
+ return instance
+ self.instances = self.snippet(
+ "create_instance",
+ "/instances", "POST", 200, "OK",
+ create_instance,
+ "json_rack_instance")
+
+ def an_instance_is_not_active(self):
+ for instance in self.instances:
+ instance = self.client.instances.get(instance.id)
+ if instance.status != "ACTIVE":
+ assert_equal(instance.status, "BUILD")
+ return True
+ return False
+
+ @test(depends_on=[post_create_instance])
+ def wait_for_instances(self):
+ while self.an_instance_is_not_active():
+ time.sleep(1)
+ global json_instance
+ json_instance = self.instances[0]
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class Databases(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def post_create_databases(self):
+ self.snippet(
+ "create_databases",
+ "/instances/%s/databases" % json_instance.id,
+ "POST", 202, "Accepted",
+ lambda client: client.databases.create(
+ json_instance.id,
+ databases=
+ [
+ {
+ "name": "testingdb",
+ "character_set": "utf8",
+ "collate": "utf8_general_ci"
+ }, {
+ "name": "anotherdb"
+ }, {
+ "name": "oneMoreDB"
+ }
+ ]))
+
+ @test(depends_on=[post_create_databases])
+ def get_list_databases(self):
+ self.snippet(
+ "list_databases",
+ "/instances/%s/databases" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.databases.list(json_instance.id))
+
+ @test(depends_on=[post_create_databases])
+ def get_list_databases_limit_two(self):
+ results = self.snippet(
+ "list_databases_pagination",
+ "/instances/%s/databases?limit=1" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.databases.list(json_instance.id, limit=1))
+ assert_equal(1, len(results[JSON_INDEX]))
+ assert_equal("anotherdb", results[JSON_INDEX].next)
+
+ @test(depends_on=[post_create_databases],
+ runs_after=[get_list_databases, get_list_databases_limit_two])
+ def delete_databases(self):
+ self.snippet(
+ "delete_databases",
+ "/instances/%s/databases/testingdb" % json_instance.id,
+ "DELETE", 202, "Accepted",
+ lambda client:
+ client.databases.delete(json_instance.id, 'testingdb'))
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class Users(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def post_create_users(self):
+ self.snippet(
+ "create_users",
+ "/instances/%s/users" % json_instance.id,
+ "POST", 202, "Accepted",
+ lambda client: client.users.create(
+ json_instance.id,
+ [{
+ "name": "dbuser1",
+ "password": "password",
+ "databases": [
+ {
+ "name": "databaseA"
+ }
+ ]
+ }, {
+ "name": "dbuser2",
+ "password": "password",
+ "databases": [
+ {
+ "name": "databaseB"
+ },
+ {
+ "name": "databaseC"
+ }
+ ]
+ }, {
+ "name": "dbuser3",
+ "password": "password",
+ "databases": [
+ {
+ "name": "databaseD"
+ }
+ ]
+ }]))
+
+ @test(depends_on=[post_create_users])
+ def get_list_users(self):
+ self.snippet(
+ "list_users",
+ "/instances/%s/users" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.users.list(json_instance.id))
+
+ @test(depends_on=[post_create_users])
+ def get_list_users_limit_two(self):
+ self.snippet(
+ "list_users_pagination",
+ "/instances/%s/users?limit=2" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.users.list(json_instance.id, limit=2))
+
+ @test(depends_on=[post_create_users],
+ runs_after=[get_list_users, get_list_users_limit_two])
+ def delete_users(self):
+ user_name = "demouser"
+ self.snippet(
+ "delete_users",
+ "/instances/%s/users/%s" % (json_instance.id, user_name),
+ "DELETE", 202, "Accepted",
+ lambda client: client.users.delete(json_instance.id,
+ username=user_name))
+
+ @test(depends_on=[post_create_users])
+ def modify_user_attributes(self):
+ old_user_name = "dbuser1"
+ self.snippet(
+ "change_user_attributes",
+ "/instances/%s/users/%s" % (json_instance.id, old_user_name),
+ "PUT", 202, "Accepted",
+ lambda client: client.users.update_attributes(
+ json_instance.id,
+ username=old_user_name,
+ newuserattr={
+ "name": "new_username",
+ "password": "new_password"
+ }
+ )
+ )
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class Root(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def post_enable_root_access(self):
+ self.snippet(
+ "enable_root_user",
+ "/instances/%s/root" % json_instance.id,
+ "POST", 200, "OK",
+ lambda client: client.root.create(json_instance.id))
+
+ @test(depends_on=[post_enable_root_access])
+ def get_check_root_access(self):
+ results = self.snippet(
+ "check_root_user",
+ "/instances/%s/root" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.root.is_root_enabled(json_instance.id))
+ assert_equal(results[JSON_INDEX].rootEnabled, True)
+
+
+class ActiveMixin(Example):
+ """Adds a method to wait for instance status to become ACTIVE."""
+
+ def _wait_for_active(self, *acceptable_states):
+ global json_instance
+ json_instance = self.client.instances.get(json_instance.id)
+ print('instance.status=%s' % json_instance.status)
+ while json_instance.status != "ACTIVE":
+ assert_true(
+ json_instance.status in acceptable_states,
+ "Instance status == %s; expected it to be one of: %s"
+ % (json_instance.status, acceptable_states))
+ time.sleep(0.1)
+ json_instance = self.client.instances.get(json_instance.id)
+
+ def _wait_for_restore_active(self, *acceptable_states):
+ for instance in (self.json_restore, ):
+ instance = self.client.instances.get(instance.id)
+ print('instance.status=%s' % instance.status)
+ while instance.status != "ACTIVE":
+ assert_true(
+ instance.status in acceptable_states,
+ "Instance status == %s; expected it to be one of: %s"
+ % (instance.status, acceptable_states))
+ time.sleep(0.1)
+ instance = self.client.instances.get(instance.id)
+
+
+STATE = {
+ "CONFIGURATION": None,
+ "DATASTORE_ID": None,
+ "DATASTORE_VERSION_ID": None,
+}
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class Datastores(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def get_datastores_list(self):
+ self.datastores = self.snippet(
+ "datastores_list",
+ "/datastores",
+ "GET", 200, "OK",
+ lambda client: client.datastores.list())
+ for result in self.datastores:
+ assert_equal(1, len(result))
+
+ @test(depends_on=[get_datastores_list])
+ def get_datastore_by_id(self):
+ ds, = self.datastores
+ mysql_ds = [x for x in ds if x.name == 'mysql']
+ if not mysql_ds:
+ fail('no mysql datastore found in list')
+ ds_id = STATE["DATASTORE_ID"] = mysql_ds[JSON_INDEX].id
+ self.datastore = self.snippet(
+ "datastore_by_id",
+ "/datastores/%s" % ds_id,
+ "GET", 200, "OK",
+ lambda client: client.datastores.get(ds_id))
+
+ @test(depends_on=[get_datastore_by_id])
+ def get_datastore_versions_list(self):
+ ds_id = STATE["DATASTORE_ID"]
+ self.datastore_versions = self.snippet(
+ "datastore_versions_list",
+ "/datastores/%s/versions" % ds_id,
+ "GET", 200, "OK",
+ lambda client: client.datastore_versions.list(ds_id))
+
+ @test(depends_on=[get_datastore_versions_list])
+ def get_datastore_version_by_id(self):
+ ds_id = STATE["DATASTORE_ID"]
+ ds_v_id = STATE["DATASTORE_VERSION_ID"] = (
+ self.datastore_versions[JSON_INDEX][0].id
+ )
+ self.datastore_version = self.snippet(
+ "datastore_version_by_id",
+ "/datastores/%s/versions/%s" % (ds_id, ds_v_id),
+ "GET", 200, "OK",
+ lambda client: client.datastore_versions.get(ds_id, ds_v_id))
+
+
+@test(depends_on=[Datastores], groups=['uses_instances'])
+class Configurations(ActiveMixin):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def get_configuration_parameters_for_datastore_version(self):
+ ds_id = STATE["DATASTORE_ID"]
+ ds_v_id = STATE["DATASTORE_VERSION_ID"]
+ self.snippet(
+ "configuration_parameters_for_datastore_version",
+ "/datastores/%s/versions/%s/parameters" % (ds_id, ds_v_id),
+ "GET", 200, "OK",
+ lambda client: client.configuration_parameters.parameters(
+ ds_id, ds_v_id
+ )
+ )
+
+ @test
+ def get_configuration_parameters_without_datastore_version(self):
+ ds_v_id = STATE["DATASTORE_VERSION_ID"]
+ self.params = self.snippet(
+ "configuration_parameters_without_datastore_version",
+ "/datastores/versions/%s/parameters" % (ds_v_id),
+ "GET", 200, "OK",
+ lambda client: (
+ client.configuration_parameters.parameters_by_version(ds_v_id)
+ )
+ )
+ assert_true(self.params)
+
+ @test(depends_on=[get_configuration_parameters_without_datastore_version])
+ def get_configuration_parameter_for_datastore_version(self):
+ ds_id = STATE["DATASTORE_ID"]
+ ds_v_id = STATE["DATASTORE_VERSION_ID"]
+ param = self.params[JSON_INDEX][0].name
+ self.snippet(
+ "configuration_parameter_for_datastore_version",
+ "/datastores/%s/versions/%s/parameters/%s"
+ % (ds_id, ds_v_id, param),
+ "GET", 200, "OK",
+ lambda client: client.configuration_parameters.get_parameter(
+ ds_id, ds_v_id, param))
+
+ @test(depends_on=[get_configuration_parameters_without_datastore_version])
+ def get_configuration_parameter_without_datastore_version(self):
+ ds_v_id = STATE["DATASTORE_VERSION_ID"]
+ param = self.params[JSON_INDEX][0].name
+
+ def get_param(client):
+ return client.configuration_parameters.get_parameter_by_version(
+ ds_v_id,
+ param
+ )
+
+ self.params = self.snippet(
+ "configuration_parameter_without_datastore_version",
+ "/datastores/versions/%s/parameters/%s" % (ds_v_id, param),
+ "GET", 200, "OK",
+ get_param
+ )
+
+ @test(depends_on=[get_configuration_parameter_without_datastore_version])
+ def create_configuration(self):
+ set_fake_stuff(uuid=EXAMPLE_CONFIG_ID)
+ ds_id = STATE["DATASTORE_ID"]
+ ds_v_id = STATE["DATASTORE_VERSION_ID"]
+ values = {
+ "connect_timeout": 120,
+ "collation_server": "latin1_swedish_ci"
+ }
+
+ def create(client):
+ config = client.configurations.create(
+ 'example-configuration-name', json.dumps(values),
+ 'example description', ds_id, ds_v_id)
+ return config
+
+ self.configurations = self.snippet(
+ "configuration_create",
+ "/configurations",
+ "POST", 200, "OK",
+ create)
+ STATE["CONFIGURATION"] = self.configurations[JSON_INDEX]
+
+ @test(depends_on=[create_configuration])
+ def get_configuration(self):
+ config = STATE["CONFIGURATION"]
+ self.config = self.snippet(
+ "configuration_details",
+ "/configurations/%s" % config.id,
+ "GET", 200, "OK",
+ lambda client: client.configurations.get(config.id))
+
+ @test(depends_on=[create_configuration])
+ def list_configurations(self):
+ self.configs = self.snippet(
+ "configuration_list",
+ "/configurations",
+ "GET", 200, "OK",
+ lambda client: client.configurations.list())
+
+ @test(depends_on=[list_configurations, get_configuration])
+ def edit_configuration(self):
+ config = STATE["CONFIGURATION"]
+ values = {
+ 'connect_timeout': 300
+ }
+ self.snippet(
+ "configuration_edit_parameters",
+ "/configurations/%s" % config.id,
+ "PATCH", 200, "OK",
+ lambda client: client.configurations.edit(
+ config.id, json.dumps(values)))
+
+ @test(depends_on=[edit_configuration])
+ def update_configuration(self):
+ config = STATE["CONFIGURATION"]
+ values = {
+ 'connect_timeout': 150,
+ 'collation_server': 'utf8_unicode_ci'
+ }
+ self.snippet(
+ "configuration_update_parameters",
+ "/configurations/%s" % config.id,
+ "PUT", 202, "Accepted",
+ lambda client: client.configurations.update(
+ config.id, json.dumps(values),
+ 'example-updated-name', 'example updated description'))
+
+ @test(depends_on=[update_configuration])
+ def attach_configuration_to_instance(self):
+ config = STATE["CONFIGURATION"]
+ self.snippet(
+ "configuration_attach_to_instance",
+ "/instances/%s" % json_instance.id,
+ "PUT", 202, "Accepted",
+ lambda client: client.instances.modify(
+ json_instance.id,
+ config.id
+ )
+ )
+
+ @test(depends_on=[attach_configuration_to_instance])
+ def list_configurations_instances(self):
+ config = STATE["CONFIGURATION"]
+ self.config_instances = self.snippet(
+ "configuration_list_instances",
+ "/configurations/%s/instances" % config.id,
+ "GET", 200, "OK",
+ lambda client: client.configurations.instances(config.id))
+
+ @test(depends_on=[list_configurations_instances])
+ def detach_configuration_from_instance(self):
+ self.snippet(
+ "configuration_detach_from_instance",
+ "/instances/%s" % json_instance.id,
+ "PUT", 202, "Accepted",
+ lambda client: client.instances.modify(
+ json_instance.id, ""))
+
+ @test(depends_on=[detach_configuration_from_instance])
+ def instance_restart_after_configration_change(self):
+ self.client.instances.restart(json_instance.id)
+ self._wait_for_active("REBOOT")
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class InstanceList(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def get_list_instance_index(self):
+ results = self.snippet(
+ "instances_index",
+ "/instances", "GET", 200, "OK",
+ lambda client: client.instances.list())
+ for result in results:
+ assert_equal(1, len(result))
+
+ @test
+ def get_instance_details(self):
+ results = self.snippet(
+ "instance_status_detail",
+ "/instances/%s" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.instances.get(json_instance.id))
+ assert_equal(results[JSON_INDEX].id, json_instance.id)
+
+ @test
+ def get_default_instance_configuration(self):
+ set_fake_stuff(unique_id=EXAMPLE_CONFIG_SERVER_ID)
+ self.snippet(
+ "get_default_instance_configuration",
+ "/instances/%s/configuration" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.instances.configuration(json_instance.id))
+
+ @test
+ def get_list_instance_index_limit_two(self):
+ third_instance = self.client.instances.create(
+ "The Third Instance", 1, volume={'size': 2})
+ third_instance = self.client.instances.get(third_instance.id)
+ while third_instance.status != "ACTIVE":
+ time.sleep(0.1)
+ third_instance = self.client.instances.get(third_instance.id)
+
+ results = self.snippet(
+ "instances_index_pagination",
+ "/instances?limit=2", "GET", 200, "OK",
+ lambda client: client.instances.list(limit=2))
+ for result in results:
+ assert_equal(2, len(result))
+
+ self.client.instances.delete(third_instance.id)
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class Backups(ActiveMixin):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def create_backup(self):
+ set_fake_stuff(uuid=EXAMPLE_BACKUP_ID)
+
+ def create_backup(client):
+ backup = client.backups.create(name='snapshot',
+ instance=json_instance.id,
+ description="My Backup")
+ with open("/tmp/mario", 'a') as f:
+ f.write("BACKUP = %s\n" % backup.id)
+ return backup
+
+ results = self.snippet(
+ "backup_create", "/backups", "POST", 202, "Accepted",
+ create_backup)
+ self._wait_for_active("BACKUP")
+ assert_equal(len(results), 1)
+ self.json_backup = results[JSON_INDEX]
+
+ @test(depends_on=[create_backup])
+ def get_backup(self):
+ results = self.snippet(
+ "backup_get",
+ "/backups/%s" % self.json_backup.id,
+ "GET", 200, "OK",
+ lambda client: client.backups.get(self.json_backup.id))
+ assert_equal(len(results), 1)
+
+ @test(depends_on=[create_backup])
+ def get_backups_for_instance(self):
+ results = self.snippet(
+ "backups_by_instance",
+ "/instances/%s/backups" % json_instance.id,
+ "GET", 200, "OK",
+ lambda client: client.instances.backups(json_instance.id))
+ assert_equal(len(results), 1)
+
+ @test(depends_on=[create_backup])
+ def list_backups(self):
+ results = self.snippet(
+ "backup_list",
+ "/backups", "GET", 200, "OK",
+ lambda client: client.backups.list())
+ assert_equal(len(results), 1)
+
+ @test(depends_on=[create_backup])
+ def restore(self):
+ set_fake_stuff(uuid=EXAMPLE_INSTANCE_ID_2)
+
+ def create_instance(client, name, backup):
+ instance = client.instances.create(
+ name, 1,
+ volume={'size': 2},
+ restorePoint={'backupRef': backup})
+ assert_equal(instance.status, "BUILD")
+ return instance
+ results = self.snippet(
+ "backup_restore",
+ "/instances", "POST", 200, "OK",
+ lambda client: create_instance(
+ client, "backup_instance", self.json_backup.id))
+ assert_equal(len(results), 1)
+ self.json_restore = results[JSON_INDEX]
+ self._wait_for_restore_active("BUILD")
+ self.json_restore = self.client.instances.get(self.json_restore.id)
+ assert_equal(self.json_restore.status, "ACTIVE")
+
+ @test(depends_on=[restore])
+ def delete_restores(self):
+ self.snippet(
+ "restore_delete",
+ "/instances/%s" % self.json_restore.id,
+ "DELETE", 202, "Accepted",
+ lambda client: client.instances.delete(self.json_restore.id))
+ self.json_restore = self.client.instances.get(self.json_restore.id)
+ assert_equal(self.json_restore.status, "SHUTDOWN")
+
+ @test(depends_on=[create_backup],
+ runs_after=[get_backup, list_backups, restore,
+ get_backups_for_instance])
+ def delete_backup(self):
+ results = self.snippet(
+ "backup_delete",
+ "/backups/%s" % self.json_backup.id,
+ "DELETE", 202, "Accepted",
+ lambda client: client.backups.delete(self.json_backup.id))
+ assert_equal(len(results), 1)
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class Actions(ActiveMixin):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def instance_restart(self):
+ self.snippet(
+ "instance_restart",
+ "/instances/%s/action" % json_instance.id,
+ "POST", 202, "Accepted",
+ lambda client: client.instances.restart(json_instance.id))
+ self._wait_for_active("REBOOT")
+
+ @test
+ def instance_resize_volume(self):
+ self.snippet(
+ "instance_resize_volume",
+ "/instances/%s/action" % json_instance.id,
+ "POST", 202, "Accepted",
+ lambda client: client.instances.resize_volume(json_instance.id, 4))
+ self._wait_for_active("RESIZE")
+ assert_equal(json_instance.volume['size'], 4)
+
+ @test
+ def instance_resize_flavor(self):
+ self.snippet(
+ "instance_resize_flavor",
+ ("/instances/%s/action" % json_instance.id),
+ "POST", 202, "Accepted",
+ lambda client: client.instances.resize_instance(
+ json_instance.id, 3))
+ self._wait_for_active("RESIZE")
+ # TODO(imsplitbit): remove coercion when troveclient fixes are in
+ assert_equal(int(json_instance.flavor['id']), 3)
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances', "MgmtHosts"])
+class MgmtHosts(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_list_hosts(self):
+ results = self.snippet(
+ "mgmt_list_hosts",
+ "/mgmt/hosts", "GET", 200, "OK",
+ lambda client: client.mgmt.hosts.index())
+
+ with Check() as check:
+ for hosts in results:
+ check.equal(2, len(hosts))
+ check.true("fake_host_1" == hosts[0].name
+ or "fake_host_1" == hosts[1].name)
+ check.true("fake_host_2" == hosts[0].name
+ or "fake_host_2" == hosts[1].name)
+ check.true(1 == results[0][1].instanceCount
+ or 1 == results[0][0].instanceCount)
+
+ @test
+ def mgmt_get_host_detail(self):
+ results = self.snippet(
+ "mgmt_get_host_detail",
+ "/mgmt/hosts/fake_host_1", "GET", 200, "OK",
+ lambda client: client.mgmt.hosts.get("fake_host_1"))
+ with Check() as check:
+ for host in results:
+ check.equal(results[0].name, "fake_host_1")
+ # XML entries won't come back as these types. :(
+ check.true(isinstance(results[0].percentUsed, int)),
+ check.true(isinstance(results[0].totalRAM, int)),
+ check.true(isinstance(results[0].usedRAM, int)),
+ with Check() as check:
+ for host in results:
+ check.equal(1, len(host.instances))
+ for instance in host.instances:
+ check.equal(instance['status'], 'ACTIVE')
+ check.true(isinstance(instance['name'], basestring))
+ check.true(isinstance(instance['id'], basestring))
+ check.true(isinstance(instance['server_id'], basestring))
+ check.true(isinstance(instance['tenant_id'], basestring))
+
+ @test
+ def mgmt_host_update_all(self):
+ raise SkipTest("This isn't working... :(")
+ self.snippet(
+ "mgmt_host_update",
+ "/mgmt/hosts/fake_host_1/instances/action",
+ "POST", 202, "Accepted",
+ lambda client: client.mgmt.hosts.update_all("fake_host_1"))
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class MgmtStorage(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_get_storage(self):
+ results = self.snippet(
+ "mgmt_get_storage",
+ "/mgmt/storage", "GET", 200, "OK",
+ lambda client: client.mgmt.storage.index())
+ for index, devices in enumerate(results):
+ with Check() as check:
+ check.equal(1, len(devices))
+ device = devices[0]
+ check.equal(int(device.capacity['available']), 90)
+ check.equal(int(device.capacity['total']), 100)
+ check.equal(device.name, "fake_storage")
+ check.equal(int(device.provision['available']), 40)
+ check.equal(int(device.provision['percent']), 10)
+ check.equal(int(device.provision['total']), 50)
+ check.equal(device.type, "test_type")
+ check.equal(int(device.used), 10)
+ if index == JSON_INDEX:
+ check.true(isinstance(device.capacity['available'], int))
+ check.true(isinstance(device.capacity['total'], int))
+ check.true(isinstance(device.provision['available'], int))
+ check.true(isinstance(device.provision['percent'], int))
+ check.true(isinstance(device.provision['total'], int))
+ check.true(isinstance(device.used, int))
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class MgmtAccount(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_get_account_details(self):
+ results = self.snippet(
+ "mgmt_get_account_details",
+ "/mgmt/accounts/%s" % conf['normal_user_tenant'],
+ "GET", 200, "OK",
+ lambda client: client.mgmt.accounts.show(
+ conf['normal_user_tenant'], ))
+ with Check() as check:
+ for account_info in results:
+ check.equal(conf['normal_user_tenant'], account_info.id)
+
+ @test
+ def mgmt_get_account_list(self):
+ results = self.snippet(
+ "mgmt_list_accounts",
+ "/mgmt/accounts", "GET", 200, "OK",
+ lambda client: client.mgmt.accounts.index())
+ matches = {conf['normal_user_tenant']: 2,
+ conf['admin_user_tenant']: 0}
+ for index, result in enumerate(results):
+ for account in result.accounts:
+ if account['id'] not in matches:
+ fail("Did not expect this account ID: %s" % account['id'])
+ expected_count = matches[account['id']]
+ if index == JSON_INDEX:
+ assert_equal(2, expected_count)
+ else:
+ assert_equal(2, expected_count)
+
+
+def for_both(func):
+ def both(self):
+ for result in self.results:
+ func(self, result)
+ return both
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class MgmtInstance(Example):
+
+ @before_class
+ def mgmt_get_instance_details(self):
+ self.client = make_client(admin_user)
+ self.results = self.snippet(
+ "mgmt_get_instance_details",
+ ("/mgmt/instances/%s" % json_instance.id),
+ "GET", 200, "OK",
+ lambda client: client.mgmt.instances.show(json_instance.id))
+
+ @test
+ @for_both
+ def created(self, result):
+ assert_true(isinstance(result.created, basestring))
+
+ @test
+ def deleted(self):
+ assert_equal(self.results[JSON_INDEX].deleted, False)
+
+ @test
+ @for_both
+ def flavor(self, result):
+ # TODO(imsplitbit): remove the coercion when python-troveclient fixes
+ # land in the public.
+ assert_true(
+ int(result.flavor['id']) == 1 or int(result.flavor['id']) == 3)
+ assert_equal(len(result.flavor['links']), 2)
+
+ @test
+ @for_both
+ def guest_status(self, result):
+ assert_equal(result.guest_status['state_description'], 'running')
+
+ @test(enabled=False)
+ @for_both
+ def host(self, result):
+ assert_equal(result.host, 'fake_host_1')
+
+ @test
+ def id(self):
+ assert_equal(self.results[JSON_INDEX].id, json_instance.id)
+
+ @test
+ @for_both
+ def links(self, result):
+ assert_true(isinstance(result.links, list))
+ for link in result.links:
+ assert_true(isinstance(link, dict))
+ assert_true(isinstance(link['href'], basestring))
+ assert_true(isinstance(link['rel'], basestring))
+
+ @test
+ def local_id(self):
+ assert_true(isinstance(self.results[JSON_INDEX].server['local_id'],
+ int))
+
+ @test
+ @for_both
+ def name(self, result):
+ assert_true(isinstance(result.name, basestring))
+
+ @test
+ @for_both
+ def server_id(self, result):
+ assert_true(isinstance(result.server['id'], basestring))
+
+ @test
+ @for_both
+ def status(self, result):
+ assert_equal("ACTIVE", result.status)
+
+ @test
+ @for_both
+ def task_description(self, result):
+ assert_equal(result.task_description, "No tasks for the instance.")
+
+ @test
+ @for_both
+ def tenant_id(self, result):
+ assert_equal(result.tenant_id, conf['normal_user_tenant'])
+
+ @test
+ @for_both
+ def updated(self, result):
+ assert_true(isinstance(result.updated, basestring))
+
+ @test
+ @for_both
+ def volume(self, result):
+ assert_true(isinstance(result.volume, dict))
+ assert_true('id' in result.volume)
+ assert_true('size' in result.volume)
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class MgmtInstanceIndex(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_instance_index(self, deleted=False):
+ self.snippet(
+ "mgmt_instance_index",
+ "/mgmt/instances?deleted=false", "GET", 200, "OK",
+ lambda client: client.mgmt.instances.index(deleted=False))
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class MgmtInstanceDiagnostics(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_get_instance_diagnostics(self):
+ self.snippet(
+ "mgmt_instance_diagnostics",
+ ("/mgmt/instances/%s/diagnostics" % json_instance.id),
+ "GET", 200, "OK",
+ lambda client: client.diagnostics.get(json_instance.id))
+
+
+@test(depends_on=[CreateInstance])
+class MgmtInstanceRoot(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_get_root_details(self):
+ self.snippet(
+ "mgmt_get_root_details",
+ ("/mgmt/instances/%s/root" % json_instance.id),
+ "GET", 200, "OK",
+ lambda client: client.mgmt.instances.root_enabled_history(
+ json_instance.id)
+ )
+
+
+@test(depends_on=[CreateInstance], enabled=False)
+class MgmtInstanceHWInfo(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_get_hw_info(self):
+ self.snippet(
+ "mgmt_get_hw_info",
+ ("/mgmt/instances/%s/hwinfo" % json_instance.id),
+ "GET", 200, "OK",
+ lambda client, id: client.hw_info.get(id),
+ ([json_instance.id], ))
+
+
+@test(depends_on=[CreateInstance], groups=['uses_instances'])
+class MgmtInstanceReboot(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_instance_reboot(self):
+ self.snippet(
+ "instance_reboot",
+ ("/mgmt/instances/%s/action" % json_instance.id),
+ "POST", 202, "Accepted",
+ lambda client: client.mgmt.instances.reboot(json_instance.id))
+
+
+@test(depends_on=[CreateInstance],
+ groups=['uses_instances'], enabled=False)
+class MgmtInstanceGuestUpdate(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(admin_user)
+
+ @test
+ def mgmt_instance_guest_update(self):
+ self.snippet(
+ "guest_update",
+ ("/mgmt/instances/%s/action" % json_instance.id),
+ "POST", 202, "Accepted",
+ lambda client: client.mgmt.instances.update(json_instance.id))
+
+
+@test(depends_on=[CreateInstance], runs_after_groups=['uses_instances'])
+class ZzzDeleteInstance(Example):
+
+ @before_class
+ def setup(self):
+ self.client = make_client(normal_user)
+
+ @test
+ def zzz_delete_instance(self):
+ global json_instance
+ self.snippet(
+ "delete_instance",
+ "/instances/%s" % json_instance.id,
+ "DELETE", 202, "Accepted",
+ lambda client: client.instances.delete(json_instance.id))
+ json_instance = self.client.instances.get(json_instance.id)
+ assert_equal(json_instance.status, "SHUTDOWN")
+
+ @test(depends_on=[zzz_delete_instance])
+ def delete_configuration(self):
+ config = STATE["CONFIGURATION"]
+ self.configs = self.snippet(
+ "configuration_delete",
+ ("/configurations/%s" % config.id),
+ "DELETE", 202, "Accepted",
+ lambda client: client.configurations.delete(config.id))
+
+
+if __name__ == "__main__":
+ CONFIG.load_from_file("etc/tests/localhost.test.conf")
+ TestProgram().run_and_exit()