summaryrefslogtreecommitdiff
path: root/nova/tests/unit/virt/vmwareapi/test_ds_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests/unit/virt/vmwareapi/test_ds_util.py')
-rw-r--r--nova/tests/unit/virt/vmwareapi/test_ds_util.py548
1 files changed, 548 insertions, 0 deletions
diff --git a/nova/tests/unit/virt/vmwareapi/test_ds_util.py b/nova/tests/unit/virt/vmwareapi/test_ds_util.py
new file mode 100644
index 0000000000..6f5cf74b26
--- /dev/null
+++ b/nova/tests/unit/virt/vmwareapi/test_ds_util.py
@@ -0,0 +1,548 @@
+# Copyright (c) 2014 VMware, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import re
+
+import mock
+from oslo.utils import units
+from oslo.vmware import exceptions as vexc
+from testtools import matchers
+
+from nova import exception
+from nova.i18n import _
+from nova import test
+from nova.tests.unit.virt.vmwareapi import fake
+from nova.virt.vmwareapi import ds_util
+
+
+class DsUtilTestCase(test.NoDBTestCase):
+ def setUp(self):
+ super(DsUtilTestCase, self).setUp()
+ self.session = fake.FakeSession()
+ self.flags(api_retry_count=1, group='vmware')
+ fake.reset()
+
+ def tearDown(self):
+ super(DsUtilTestCase, self).tearDown()
+ fake.reset()
+
+ def test_file_delete(self):
+ def fake_call_method(module, method, *args, **kwargs):
+ self.assertEqual('DeleteDatastoreFile_Task', method)
+ name = kwargs.get('name')
+ self.assertEqual('[ds] fake/path', name)
+ datacenter = kwargs.get('datacenter')
+ self.assertEqual('fake-dc-ref', datacenter)
+ return 'fake_delete_task'
+
+ with contextlib.nested(
+ mock.patch.object(self.session, '_wait_for_task'),
+ mock.patch.object(self.session, '_call_method',
+ fake_call_method)
+ ) as (_wait_for_task, _call_method):
+ ds_path = ds_util.DatastorePath('ds', 'fake/path')
+ ds_util.file_delete(self.session,
+ ds_path, 'fake-dc-ref')
+ _wait_for_task.assert_has_calls([
+ mock.call('fake_delete_task')])
+
+ def test_file_move(self):
+ def fake_call_method(module, method, *args, **kwargs):
+ self.assertEqual('MoveDatastoreFile_Task', method)
+ sourceName = kwargs.get('sourceName')
+ self.assertEqual('[ds] tmp/src', sourceName)
+ destinationName = kwargs.get('destinationName')
+ self.assertEqual('[ds] base/dst', destinationName)
+ sourceDatacenter = kwargs.get('sourceDatacenter')
+ self.assertEqual('fake-dc-ref', sourceDatacenter)
+ destinationDatacenter = kwargs.get('destinationDatacenter')
+ self.assertEqual('fake-dc-ref', destinationDatacenter)
+ return 'fake_move_task'
+
+ with contextlib.nested(
+ mock.patch.object(self.session, '_wait_for_task'),
+ mock.patch.object(self.session, '_call_method',
+ fake_call_method)
+ ) as (_wait_for_task, _call_method):
+ src_ds_path = ds_util.DatastorePath('ds', 'tmp/src')
+ dst_ds_path = ds_util.DatastorePath('ds', 'base/dst')
+ ds_util.file_move(self.session,
+ 'fake-dc-ref', src_ds_path, dst_ds_path)
+ _wait_for_task.assert_has_calls([
+ mock.call('fake_move_task')])
+
+ def test_mkdir(self):
+ def fake_call_method(module, method, *args, **kwargs):
+ self.assertEqual('MakeDirectory', method)
+ name = kwargs.get('name')
+ self.assertEqual('[ds] fake/path', name)
+ datacenter = kwargs.get('datacenter')
+ self.assertEqual('fake-dc-ref', datacenter)
+ createParentDirectories = kwargs.get('createParentDirectories')
+ self.assertTrue(createParentDirectories)
+
+ with mock.patch.object(self.session, '_call_method',
+ fake_call_method):
+ ds_path = ds_util.DatastorePath('ds', 'fake/path')
+ ds_util.mkdir(self.session, ds_path, 'fake-dc-ref')
+
+ def test_file_exists(self):
+ def fake_call_method(module, method, *args, **kwargs):
+ if method == 'SearchDatastore_Task':
+ ds_browser = args[0]
+ self.assertEqual('fake-browser', ds_browser)
+ datastorePath = kwargs.get('datastorePath')
+ self.assertEqual('[ds] fake/path', datastorePath)
+ return 'fake_exists_task'
+
+ # Should never get here
+ self.fail()
+
+ def fake_wait_for_task(task_ref):
+ if task_ref == 'fake_exists_task':
+ result_file = fake.DataObject()
+ result_file.path = 'fake-file'
+
+ result = fake.DataObject()
+ result.file = [result_file]
+ result.path = '[ds] fake/path'
+
+ task_info = fake.DataObject()
+ task_info.result = result
+
+ return task_info
+
+ # Should never get here
+ self.fail()
+
+ with contextlib.nested(
+ mock.patch.object(self.session, '_call_method',
+ fake_call_method),
+ mock.patch.object(self.session, '_wait_for_task',
+ fake_wait_for_task)):
+ ds_path = ds_util.DatastorePath('ds', 'fake/path')
+ file_exists = ds_util.file_exists(self.session,
+ 'fake-browser', ds_path, 'fake-file')
+ self.assertTrue(file_exists)
+
+ def test_file_exists_fails(self):
+ def fake_call_method(module, method, *args, **kwargs):
+ if method == 'SearchDatastore_Task':
+ return 'fake_exists_task'
+
+ # Should never get here
+ self.fail()
+
+ def fake_wait_for_task(task_ref):
+ if task_ref == 'fake_exists_task':
+ raise vexc.FileNotFoundException()
+
+ # Should never get here
+ self.fail()
+
+ with contextlib.nested(
+ mock.patch.object(self.session, '_call_method',
+ fake_call_method),
+ mock.patch.object(self.session, '_wait_for_task',
+ fake_wait_for_task)):
+ ds_path = ds_util.DatastorePath('ds', 'fake/path')
+ file_exists = ds_util.file_exists(self.session,
+ 'fake-browser', ds_path, 'fake-file')
+ self.assertFalse(file_exists)
+
+ def _mock_get_datastore_calls(self, *datastores):
+ """Mock vim_util calls made by get_datastore."""
+
+ datastores_i = [None]
+
+ # For the moment, at least, this list of datastores is simply passed to
+ # get_properties_for_a_collection_of_objects, which we mock below. We
+ # don't need to over-complicate the fake function by worrying about its
+ # contents.
+ fake_ds_list = ['fake-ds']
+
+ def fake_call_method(module, method, *args, **kwargs):
+ # Mock the call which returns a list of datastores for the cluster
+ if (module == ds_util.vim_util and
+ method == 'get_dynamic_property' and
+ args == ('fake-cluster', 'ClusterComputeResource',
+ 'datastore')):
+ fake_ds_mor = fake.DataObject()
+ fake_ds_mor.ManagedObjectReference = fake_ds_list
+ return fake_ds_mor
+
+ # Return the datastore result sets we were passed in, in the order
+ # given
+ if (module == ds_util.vim_util and
+ method == 'get_properties_for_a_collection_of_objects' and
+ args[0] == 'Datastore' and
+ args[1] == fake_ds_list):
+ # Start a new iterator over given datastores
+ datastores_i[0] = iter(datastores)
+ return datastores_i[0].next()
+
+ # Continue returning results from the current iterator.
+ if (module == ds_util.vim_util and
+ method == 'continue_to_get_objects'):
+ try:
+ return datastores_i[0].next()
+ except StopIteration:
+ return None
+
+ # Sentinel that get_datastore's use of vim has changed
+ self.fail('Unexpected vim call in get_datastore: %s' % method)
+
+ return mock.patch.object(self.session, '_call_method',
+ side_effect=fake_call_method)
+
+ def test_get_datastore(self):
+ fake_objects = fake.FakeRetrieveResult()
+ fake_objects.add_object(fake.Datastore())
+ fake_objects.add_object(fake.Datastore("fake-ds-2", 2048, 1000,
+ False, "normal"))
+ fake_objects.add_object(fake.Datastore("fake-ds-3", 4096, 2000,
+ True, "inMaintenance"))
+
+ with self._mock_get_datastore_calls(fake_objects):
+ result = ds_util.get_datastore(self.session, 'fake-cluster')
+ self.assertEqual("fake-ds", result.name)
+ self.assertEqual(units.Ti, result.capacity)
+ self.assertEqual(500 * units.Gi, result.freespace)
+
+ def test_get_datastore_with_regex(self):
+ # Test with a regex that matches with a datastore
+ datastore_valid_regex = re.compile("^openstack.*\d$")
+ fake_objects = fake.FakeRetrieveResult()
+ fake_objects.add_object(fake.Datastore("openstack-ds0"))
+ fake_objects.add_object(fake.Datastore("fake-ds0"))
+ fake_objects.add_object(fake.Datastore("fake-ds1"))
+
+ with self._mock_get_datastore_calls(fake_objects):
+ result = ds_util.get_datastore(self.session, 'fake-cluster',
+ datastore_valid_regex)
+ self.assertEqual("openstack-ds0", result.name)
+
+ def test_get_datastore_with_token(self):
+ regex = re.compile("^ds.*\d$")
+ fake0 = fake.FakeRetrieveResult()
+ fake0.add_object(fake.Datastore("ds0", 10 * units.Gi, 5 * units.Gi))
+ fake0.add_object(fake.Datastore("foo", 10 * units.Gi, 9 * units.Gi))
+ setattr(fake0, 'token', 'token-0')
+ fake1 = fake.FakeRetrieveResult()
+ fake1.add_object(fake.Datastore("ds2", 10 * units.Gi, 8 * units.Gi))
+ fake1.add_object(fake.Datastore("ds3", 10 * units.Gi, 1 * units.Gi))
+
+ with self._mock_get_datastore_calls(fake0, fake1):
+ result = ds_util.get_datastore(self.session, 'fake-cluster', regex)
+ self.assertEqual("ds2", result.name)
+
+ def test_get_datastore_with_list(self):
+ # Test with a regex containing whitelist of datastores
+ datastore_valid_regex = re.compile("(openstack-ds0|openstack-ds2)")
+ fake_objects = fake.FakeRetrieveResult()
+ fake_objects.add_object(fake.Datastore("openstack-ds0"))
+ fake_objects.add_object(fake.Datastore("openstack-ds1"))
+ fake_objects.add_object(fake.Datastore("openstack-ds2"))
+
+ with self._mock_get_datastore_calls(fake_objects):
+ result = ds_util.get_datastore(self.session, 'fake-cluster',
+ datastore_valid_regex)
+ self.assertNotEqual("openstack-ds1", result.name)
+
+ def test_get_datastore_with_regex_error(self):
+ # Test with a regex that has no match
+ # Checks if code raises DatastoreNotFound with a specific message
+ datastore_invalid_regex = re.compile("unknown-ds")
+ exp_message = (_("Datastore regex %s did not match any datastores")
+ % datastore_invalid_regex.pattern)
+ fake_objects = fake.FakeRetrieveResult()
+ fake_objects.add_object(fake.Datastore("fake-ds0"))
+ fake_objects.add_object(fake.Datastore("fake-ds1"))
+ # assertRaisesRegExp would have been a good choice instead of
+ # try/catch block, but it's available only from Py 2.7.
+ try:
+ with self._mock_get_datastore_calls(fake_objects):
+ ds_util.get_datastore(self.session, 'fake-cluster',
+ datastore_invalid_regex)
+ except exception.DatastoreNotFound as e:
+ self.assertEqual(exp_message, e.args[0])
+ else:
+ self.fail("DatastoreNotFound Exception was not raised with "
+ "message: %s" % exp_message)
+
+ def test_get_datastore_without_datastore(self):
+ self.assertRaises(exception.DatastoreNotFound,
+ ds_util.get_datastore,
+ fake.FakeObjectRetrievalSession(None), cluster="fake-cluster")
+
+ def test_get_datastore_inaccessible_ds(self):
+ data_store = fake.Datastore()
+ data_store.set("summary.accessible", False)
+
+ fake_objects = fake.FakeRetrieveResult()
+ fake_objects.add_object(data_store)
+
+ with self._mock_get_datastore_calls(fake_objects):
+ self.assertRaises(exception.DatastoreNotFound,
+ ds_util.get_datastore,
+ self.session, 'fake-cluster')
+
+ def test_get_datastore_ds_in_maintenance(self):
+ data_store = fake.Datastore()
+ data_store.set("summary.maintenanceMode", "inMaintenance")
+
+ fake_objects = fake.FakeRetrieveResult()
+ fake_objects.add_object(data_store)
+
+ with self._mock_get_datastore_calls(fake_objects):
+ self.assertRaises(exception.DatastoreNotFound,
+ ds_util.get_datastore,
+ self.session, 'fake-cluster')
+
+ def test_get_datastore_no_host_in_cluster(self):
+ def fake_call_method(module, method, *args, **kwargs):
+ return ''
+
+ with mock.patch.object(self.session, '_call_method',
+ fake_call_method):
+ self.assertRaises(exception.DatastoreNotFound,
+ ds_util.get_datastore,
+ self.session, 'fake-cluster')
+
+ def _test_is_datastore_valid(self, accessible=True,
+ maintenance_mode="normal",
+ type="VMFS",
+ datastore_regex=None):
+ propdict = {}
+ propdict["summary.accessible"] = accessible
+ propdict["summary.maintenanceMode"] = maintenance_mode
+ propdict["summary.type"] = type
+ propdict["summary.name"] = "ds-1"
+
+ return ds_util._is_datastore_valid(propdict, datastore_regex)
+
+ def test_is_datastore_valid(self):
+ for ds_type in ds_util.ALLOWED_DATASTORE_TYPES:
+ self.assertTrue(self._test_is_datastore_valid(True,
+ "normal",
+ ds_type))
+
+ def test_is_datastore_valid_inaccessible_ds(self):
+ self.assertFalse(self._test_is_datastore_valid(False,
+ "normal",
+ "VMFS"))
+
+ def test_is_datastore_valid_ds_in_maintenance(self):
+ self.assertFalse(self._test_is_datastore_valid(True,
+ "inMaintenance",
+ "VMFS"))
+
+ def test_is_datastore_valid_ds_type_invalid(self):
+ self.assertFalse(self._test_is_datastore_valid(True,
+ "normal",
+ "vfat"))
+
+ def test_is_datastore_valid_not_matching_regex(self):
+ datastore_regex = re.compile("ds-2")
+ self.assertFalse(self._test_is_datastore_valid(True,
+ "normal",
+ "VMFS",
+ datastore_regex))
+
+ def test_is_datastore_valid_matching_regex(self):
+ datastore_regex = re.compile("ds-1")
+ self.assertTrue(self._test_is_datastore_valid(True,
+ "normal",
+ "VMFS",
+ datastore_regex))
+
+
+class DatastoreTestCase(test.NoDBTestCase):
+ def test_ds(self):
+ ds = ds_util.Datastore(
+ "fake_ref", "ds_name", 2 * units.Gi, 1 * units.Gi)
+ self.assertEqual('ds_name', ds.name)
+ self.assertEqual('fake_ref', ds.ref)
+ self.assertEqual(2 * units.Gi, ds.capacity)
+ self.assertEqual(1 * units.Gi, ds.freespace)
+
+ def test_ds_invalid_space(self):
+ self.assertRaises(ValueError, ds_util.Datastore,
+ "fake_ref", "ds_name", 1 * units.Gi, 2 * units.Gi)
+ self.assertRaises(ValueError, ds_util.Datastore,
+ "fake_ref", "ds_name", None, 2 * units.Gi)
+
+ def test_ds_no_capacity_no_freespace(self):
+ ds = ds_util.Datastore("fake_ref", "ds_name")
+ self.assertIsNone(ds.capacity)
+ self.assertIsNone(ds.freespace)
+
+ def test_ds_invalid(self):
+ self.assertRaises(ValueError, ds_util.Datastore, None, "ds_name")
+ self.assertRaises(ValueError, ds_util.Datastore, "fake_ref", None)
+
+ def test_build_path(self):
+ ds = ds_util.Datastore("fake_ref", "ds_name")
+ ds_path = ds.build_path("some_dir", "foo.vmdk")
+ self.assertEqual('[ds_name] some_dir/foo.vmdk', str(ds_path))
+
+
+class DatastorePathTestCase(test.NoDBTestCase):
+
+ def test_ds_path(self):
+ p = ds_util.DatastorePath('dsname', 'a/b/c', 'file.iso')
+ self.assertEqual('[dsname] a/b/c/file.iso', str(p))
+ self.assertEqual('a/b/c/file.iso', p.rel_path)
+ self.assertEqual('a/b/c', p.parent.rel_path)
+ self.assertEqual('[dsname] a/b/c', str(p.parent))
+ self.assertEqual('dsname', p.datastore)
+ self.assertEqual('file.iso', p.basename)
+ self.assertEqual('a/b/c', p.dirname)
+
+ def test_ds_path_no_ds_name(self):
+ bad_args = [
+ ('', ['a/b/c', 'file.iso']),
+ (None, ['a/b/c', 'file.iso'])]
+ for t in bad_args:
+ self.assertRaises(
+ ValueError, ds_util.DatastorePath,
+ t[0], *t[1])
+
+ def test_ds_path_invalid_path_components(self):
+ bad_args = [
+ ('dsname', [None]),
+ ('dsname', ['', None]),
+ ('dsname', ['a', None]),
+ ('dsname', ['a', None, 'b']),
+ ('dsname', [None, '']),
+ ('dsname', [None, 'b'])]
+
+ for t in bad_args:
+ self.assertRaises(
+ ValueError, ds_util.DatastorePath,
+ t[0], *t[1])
+
+ def test_ds_path_no_subdir(self):
+ args = [
+ ('dsname', ['', 'x.vmdk']),
+ ('dsname', ['x.vmdk'])]
+
+ canonical_p = ds_util.DatastorePath('dsname', 'x.vmdk')
+ self.assertEqual('[dsname] x.vmdk', str(canonical_p))
+ self.assertEqual('', canonical_p.dirname)
+ self.assertEqual('x.vmdk', canonical_p.basename)
+ self.assertEqual('x.vmdk', canonical_p.rel_path)
+ for t in args:
+ p = ds_util.DatastorePath(t[0], *t[1])
+ self.assertEqual(str(canonical_p), str(p))
+
+ def test_ds_path_ds_only(self):
+ args = [
+ ('dsname', []),
+ ('dsname', ['']),
+ ('dsname', ['', ''])]
+
+ canonical_p = ds_util.DatastorePath('dsname')
+ self.assertEqual('[dsname]', str(canonical_p))
+ self.assertEqual('', canonical_p.rel_path)
+ self.assertEqual('', canonical_p.basename)
+ self.assertEqual('', canonical_p.dirname)
+ for t in args:
+ p = ds_util.DatastorePath(t[0], *t[1])
+ self.assertEqual(str(canonical_p), str(p))
+ self.assertEqual(canonical_p.rel_path, p.rel_path)
+
+ def test_ds_path_equivalence(self):
+ args = [
+ ('dsname', ['a/b/c/', 'x.vmdk']),
+ ('dsname', ['a/', 'b/c/', 'x.vmdk']),
+ ('dsname', ['a', 'b', 'c', 'x.vmdk']),
+ ('dsname', ['a/b/c', 'x.vmdk'])]
+
+ canonical_p = ds_util.DatastorePath('dsname', 'a/b/c', 'x.vmdk')
+ for t in args:
+ p = ds_util.DatastorePath(t[0], *t[1])
+ self.assertEqual(str(canonical_p), str(p))
+ self.assertEqual(canonical_p.datastore, p.datastore)
+ self.assertEqual(canonical_p.rel_path, p.rel_path)
+ self.assertEqual(str(canonical_p.parent), str(p.parent))
+
+ def test_ds_path_non_equivalence(self):
+ args = [
+ # leading slash
+ ('dsname', ['/a', 'b', 'c', 'x.vmdk']),
+ ('dsname', ['/a/b/c/', 'x.vmdk']),
+ ('dsname', ['a/b/c', '/x.vmdk']),
+ # leading space
+ ('dsname', ['a/b/c/', ' x.vmdk']),
+ ('dsname', ['a/', ' b/c/', 'x.vmdk']),
+ ('dsname', [' a', 'b', 'c', 'x.vmdk']),
+ # trailing space
+ ('dsname', ['/a/b/c/', 'x.vmdk ']),
+ ('dsname', ['a/b/c/ ', 'x.vmdk'])]
+
+ canonical_p = ds_util.DatastorePath('dsname', 'a/b/c', 'x.vmdk')
+ for t in args:
+ p = ds_util.DatastorePath(t[0], *t[1])
+ self.assertNotEqual(str(canonical_p), str(p))
+
+ def test_ds_path_hashable(self):
+ ds1 = ds_util.DatastorePath('dsname', 'path')
+ ds2 = ds_util.DatastorePath('dsname', 'path')
+
+ # If the above objects have the same hash, they will only be added to
+ # the set once
+ self.assertThat(set([ds1, ds2]), matchers.HasLength(1))
+
+ def test_equal(self):
+ a = ds_util.DatastorePath('ds_name', 'a')
+ b = ds_util.DatastorePath('ds_name', 'a')
+ self.assertEqual(a, b)
+
+ def test_join(self):
+ p = ds_util.DatastorePath('ds_name', 'a')
+ ds_path = p.join('b')
+ self.assertEqual('[ds_name] a/b', str(ds_path))
+
+ p = ds_util.DatastorePath('ds_name', 'a')
+ ds_path = p.join()
+ self.assertEqual('[ds_name] a', str(ds_path))
+
+ bad_args = [
+ [None],
+ ['', None],
+ ['a', None],
+ ['a', None, 'b']]
+ for arg in bad_args:
+ self.assertRaises(ValueError, p.join, *arg)
+
+ def test_ds_path_parse(self):
+ p = ds_util.DatastorePath.parse('[dsname]')
+ self.assertEqual('dsname', p.datastore)
+ self.assertEqual('', p.rel_path)
+
+ p = ds_util.DatastorePath.parse('[dsname] folder')
+ self.assertEqual('dsname', p.datastore)
+ self.assertEqual('folder', p.rel_path)
+
+ p = ds_util.DatastorePath.parse('[dsname] folder/file')
+ self.assertEqual('dsname', p.datastore)
+ self.assertEqual('folder/file', p.rel_path)
+
+ for p in [None, '']:
+ self.assertRaises(ValueError, ds_util.DatastorePath.parse, p)
+
+ for p in ['bad path', '/a/b/c', 'a/b/c']:
+ self.assertRaises(IndexError, ds_util.DatastorePath.parse, p)