summaryrefslogtreecommitdiff
path: root/trove/tests/unittests/extensions
diff options
context:
space:
mode:
authorZhao Chao <zhaochao1984@gmail.com>2018-02-24 21:59:58 +0800
committerZhao Chao <zhaochao1984@gmail.com>2018-02-28 09:14:36 +0800
commit579f5afc6441e2279d3d660b90268c760238202a (patch)
treed7ae47d05c79bf348c8e979e5c63709ec7d4daaf /trove/tests/unittests/extensions
parentb59b6ae9680d69e96439b925850d4bf96e63adf1 (diff)
downloadtrove-579f5afc6441e2279d3d660b90268c760238202a.tar.gz
Use RootHistory to check if root is ever enabled
When disabling root, there is no need to call guestagent to check whether the root user is ever enabled. Root hisotry table should be used for this purpose. As datastore specific root controller of MySQL/Cassandra/PostgreSQL were created only for the '_find_root_user' which were calling guestagent to find root user, these controllers are removed and 'DefaultRootController' is used instead. RedisRootController is also updated as it didn't do this check previously. Unittests directory structure is also slightly changed. It's more clear to use similar directory hierarchies for testing and source code, e.g. trove/extensions/common/service.py trove/tests/unitests/extensions/common/test_service.py Change-Id: I9faac61d9650347b51f23e8fcaf5a92aed5fbf93 Signed-off-by: Zhao Chao <zhaochao1984@gmail.com>
Diffstat (limited to 'trove/tests/unittests/extensions')
-rw-r--r--trove/tests/unittests/extensions/__init__.py0
-rw-r--r--trove/tests/unittests/extensions/common/__init__.py0
-rw-r--r--trove/tests/unittests/extensions/common/test_service.py408
-rw-r--r--trove/tests/unittests/extensions/redis/__init__.py0
-rw-r--r--trove/tests/unittests/extensions/redis/test_service.py236
5 files changed, 644 insertions, 0 deletions
diff --git a/trove/tests/unittests/extensions/__init__.py b/trove/tests/unittests/extensions/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/trove/tests/unittests/extensions/__init__.py
diff --git a/trove/tests/unittests/extensions/common/__init__.py b/trove/tests/unittests/extensions/common/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/trove/tests/unittests/extensions/common/__init__.py
diff --git a/trove/tests/unittests/extensions/common/test_service.py b/trove/tests/unittests/extensions/common/test_service.py
new file mode 100644
index 00000000..c9341337
--- /dev/null
+++ b/trove/tests/unittests/extensions/common/test_service.py
@@ -0,0 +1,408 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from mock import Mock
+from mock import patch
+from oslo_config.cfg import NoSuchOptError
+
+from trove.common import exception
+from trove.common import utils
+from trove.extensions.common import models
+from trove.extensions.common.service import ClusterRootController
+from trove.extensions.common.service import DefaultRootController
+from trove.extensions.common.service import RootController
+from trove.instance import models as instance_models
+from trove.instance.models import DBInstance
+from trove.tests.unittests import trove_testtools
+
+
+class TestDefaultRootController(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestDefaultRootController, self).setUp()
+ self.controller = DefaultRootController()
+
+ @patch.object(models.Root, "load")
+ def test_root_index(self, root_load):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = False
+ self.controller.root_index(req, tenant_id, uuid, is_cluster)
+ root_load.assert_called_with(context, uuid)
+
+ def test_root_index_with_cluster(self):
+ req = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = True
+ self.assertRaises(
+ exception.ClusterOperationNotSupported,
+ self.controller.root_index,
+ req, tenant_id, uuid, is_cluster)
+
+ @patch.object(models.Root, "create")
+ def test_root_create(self, root_create):
+ user = Mock()
+ context = Mock()
+ context.user = Mock()
+ context.user.__getitem__ = Mock(return_value=user)
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = False
+ password = Mock()
+ body = {'password': password}
+ self.controller.root_create(req, body, tenant_id, uuid, is_cluster)
+ root_create.assert_called_with(context, uuid, password)
+
+ def test_root_create_with_cluster(self):
+ req = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = True
+ password = Mock()
+ body = {'password': password}
+ self.assertRaises(
+ exception.ClusterOperationNotSupported,
+ self.controller.root_create,
+ req, body, tenant_id, uuid, is_cluster)
+
+ @patch.object(models.Root, "delete")
+ @patch.object(models.Root, "load")
+ def test_root_delete(self, root_load, root_delete):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = Mock()
+ instance_id = utils.generate_uuid()
+ is_cluster = False
+ root_load.return_value = True
+ self.controller.root_delete(req, tenant_id, instance_id, is_cluster)
+ root_load.assert_called_with(context, instance_id)
+ root_delete.assert_called_with(context, instance_id)
+
+ @patch.object(models.Root, "delete")
+ @patch.object(models.Root, "load")
+ def test_root_delete_without_root_enabled(self, root_load, root_delete):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = Mock()
+ instance_id = utils.generate_uuid()
+ is_cluster = False
+ root_load.return_value = False
+ self.assertRaises(
+ exception.RootHistoryNotFound,
+ self.controller.root_delete,
+ req, tenant_id, instance_id, is_cluster)
+ root_load.assert_called_with(context, instance_id)
+ root_delete.assert_not_called()
+
+ def test_root_delete_with_cluster(self):
+ req = Mock()
+ tenant_id = Mock()
+ instance_id = utils.generate_uuid()
+ is_cluster = True
+ self.assertRaises(
+ exception.ClusterOperationNotSupported,
+ self.controller.root_delete,
+ req, tenant_id, instance_id, is_cluster)
+
+
+class TestRootController(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestRootController, self).setUp()
+ self.context = trove_testtools.TroveTestContext(self)
+ self.controller = RootController()
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RootController, "load_root_controller")
+ @patch.object(RootController, "_get_datastore")
+ def test_index(self, service_get_datastore, service_load_root_controller,
+ service_load_instance):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ ds_manager = Mock()
+ is_cluster = False
+ service_get_datastore.return_value = (ds_manager, is_cluster)
+ root_controller = Mock()
+ ret = Mock()
+ root_controller.root_index = Mock(return_value=ret)
+ service_load_root_controller.return_value = root_controller
+
+ self.assertEqual(ret, self.controller.index(req, tenant_id, uuid))
+ service_get_datastore.assert_called_with(tenant_id, uuid)
+ service_load_root_controller.assert_called_with(ds_manager)
+ root_controller.root_index.assert_called_with(
+ req, tenant_id, uuid, is_cluster)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RootController, "load_root_controller")
+ @patch.object(RootController, "_get_datastore")
+ def test_create(self, service_get_datastore, service_load_root_controller,
+ service_load_instance):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ body = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ ds_manager = Mock()
+ is_cluster = False
+ service_get_datastore.return_value = (ds_manager, is_cluster)
+ root_controller = Mock()
+ ret = Mock()
+ root_controller.root_create = Mock(return_value=ret)
+ service_load_root_controller.return_value = root_controller
+
+ self.assertEqual(
+ ret, self.controller.create(req, tenant_id, uuid, body=body))
+ service_get_datastore.assert_called_with(tenant_id, uuid)
+ service_load_root_controller.assert_called_with(ds_manager)
+ root_controller.root_create.assert_called_with(
+ req, body, tenant_id, uuid, is_cluster)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RootController, "load_root_controller")
+ @patch.object(RootController, "_get_datastore")
+ def test_create_with_no_root_controller(self,
+ service_get_datastore,
+ service_load_root_controller,
+ service_load_instance):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ body = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ ds_manager = Mock()
+ is_cluster = False
+ service_get_datastore.return_value = (ds_manager, is_cluster)
+ service_load_root_controller.return_value = None
+
+ self.assertRaises(
+ NoSuchOptError,
+ self.controller.create,
+ req, tenant_id, uuid, body=body)
+ service_get_datastore.assert_called_with(tenant_id, uuid)
+ service_load_root_controller.assert_called_with(ds_manager)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RootController, "load_root_controller")
+ @patch.object(RootController, "_get_datastore")
+ def test_delete(self, service_get_datastore, service_load_root_controller,
+ service_load_instance):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ ds_manager = Mock()
+ is_cluster = False
+ service_get_datastore.return_value = (ds_manager, is_cluster)
+ root_controller = Mock()
+ ret = Mock()
+ root_controller.root_delete = Mock(return_value=ret)
+ service_load_root_controller.return_value = root_controller
+
+ self.assertEqual(
+ ret, self.controller.delete(req, tenant_id, uuid))
+ service_get_datastore.assert_called_with(tenant_id, uuid)
+ service_load_root_controller.assert_called_with(ds_manager)
+ root_controller.root_delete.assert_called_with(
+ req, tenant_id, uuid, is_cluster)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RootController, "load_root_controller")
+ @patch.object(RootController, "_get_datastore")
+ def test_delete_with_no_root_controller(self,
+ service_get_datastore,
+ service_load_root_controller,
+ service_load_instance):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ ds_manager = Mock()
+ is_cluster = False
+ service_get_datastore.return_value = (ds_manager, is_cluster)
+ service_load_root_controller.return_value = None
+
+ self.assertRaises(
+ NoSuchOptError,
+ self.controller.delete,
+ req, tenant_id, uuid)
+ service_get_datastore.assert_called_with(tenant_id, uuid)
+ service_load_root_controller.assert_called_with(ds_manager)
+
+
+class TestClusterRootController(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestClusterRootController, self).setUp()
+ self.context = trove_testtools.TroveTestContext(self)
+ self.controller = ClusterRootController()
+
+ @patch.object(ClusterRootController, "cluster_root_index")
+ def test_root_index_cluster(self, mock_cluster_root_index):
+ req = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = True
+ self.controller.root_index(req, tenant_id, uuid, is_cluster)
+ mock_cluster_root_index.assert_called_with(req, tenant_id, uuid)
+
+ @patch.object(ClusterRootController, "instance_root_index")
+ def test_root_index_instance(self, mock_instance_root_index):
+ req = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = False
+ self.controller.root_index(req, tenant_id, uuid, is_cluster)
+ mock_instance_root_index.assert_called_with(req, tenant_id, uuid)
+
+ @patch.object(ClusterRootController, "cluster_root_create")
+ def test_root_create_cluster(self, mock_cluster_root_create):
+ req = Mock()
+ body = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = True
+ self.controller.root_create(req, body, tenant_id, uuid, is_cluster)
+ mock_cluster_root_create.assert_called_with(req, body, tenant_id, uuid)
+
+ @patch.object(ClusterRootController, "check_cluster_instance_actions")
+ @patch.object(ClusterRootController, "instance_root_create")
+ def test_root_create_instance(self, mock_instance_root_create, mock_check):
+ req = Mock()
+ body = Mock()
+ tenant_id = Mock()
+ uuid = utils.generate_uuid()
+ is_cluster = False
+ self.controller.root_create(req, body, tenant_id, uuid, is_cluster)
+ mock_check.assert_called_with(uuid)
+ mock_instance_root_create.assert_called_with(req, body, uuid)
+
+ @patch.object(models.ClusterRoot, "load")
+ def test_instance_root_index(self, mock_cluster_root_load):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ tenant_id = Mock()
+ instance_id = utils.generate_uuid()
+ self.controller.instance_root_index(req, tenant_id, instance_id)
+ mock_cluster_root_load.assert_called_with(self.context, instance_id)
+
+ @patch.object(models.ClusterRoot, "load",
+ side_effect=exception.UnprocessableEntity())
+ def test_instance_root_index_exception(self, mock_cluster_root_load):
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ tenant_id = Mock()
+ instance_id = utils.generate_uuid()
+ self.assertRaises(
+ exception.UnprocessableEntity,
+ self.controller.instance_root_index,
+ req, tenant_id, instance_id
+ )
+ mock_cluster_root_load.assert_called_with(self.context, instance_id)
+
+ @patch.object(ClusterRootController, "instance_root_index")
+ @patch.object(ClusterRootController, "_get_cluster_instance_id")
+ def test_cluster_root_index(self, mock_get_cluster_instance,
+ mock_instance_root_index):
+ req = Mock()
+ tenant_id = Mock()
+ cluster_id = utils.generate_uuid()
+ single_instance_id = Mock()
+ mock_get_cluster_instance.return_value = (single_instance_id, Mock())
+ self.controller.cluster_root_index(req, tenant_id, cluster_id)
+ mock_get_cluster_instance.assert_called_with(tenant_id, cluster_id)
+ mock_instance_root_index.assert_called_with(req, tenant_id,
+ single_instance_id)
+
+ @patch.object(ClusterRootController, "instance_root_create")
+ @patch.object(ClusterRootController, "_get_cluster_instance_id")
+ def test_cluster_root_create(self, mock_get_cluster_instance,
+ mock_instance_root_create):
+ req = Mock()
+ body = Mock()
+ tenant_id = Mock()
+ cluster_id = utils.generate_uuid()
+ single_instance_id = Mock()
+ cluster_instances = Mock()
+ mock_get_cluster_instance.return_value = (single_instance_id,
+ cluster_instances)
+ self.controller.cluster_root_create(req, body, tenant_id, cluster_id)
+ mock_get_cluster_instance.assert_called_with(tenant_id, cluster_id)
+ mock_instance_root_create.assert_called_with(req, body,
+ single_instance_id,
+ cluster_instances)
+
+ @patch.object(DBInstance, "find_all")
+ def test_get_cluster_instance_id(self, mock_find_all):
+ tenant_id = Mock()
+ cluster_id = Mock()
+ db_inst_1 = Mock()
+ db_inst_1.id.return_value = utils.generate_uuid()
+ db_inst_2 = Mock()
+ db_inst_2.id.return_value = utils.generate_uuid()
+ cluster_instances = [db_inst_1, db_inst_2]
+ mock_find_all.return_value.all.return_value = cluster_instances
+ ret = self.controller._get_cluster_instance_id(tenant_id, cluster_id)
+ self.assertEqual(db_inst_1.id, ret[0])
+ self.assertEqual([db_inst_1.id, db_inst_2.id], ret[1])
+
+ @patch.object(models.ClusterRoot, "create")
+ def test_instance_root_create(self, mock_cluster_root_create):
+ user = Mock()
+ self.context.user = Mock()
+ self.context.user.__getitem__ = Mock(return_value=user)
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ password = Mock()
+ body = {'password': password}
+ instance_id = utils.generate_uuid()
+ cluster_instances = Mock()
+ self.controller.instance_root_create(
+ req, body, instance_id, cluster_instances)
+ mock_cluster_root_create.assert_called_with(
+ self.context, instance_id, password,
+ cluster_instances)
+
+ @patch.object(models.ClusterRoot, "create")
+ def test_instance_root_create_no_body(self, mock_cluster_root_create):
+ user = Mock()
+ self.context.user = Mock()
+ self.context.user.__getitem__ = Mock(return_value=user)
+ req = Mock()
+ req.environ = {'trove.context': self.context}
+ password = None
+ body = None
+ instance_id = utils.generate_uuid()
+ cluster_instances = Mock()
+ self.controller.instance_root_create(
+ req, body, instance_id, cluster_instances)
+ mock_cluster_root_create.assert_called_with(
+ self.context, instance_id, password,
+ cluster_instances)
diff --git a/trove/tests/unittests/extensions/redis/__init__.py b/trove/tests/unittests/extensions/redis/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/trove/tests/unittests/extensions/redis/__init__.py
diff --git a/trove/tests/unittests/extensions/redis/test_service.py b/trove/tests/unittests/extensions/redis/test_service.py
new file mode 100644
index 00000000..ad98b30f
--- /dev/null
+++ b/trove/tests/unittests/extensions/redis/test_service.py
@@ -0,0 +1,236 @@
+# Copyright 2017 Eayun, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+import uuid
+
+from mock import Mock, patch
+
+from trove.common import exception
+from trove.datastore import models as datastore_models
+from trove.extensions.common import models
+from trove.extensions.redis.models import RedisRoot
+from trove.extensions.redis.service import RedisRootController
+from trove.instance import models as instance_models
+from trove.instance.models import DBInstance
+from trove.instance.tasks import InstanceTasks
+from trove.taskmanager import api as task_api
+from trove.tests.unittests import trove_testtools
+from trove.tests.unittests.util import util
+
+
+class TestRedisRootController(trove_testtools.TestCase):
+
+ @patch.object(task_api.API, 'get_client', Mock(return_value=Mock()))
+ def setUp(self):
+ util.init_db()
+ self.context = trove_testtools.TroveTestContext(self, is_admin=True)
+ self.datastore = datastore_models.DBDatastore.create(
+ id=str(uuid.uuid4()),
+ name='redis' + str(uuid.uuid4()),
+ )
+ self.datastore_version = (
+ datastore_models.DBDatastoreVersion.create(
+ id=str(uuid.uuid4()),
+ datastore_id=self.datastore.id,
+ name="3.2" + str(uuid.uuid4()),
+ manager="redis",
+ image_id="image_id",
+ packages="",
+ active=True))
+ self.tenant_id = "UUID"
+ self.single_db_info = DBInstance.create(
+ id="redis-single",
+ name="redis-single",
+ flavor_id=1,
+ datastore_version_id=self.datastore_version.id,
+ tenant_id=self.tenant_id,
+ volume_size=None,
+ task_status=InstanceTasks.NONE)
+ self.master_db_info = DBInstance.create(
+ id="redis-master",
+ name="redis-master",
+ flavor_id=1,
+ datastore_version_id=self.datastore_version.id,
+ tenant_id=self.tenant_id,
+ volume_size=None,
+ task_status=InstanceTasks.NONE)
+ self.slave_db_info = DBInstance.create(
+ id="redis-slave",
+ name="redis-slave",
+ flavor_id=1,
+ datastore_version_id=self.datastore_version.id,
+ tenant_id=self.tenant_id,
+ volume_size=None,
+ task_status=InstanceTasks.NONE,
+ slave_of_id=self.master_db_info.id)
+
+ super(TestRedisRootController, self).setUp()
+ self.controller = RedisRootController()
+
+ def tearDown(self):
+ self.datastore.delete()
+ self.datastore_version.delete()
+ self.master_db_info.delete()
+ self.slave_db_info.delete()
+ super(TestRedisRootController, self).tearDown()
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(models.Root, "create")
+ def test_root_create_on_single_instance(self, root_create, *args):
+ user = Mock()
+ context = Mock()
+ context.user = Mock()
+ context.user.__getitem__ = Mock(return_value=user)
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.single_db_info.id
+ is_cluster = False
+ password = Mock()
+ body = {"password": password}
+ self.controller.root_create(req, body, tenant_id,
+ instance_id, is_cluster)
+ root_create.assert_called_with(context, instance_id,
+ password)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(models.Root, "create")
+ def test_root_create_on_master_instance(self, root_create, *args):
+ user = Mock()
+ context = Mock()
+ context.user = Mock()
+ context.user.__getitem__ = Mock(return_value=user)
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.master_db_info.id
+ slave_instance_id = self.slave_db_info.id
+ is_cluster = False
+ password = Mock()
+ body = {"password": password}
+ self.controller.root_create(req, body, tenant_id,
+ instance_id, is_cluster)
+ root_create.assert_called_with(context, slave_instance_id,
+ password)
+
+ def test_root_create_on_slave(self):
+ user = Mock()
+ context = Mock()
+ context.user = Mock()
+ context.user.__getitem__ = Mock(return_value=user)
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.slave_db_info.id
+ is_cluster = False
+ body = {}
+ self.assertRaises(
+ exception.SlaveOperationNotSupported,
+ self.controller.root_create,
+ req, body, tenant_id, instance_id, is_cluster)
+
+ def test_root_create_with_cluster(self):
+ req = Mock()
+ tenant_id = self.tenant_id
+ instance_id = self.master_db_info.id
+ is_cluster = True
+ body = {}
+ self.assertRaises(
+ exception.ClusterOperationNotSupported,
+ self.controller.root_create,
+ req, body, tenant_id, instance_id, is_cluster)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RedisRoot, "get_auth_password")
+ @patch.object(models.Root, "delete")
+ @patch.object(models.Root, "load")
+ def test_root_delete_on_single_instance(self, root_load,
+ root_delete, *args):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.single_db_info.id
+ is_cluster = False
+ root_load.return_value = True
+ self.controller.root_delete(req, tenant_id, instance_id, is_cluster)
+ root_load.assert_called_with(context, instance_id)
+ root_delete.assert_called_with(context, instance_id)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(RedisRoot, "get_auth_password")
+ @patch.object(models.Root, "delete")
+ @patch.object(models.Root, "load")
+ def test_root_delete_on_master_instance(self, root_load,
+ root_delete, *args):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.master_db_info.id
+ slave_instance_id = self.slave_db_info.id
+ is_cluster = False
+ root_load.return_value = True
+ self.controller.root_delete(req, tenant_id, instance_id, is_cluster)
+ root_load.assert_called_with(context, instance_id)
+ root_delete.assert_called_with(context, slave_instance_id)
+
+ def test_root_delete_on_slave(self):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.slave_db_info.id
+ is_cluster = False
+ self.assertRaises(
+ exception.SlaveOperationNotSupported,
+ self.controller.root_delete,
+ req, tenant_id, instance_id, is_cluster)
+
+ def test_root_delete_with_cluster(self):
+ req = Mock()
+ tenant_id = self.tenant_id
+ instance_id = self.master_db_info.id
+ is_cluster = True
+ self.assertRaises(
+ exception.ClusterOperationNotSupported,
+ self.controller.root_delete,
+ req, tenant_id, instance_id, is_cluster)
+
+ @patch.object(instance_models.Instance, "load")
+ @patch.object(models.Root, "delete")
+ @patch.object(models.Root, "load")
+ def test_root_delete_without_root_enabled(self, root_load,
+ root_delete, *args):
+ context = Mock()
+ req = Mock()
+ req.environ = Mock()
+ req.environ.__getitem__ = Mock(return_value=context)
+ tenant_id = self.tenant_id
+ instance_id = self.single_db_info.id
+ is_cluster = False
+ root_load.return_value = False
+ self.assertRaises(
+ exception.RootHistoryNotFound,
+ self.controller.root_delete,
+ req, tenant_id, instance_id, is_cluster)
+ root_load.assert_called_with(context, instance_id)
+ root_delete.assert_not_called()