diff options
author | Tony Asleson <tasleson@redhat.com> | 2016-02-19 15:16:05 -0600 |
---|---|---|
committer | Tony Asleson <tasleson@redhat.com> | 2016-02-19 15:16:05 -0600 |
commit | 8b6cad32199dd08d8f56d5f0cec0f5e24928211c (patch) | |
tree | 6a75642319d5c9ea6a2734672a57d1b53d5d37d6 | |
parent | eefcbd5b28aae6c0bed22a6ff8cb97141184083f (diff) | |
download | lvm2-8b6cad32199dd08d8f56d5f0cec0f5e24928211c.tar.gz |
lvmdbustest.py: resync latest changes
This file update was missed, copied latest file from:
https://github.com/tasleson/lvm-dubstep/blob/master/test/lvmdbustest.py
Signed-off-by: Tony Asleson <tasleson@redhat.com>
-rw-r--r-- | test/dbus/lvmdbustest.py | 1907 |
1 files changed, 1060 insertions, 847 deletions
diff --git a/test/dbus/lvmdbustest.py b/test/dbus/lvmdbustest.py index 7f3bcca8d..bdeb1a30a 100644 --- a/test/dbus/lvmdbustest.py +++ b/test/dbus/lvmdbustest.py @@ -1,19 +1,13 @@ #!/usr/bin/env python3 -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. +# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved. # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. +# This copyrighted material is made available to anyone wishing to use, +# modify, copy, or redistribute it subject to the terms and conditions +# of the GNU General Public License v.2. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# Copyright 2015, Tony Asleson <tasleson@redhat.com> import dbus # noinspection PyUnresolvedReferences @@ -29,7 +23,6 @@ import os import xml.etree.ElementTree as Et from collections import OrderedDict - BUSNAME = "com.redhat.lvmdbus1" MANAGER_INT = BUSNAME + '.Manager' MANAGER_OBJ = '/' + BUSNAME.replace('.', '/') + 'Manager' @@ -46,1008 +39,1228 @@ CACHE_LV_INT = BUSNAME + ".CachedLv" THINPOOL_LV_PATH = '/' + THINPOOL_INT.replace('.', '/') -def rs(length, suffix): - return ''.join(random.choice(string.ascii_lowercase) - for _ in range(length)) + suffix +def rs(length, suffix, character_set=string.ascii_lowercase): + return ''.join(random.choice(character_set) + for _ in range(length)) + suffix + bus = dbus.SystemBus(mainloop=DBusGMainLoop()) class DbusIntrospection(object): - - @staticmethod - def introspect(xml_representation): - interfaces = {} - - root = Et.fromstring(xml_representation) - - for c in root: - if c.tag == "interface": - in_f = c.attrib['name'] - interfaces[in_f] = \ - dict(methods=OrderedDict(), properties={}) - for nested in c: - if nested.tag == "method": - mn = nested.attrib['name'] - interfaces[in_f]['methods'][mn] = OrderedDict() - - for arg in nested: - if arg.tag == 'arg': - arg_dir = arg.attrib['direction'] - if arg_dir == 'in': - n = arg.attrib['name'] - else: - n = None - - arg_type = arg.attrib['type'] - - if n: - v = dict(name=mn, - a_dir=arg_dir, - a_type=arg_type) - interfaces[in_f]['methods'][mn][n] = v - - elif nested.tag == 'property': - pn = nested.attrib['name'] - p_access = nested.attrib['access'] - p_type = nested.attrib['type'] - - interfaces[in_f]['properties'][pn] = \ - dict(p_access=p_access, p_type=p_type) - else: - pass - - # print('Interfaces...') - # for k, v in list(interfaces.items()): - # print('Interface %s' % k) - # if v['methods']: - # for m, args in list(v['methods'].items()): - # print(' method: %s' % m) - # for a, aa in args.items(): - # print(' method arg: %s' % (a)) - # if v['properties']: - # for p, d in list(v['properties'].items()): - # print(' Property: %s' % (p)) - # print('End interfaces') - - return interfaces + @staticmethod + def introspect(xml_representation): + interfaces = {} + + root = Et.fromstring(xml_representation) + + for c in root: + if c.tag == "interface": + in_f = c.attrib['name'] + interfaces[in_f] = \ + dict(methods=OrderedDict(), properties={}) + for nested in c: + if nested.tag == "method": + mn = nested.attrib['name'] + interfaces[in_f]['methods'][mn] = OrderedDict() + + for arg in nested: + if arg.tag == 'arg': + arg_dir = arg.attrib['direction'] + if arg_dir == 'in': + n = arg.attrib['name'] + else: + n = None + + arg_type = arg.attrib['type'] + + if n: + v = dict( + name=mn, + a_dir=arg_dir, + a_type=arg_type) + interfaces[in_f]['methods'][mn][n] = v + + elif nested.tag == 'property': + pn = nested.attrib['name'] + p_access = nested.attrib['access'] + p_type = nested.attrib['type'] + + interfaces[in_f]['properties'][pn] = \ + dict(p_access=p_access, p_type=p_type) + else: + pass + + # print('Interfaces...') + # for k, v in list(interfaces.items()): + # print('Interface %s' % k) + # if v['methods']: + # for m, args in list(v['methods'].items()): + # print(' method: %s' % m) + # for a, aa in args.items(): + # print(' method arg: %s' % (a)) + # if v['properties']: + # for p, d in list(v['properties'].items()): + # print(' Property: %s' % (p)) + # print('End interfaces') + + return interfaces class RemoteObject(object): - - def _set_props(self, props=None): - #print 'Fetching properties' - if not props: - #prop_fetch = dbus.Interface(self.bus.get_object( - # BUSNAME, self.object_path), 'org.freedesktop.DBus.Properties') - - for i in range(0, 3): - try: - prop_fetch = dbus.Interface(self.bus.get_object( - BUSNAME, self.object_path), - 'org.freedesktop.DBus.Properties') - props = prop_fetch.GetAll(self.interface) - break - except dbus.exceptions.DBusException as dbe: - if "GetAll" not in str(dbe): - raise dbe - if props: - for kl, vl in list(props.items()): - setattr(self, kl, vl) - - def __init__(self, specified_bus, object_path, interface, properties=None): - self.object_path = object_path - self.interface = interface - self.bus = specified_bus - - self.dbus_method = dbus.Interface(specified_bus.get_object( - BUSNAME, self.object_path), self.interface) - - self._set_props(properties) - - def __getattr__(self, item): - if hasattr(self.dbus_method, item): - return functools.partial(self._wrapper, item) - else: - return functools.partial(self, item) - - def _wrapper(self, _method_name, *args, **kwargs): - return getattr(self.dbus_method, _method_name)(*args, **kwargs) - - def update(self): - self._set_props() + def _set_props(self, props=None): + # print 'Fetching properties' + if not props: + # prop_fetch = dbus.Interface(self.bus.get_object( + # BUSNAME, self.object_path), 'org.freedesktop.DBus.Properties') + + for i in range(0, 3): + try: + prop_fetch = dbus.Interface(self.bus.get_object( + BUSNAME, self.object_path), + 'org.freedesktop.DBus.Properties') + props = prop_fetch.GetAll(self.interface) + break + except dbus.exceptions.DBusException as dbe: + if "GetAll" not in str(dbe): + raise dbe + if props: + for kl, vl in list(props.items()): + setattr(self, kl, vl) + + def __init__(self, specified_bus, object_path, interface, properties=None): + self.object_path = object_path + self.interface = interface + self.bus = specified_bus + + self.dbus_method = dbus.Interface(specified_bus.get_object( + BUSNAME, self.object_path), self.interface) + + self._set_props(properties) + + def __getattr__(self, item): + if hasattr(self.dbus_method, item): + return functools.partial(self._wrapper, item) + else: + return functools.partial(self, item) + + def _wrapper(self, _method_name, *args, **kwargs): + return getattr(self.dbus_method, _method_name)(*args, **kwargs) + + def update(self): + self._set_props() class ClientProxy(object): + @staticmethod + def _intf_short_name(nm): + return nm.split('.')[-1:][0] - @staticmethod - def _intf_short_name(nm): - return nm.split('.')[-1:][0] - - def __init__(self, specified_bus, object_path, interface=None, props=None): - i = dbus.Interface(specified_bus.get_object( - BUSNAME, object_path), 'org.freedesktop.DBus.Introspectable') - self.intro_spect = DbusIntrospection.introspect(i.Introspect()) + def __init__(self, specified_bus, object_path, interface=None, props=None): + i = dbus.Interface(specified_bus.get_object( + BUSNAME, object_path), 'org.freedesktop.DBus.Introspectable') + self.intro_spect = DbusIntrospection.introspect(i.Introspect()) - for k in self.intro_spect.keys(): - sn = ClientProxy._intf_short_name(k) - #print('Client proxy has interface: %s %s' % (k, sn)) + for k in self.intro_spect.keys(): + sn = ClientProxy._intf_short_name(k) + # print('Client proxy has interface: %s %s' % (k, sn)) - if interface and interface == k and props is not None: - ro = RemoteObject(specified_bus, object_path, k, props) - else: - ro = RemoteObject(specified_bus, object_path, k) + if interface and interface == k and props is not None: + ro = RemoteObject(specified_bus, object_path, k, props) + else: + ro = RemoteObject(specified_bus, object_path, k) - setattr(self, sn, ro) + setattr(self, sn, ro) - self.object_path = object_path + self.object_path = object_path - def update(self): - # Go through all interfaces and update them - for int_f in self.intro_spect.keys(): - sn = ClientProxy._intf_short_name(int_f) - getattr(self, sn).update() + def update(self): + # Go through all interfaces and update them + for int_f in self.intro_spect.keys(): + sn = ClientProxy._intf_short_name(int_f) + getattr(self, sn).update() def get_objects(): - rc = {MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [], - THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [], - CACHE_POOL_INT: [], CACHE_LV_INT: []} + rc = {MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [], + THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [], + CACHE_POOL_INT: [], CACHE_LV_INT: []} - manager = dbus.Interface(bus.get_object( - BUSNAME, "/com/redhat/lvmdbus1"), - "org.freedesktop.DBus.ObjectManager") + manager = dbus.Interface(bus.get_object( + BUSNAME, "/com/redhat/lvmdbus1"), + "org.freedesktop.DBus.ObjectManager") - objects = manager.GetManagedObjects() + objects = manager.GetManagedObjects() - for object_path, val in list(objects.items()): - for interface, props in list(val.items()): - o = ClientProxy(bus, object_path, interface, props) - rc[interface].append(o) + for object_path, val in list(objects.items()): + for interface, props in list(val.items()): + o = ClientProxy(bus, object_path, interface, props) + rc[interface].append(o) - return rc, bus + return rc, bus def set_execution(lvmshell): - lvm_manager = dbus.Interface(bus.get_object( - BUSNAME, "/com/redhat/lvmdbus1/Manager"), - "com.redhat.lvmdbus1.Manager") - lvm_manager.UseLvmShell(lvmshell) + lvm_manager = dbus.Interface(bus.get_object( + BUSNAME, "/com/redhat/lvmdbus1/Manager"), + "com.redhat.lvmdbus1.Manager") + lvm_manager.UseLvmShell(lvmshell) # noinspection PyUnresolvedReferences class TestDbusService(unittest.TestCase): - def setUp(self): - # Because of the sensitive nature of running LVM tests we will only - # run if we have PVs and nothing else, so that we can be confident that - # we are not mucking with someones data on their system - self.objs, self.bus = get_objects() - if len(self.objs[PV_INT]) == 0: - print('No PVs present exiting!') - sys.exit(1) - if len(self.objs[MANAGER_INT]) != 1: - print('Expecting a manager object!') - sys.exit(1) - - if len(self.objs[VG_INT]) != 0: - print('Expecting no VGs to exist!') - sys.exit(1) - - self.pvs = [] - for p in self.objs[PV_INT]: - self.pvs.append(p.Pv.Name) - - def tearDown(self): - # If we get here it means we passed setUp, so lets remove anything - # and everything that remains, besides the PVs themselves - self.objs, self.bus = get_objects() - for v in self.objs[VG_INT]: - #print "DEBUG: Removing VG= ", v.Uuid, v.Name - v.Vg.Remove(-1, {}) - - # Check to make sure the PVs we had to start exist, else re-create - # them - if len(self.pvs) != len(self.objs[PV_INT]): - for p in self.pvs: - found = False - for pc in self.objs[PV_INT]: - if pc.Pv.Name == p: - found = True - break - - if not found: - # print('Re-creating PV=', p) - self._pv_create(p) - - def _pv_create(self, device): - pv_path = self.objs[MANAGER_INT][0].Manager.PvCreate(device, -1, {})[0] - self.assertTrue(pv_path is not None and len(pv_path) > 0) - return pv_path - - def _manager(self): - return self.objs[MANAGER_INT][0] - - def _refresh(self): - return self._manager().Manager.Refresh() - - def test_refresh(self): - rc = self._refresh() - self.assertEqual(rc, 0) - - def test_version(self): - rc = self.objs[MANAGER_INT][0].Manager.Version - self.assertTrue(rc is not None and len(rc) > 0) - self.assertEqual(self._refresh(), 0) - - def _vg_create(self, pv_paths=None): - - if not pv_paths: - pv_paths = [self.objs[PV_INT][0].object_path] - - vg_name = rs(8, '_vg') - - vg_path = self.objs[MANAGER_INT][0].Manager.VgCreate( - vg_name, - pv_paths, - -1, - {})[0] - self.assertTrue(vg_path is not None and len(vg_path) > 0) - return ClientProxy(self.bus, vg_path) - - def test_vg_create(self): - self._vg_create() - self.assertEqual(self._refresh(), 0) - - def test_vg_delete(self): - vg = self._vg_create().Vg - vg.Remove(-1, {}) - self.assertEqual(self._refresh(), 0) - - def _pv_remove(self, pv): - rc = pv.Pv.Remove(-1, {}) - return rc - - def test_pv_remove_add(self): - target = self.objs[PV_INT][0] - - # Remove the PV - rc = self._pv_remove(target) - self.assertTrue(rc == '/') - self.assertEqual(self._refresh(), 0) - - # Add it back - rc = self._pv_create(target.Pv.Name)[0] - self.assertTrue(rc == '/') - self.assertEqual(self._refresh(), 0) - - def _lookup(self, lvm_id): - return self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(lvm_id) - - def test_lookup_by_lvm_id(self): - # For the moment lets just lookup what we know about which is PVs - # When we start testing VGs and LVs we will test lookups for those - # during those unit tests - for p in self.objs[PV_INT]: - rc = self._lookup(p.Pv.Name) - self.assertTrue(rc is not None and rc != '/') - - # Search for something which doesn't exist - rc = self._lookup('/dev/null') - self.assertTrue(rc == '/') - - def test_vg_extend(self): - # Create a VG - self.assertTrue(len(self.objs[PV_INT]) >= 2) - - if len(self.objs[PV_INT]) >= 2: - pv_initial = self.objs[PV_INT][0] - pv_next = self.objs[PV_INT][1] - - vg = self._vg_create([pv_initial.object_path]).Vg - path = vg.Extend([pv_next.object_path], -1, {}) - self.assertTrue(path == '/') - self.assertEqual(self._refresh(), 0) - - # noinspection PyUnresolvedReferences - def test_vg_reduce(self): - self.assertTrue(len(self.objs[PV_INT]) >= 2) - - if len(self.objs[PV_INT]) >= 2: - vg = self._vg_create( - [self.objs[PV_INT][0].object_path, - self.objs[PV_INT][1].object_path]).Vg - - path = vg.Reduce(False, [vg.Pvs[0]], -1, {}) - self.assertTrue(path == '/') - self.assertEqual(self._refresh(), 0) - - # noinspection PyUnresolvedReferences - def test_vg_rename(self): - vg = self._vg_create().Vg - - # Create some LVs in the VG - for i in range(0, 5): - self._create_lv(size=1024 * 1024 * 16, vg=vg) - - path = vg.Rename('renamed_' + vg.Name, -1, {}) - self.assertTrue(path == '/') - self.assertEqual(self._refresh(), 0) - - # Go through each LV and make sure it has the correct path back to the - # VG - vg.update() - - lv_paths = vg.Lvs - self.assertTrue(len(lv_paths) == 5) - - for l in lv_paths: - lv_proxy = ClientProxy(self.bus, l).LvCommon - self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % - (lv_proxy.Vg, vg.object_path)) - - def _test_lv_create(self, method, params, vg): - lv = None - path = method(*params)[0] - - self.assertTrue(vg) - - if path: - lv = ClientProxy(self.bus, path) - # TODO verify object properties - - self.assertEqual(self._refresh(), 0) - return lv - - def test_lv_create(self): - vg = self._vg_create().Vg - self._test_lv_create(vg.LvCreate, - (rs(8, '_lv'), 1024 * 1024 * 4, - dbus.Array([], '(ott)'), -1, {}), - vg) - - def test_lv_create_linear(self): - - vg = self._vg_create().Vg - self._test_lv_create(vg.LvCreateLinear, - (rs(8, '_lv'), 1024 * 1024 * 4, False, -1, {}), - vg) - - def test_lv_create_striped(self): - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) - - vg = self._vg_create(pv_paths).Vg - self._test_lv_create(vg.LvCreateStriped, - (rs(8, '_lv'), 1024 * 1024 * 4, 2, 8, False, - -1, {}), vg) - - def test_lv_create_mirror(self): - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) - - vg = self._vg_create(pv_paths).Vg - self._test_lv_create(vg.LvCreateMirror, - (rs(8, '_lv'), 1024 * 1024 * 4, 2, -1, {}), vg) - - def test_lv_create_raid(self): - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) - - vg = self._vg_create(pv_paths).Vg - self._test_lv_create(vg.LvCreateRaid, - (rs(8, '_lv'), 'raid4', - 1024 * 1024 * 16, 2, 8, -1, {}), vg) - - def _create_lv(self, thinpool=False, size=None, vg=None): - - if not vg: - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) - - vg = self._vg_create(pv_paths).Vg - - if size is None: - size = 1024 * 1024 * 128 - - return self._test_lv_create( - vg.LvCreateLinear, - (rs(8, '_lv'), size, thinpool, -1, {}), vg) + def setUp(self): + # Because of the sensitive nature of running LVM tests we will only + # run if we have PVs and nothing else, so that we can be confident that + # we are not mucking with someones data on their system + self.objs, self.bus = get_objects() + if len(self.objs[PV_INT]) == 0: + print('No PVs present exiting!') + sys.exit(1) + if len(self.objs[MANAGER_INT]) != 1: + print('Expecting a manager object!') + sys.exit(1) + + if len(self.objs[VG_INT]) != 0: + print('Expecting no VGs to exist!') + sys.exit(1) + + self.pvs = [] + for p in self.objs[PV_INT]: + self.pvs.append(p.Pv.Name) + + def tearDown(self): + # If we get here it means we passed setUp, so lets remove anything + # and everything that remains, besides the PVs themselves + self.objs, self.bus = get_objects() + for v in self.objs[VG_INT]: + # print "DEBUG: Removing VG= ", v.Uuid, v.Name + v.Vg.Remove(-1, {}) + + # Check to make sure the PVs we had to start exist, else re-create + # them + if len(self.pvs) != len(self.objs[PV_INT]): + for p in self.pvs: + found = False + for pc in self.objs[PV_INT]: + if pc.Pv.Name == p: + found = True + break + + if not found: + # print('Re-creating PV=', p) + self._pv_create(p) + + def _pv_create(self, device): + pv_path = self.objs[MANAGER_INT][0].Manager.PvCreate(device, -1, {})[0] + self.assertTrue(pv_path is not None and len(pv_path) > 0) + return pv_path + + def _manager(self): + return self.objs[MANAGER_INT][0] + + def _refresh(self): + return self._manager().Manager.Refresh() + + def test_refresh(self): + rc = self._refresh() + self.assertEqual(rc, 0) + + def test_version(self): + rc = self.objs[MANAGER_INT][0].Manager.Version + self.assertTrue(rc is not None and len(rc) > 0) + self.assertEqual(self._refresh(), 0) + + def _vg_create(self, pv_paths=None): + + if not pv_paths: + pv_paths = [self.objs[PV_INT][0].object_path] + + vg_name = rs(8, '_vg') + + vg_path = self.objs[MANAGER_INT][0].Manager.VgCreate( + vg_name, + pv_paths, + -1, + {})[0] + self.assertTrue(vg_path is not None and len(vg_path) > 0) + return ClientProxy(self.bus, vg_path) + + def test_vg_create(self): + self._vg_create() + self.assertEqual(self._refresh(), 0) + + def test_vg_delete(self): + vg = self._vg_create().Vg + vg.Remove(-1, {}) + self.assertEqual(self._refresh(), 0) + + def _pv_remove(self, pv): + rc = pv.Pv.Remove(-1, {}) + return rc + + def test_pv_remove_add(self): + target = self.objs[PV_INT][0] + + # Remove the PV + rc = self._pv_remove(target) + self.assertTrue(rc == '/') + self.assertEqual(self._refresh(), 0) + + # Add it back + rc = self._pv_create(target.Pv.Name)[0] + self.assertTrue(rc == '/') + self.assertEqual(self._refresh(), 0) + + def _lookup(self, lvm_id): + return self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(lvm_id) + + def test_lookup_by_lvm_id(self): + # For the moment lets just lookup what we know about which is PVs + # When we start testing VGs and LVs we will test lookups for those + # during those unit tests + for p in self.objs[PV_INT]: + rc = self._lookup(p.Pv.Name) + self.assertTrue(rc is not None and rc != '/') + + # Search for something which doesn't exist + rc = self._lookup('/dev/null') + self.assertTrue(rc == '/') + + def test_vg_extend(self): + # Create a VG + self.assertTrue(len(self.objs[PV_INT]) >= 2) + + if len(self.objs[PV_INT]) >= 2: + pv_initial = self.objs[PV_INT][0] + pv_next = self.objs[PV_INT][1] + + vg = self._vg_create([pv_initial.object_path]).Vg + path = vg.Extend([pv_next.object_path], -1, {}) + self.assertTrue(path == '/') + self.assertEqual(self._refresh(), 0) + + # noinspection PyUnresolvedReferences + def test_vg_reduce(self): + self.assertTrue(len(self.objs[PV_INT]) >= 2) + + if len(self.objs[PV_INT]) >= 2: + vg = self._vg_create( + [self.objs[PV_INT][0].object_path, + self.objs[PV_INT][1].object_path]).Vg + + path = vg.Reduce(False, [vg.Pvs[0]], -1, {}) + self.assertTrue(path == '/') + self.assertEqual(self._refresh(), 0) + + # noinspection PyUnresolvedReferences + def test_vg_rename(self): + vg = self._vg_create().Vg + + mgr = self.objs[MANAGER_INT][0].Manager + + # Do a vg lookup + path = self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(vg.Name) + + vg_name_start = vg.Name + prev_path = path + self.assertTrue(path != '/', "%s" % (path)) + + # Create some LVs in the VG + for i in range(0, 5): + lv_t = self._create_lv(size=1024 * 1024 * 16, vg=vg) + full_name = "%s/%s" % (vg_name_start, lv_t.LvCommon.Name) + lv_path = mgr.LookUpByLvmId(full_name) + self.assertTrue(lv_path == lv_t.object_path) + + new_name = 'renamed_' + vg.Name + + path = vg.Rename(new_name, -1, {}) + self.assertTrue(path == '/') + self.assertEqual(self._refresh(), 0) + + # Do a vg lookup + path = mgr.LookUpByLvmId(new_name) + self.assertTrue(path != '/', "%s" % (path)) + self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path)) + + # Go through each LV and make sure it has the correct path back to the + # VG + vg.update() + + lv_paths = vg.Lvs + self.assertTrue(len(lv_paths) == 5) + + for l in lv_paths: + lv_proxy = ClientProxy(self.bus, l).LvCommon + self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % + (lv_proxy.Vg, vg.object_path)) + full_name = "%s/%s" % (new_name, lv_proxy.Name) + lv_path = mgr.LookUpByLvmId(full_name) + self.assertTrue(lv_path == lv_proxy.object_path, "%s != %s" % + (lv_path, lv_proxy.object_path)) + + def _verify_hidden_lookups(self, lv_common_object, vgname): + mgr = self.objs[MANAGER_INT][0].Manager + + hidden_lv_paths = lv_common_object.HiddenLvs + + for h in hidden_lv_paths: + h_lv = ClientProxy(self.bus, h).LvCommon + + if len(h_lv.HiddenLvs) > 0: + self._verify_hidden_lookups(h_lv, vgname) + + # print("Hidden check %s %s" % (h, h_lv.Name)) + full_name = "%s/%s" % (vgname, h_lv.Name) + lookup_path = mgr.LookUpByLvmId(full_name) + self.assertTrue(lookup_path != '/') + self.assertTrue(lookup_path == h_lv.object_path) + + def test_vg_rename_with_thin_pool(self): + + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) + + vg = self._vg_create(pv_paths).Vg + + vg_name_start = vg.Name + + mgr = self.objs[MANAGER_INT][0].Manager + + # Let's create a thin pool which uses a raid 5 meta and raid5 data + # areas + lv_meta_path = vg.LvCreateRaid( + "meta_r5", "raid5", 1024 * 1024 * 16, 0, 0, -1, {})[0] + + lv_data_path = vg.LvCreateRaid( + "data_r5", "raid5", 1024 * 1024 * 512, 0, 0, -1, {})[0] + + thin_pool_path = vg.CreateThinPool( + lv_meta_path, lv_data_path, -1, {})[0] + + # Lets create some thin LVs + thin_pool = ClientProxy(self.bus, thin_pool_path) + + # noinspection PyTypeChecker + self._verify_hidden_lookups(thin_pool.LvCommon, vg_name_start) + + for i in range(0, 5): + lv_name = rs(8, '_lv') + + thin_lv_path = thin_pool.ThinPool.LvCreate( + lv_name, 1024 * 1024 * 16, -1, {})[0] + + self.assertTrue(thin_lv_path != '/') + + full_name = "%s/%s" % (vg_name_start, lv_name) + + lookup_lv_path = mgr.LookUpByLvmId(full_name) + self.assertTrue(thin_lv_path == lookup_lv_path, + "%s != %s" % (thin_lv_path, lookup_lv_path)) + + # Rename the VG + new_name = 'renamed_' + vg.Name + + path = vg.Rename(new_name, -1, {}) + self.assertTrue(path == '/') + self.assertEqual(self._refresh(), 0) + + # Go through each LV and make sure it has the correct path back to the + # VG + vg.update() + thin_pool.update() + + lv_paths = vg.Lvs + + for l in lv_paths: + lv_proxy = ClientProxy(self.bus, l).LvCommon + self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % + (lv_proxy.Vg, vg.object_path)) + full_name = "%s/%s" % (new_name, lv_proxy.Name) + # print('Full Name %s' % (full_name)) + lv_path = mgr.LookUpByLvmId(full_name) + self.assertTrue(lv_path == lv_proxy.object_path, "%s != %s" % + (lv_path, lv_proxy.object_path)) + + # noinspection PyTypeChecker + self._verify_hidden_lookups(thin_pool.LvCommon, new_name) + + def _test_lv_create(self, method, params, vg): + lv = None + path = method(*params)[0] + + self.assertTrue(vg) + + if path: + lv = ClientProxy(self.bus, path) + # TODO verify object properties + + self.assertEqual(self._refresh(), 0) + return lv + + def test_lv_create(self): + vg = self._vg_create().Vg + self._test_lv_create( + vg.LvCreate, + (rs(8, '_lv'), 1024 * 1024 * 4, + dbus.Array([], '(ott)'), -1, {}), vg) + + def test_lv_create_linear(self): + + vg = self._vg_create().Vg + self._test_lv_create( + vg.LvCreateLinear, + (rs(8, '_lv'), 1024 * 1024 * 4, False, -1, {}), vg) + + def test_lv_create_striped(self): + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) + + vg = self._vg_create(pv_paths).Vg + self._test_lv_create( + vg.LvCreateStriped, + (rs(8, '_lv'), 1024 * 1024 * 4, 2, 8, False, + -1, {}), vg) + + def test_lv_create_mirror(self): + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) + + vg = self._vg_create(pv_paths).Vg + self._test_lv_create(vg.LvCreateMirror, + (rs(8, '_lv'), 1024 * 1024 * 4, 2, -1, {}), vg) + + def test_lv_create_raid(self): + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) + + vg = self._vg_create(pv_paths).Vg + self._test_lv_create(vg.LvCreateRaid, + (rs(8, '_lv'), 'raid4', + 1024 * 1024 * 16, 2, 8, -1, {}), vg) + + def _create_lv(self, thinpool=False, size=None, vg=None): + + if not vg: + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) + + vg = self._vg_create(pv_paths).Vg + + if size is None: + size = 1024 * 1024 * 128 + + return self._test_lv_create( + vg.LvCreateLinear, + (rs(8, '_lv'), size, thinpool, -1, {}), vg) + + def test_lv_create_rounding(self): + self._create_lv(size=1024 * 1024 * 2 + 13) + + def test_lv_create_thin_pool(self): + self._create_lv(True) + + def test_lv_rename(self): + # Rename a regular LV + lv = self._create_lv() + + path = self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(lv.LvCommon.Name) + prev_path = path + + new_name = 'renamed_' + lv.LvCommon.Name + lv.Lv.Rename(new_name, -1, {}) + + path = self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(new_name) + + self.assertEqual(self._refresh(), 0) + self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path)) + + def test_lv_thinpool_rename(self): + # Rename a thin pool + tp = self._create_lv(True) + self.assertTrue(THINPOOL_LV_PATH in tp.object_path, + "%s" % (tp.object_path)) + + new_name = 'renamed_' + tp.LvCommon.Name + tp.Lv.Rename(new_name, -1, {}) + tp.update() + self.assertEqual(self._refresh(), 0) + self.assertEqual(new_name, tp.LvCommon.Name) + + # noinspection PyUnresolvedReferences + def test_lv_on_thin_pool_rename(self): + # Rename a LV on a thin Pool + + # This returns a LV with the LV interface, need to get a proxy for + # thinpool interface too + tp = self._create_lv(True) + + thin_path = tp.ThinPool.LvCreate( + rs(10, '_thin_lv'), 1024 * 1024 * 10, -1, {})[0] + + lv = ClientProxy(self.bus, thin_path) + rc = lv.Lv.Rename('rename_test' + lv.LvCommon.Name, -1, {}) + self.assertTrue(rc == '/') + self.assertEqual(self._refresh(), 0) + + def test_lv_remove(self): + lv = self._create_lv().Lv + rc = lv.Remove(-1, {}) + self.assertTrue(rc == '/') + self.assertEqual(self._refresh(), 0) + + def test_lv_snapshot(self): + lv_p = self._create_lv() + ss_name = 'ss_' + lv_p.LvCommon.Name + + # Test waiting to complete + ss, job = lv_p.Lv.Snapshot(ss_name, 0, -1, {}) + self.assertTrue(ss != '/') + self.assertTrue(job == '/') + + snapshot = ClientProxy(self.bus, ss) + self.assertTrue(snapshot.LvCommon.Name == ss_name) + + self.assertEqual(self._refresh(), 0) + + # Test getting a job returned immediately + rc, job = lv_p.Lv.Snapshot('ss2_' + lv_p.LvCommon.Name, 0, 0, {}) + self.assertTrue(rc == '/') + self.assertTrue(job != '/') + self._wait_for_job(job) + + self.assertEqual(self._refresh(), 0) + + # noinspection PyUnresolvedReferences + def _wait_for_job(self, j_path): + import time + rc = None + j = ClientProxy(self.bus, j_path).Job + + while True: + j.update() + if j.Complete: + (ec, error_msg) = j.GetError + self.assertTrue(ec == 0, "%d :%s" % (ec, error_msg)) + + if ec == 0: + self.assertTrue(j.Percent == 100, "P= %f" % j.Percent) + + rc = j.Result + j.Remove() + + break + + if j.Wait(1): + j.update() + self.assertTrue(j.Complete) + + return rc + + def test_lv_create_pv_specific(self): + vg = self._vg_create().Vg + + pv = vg.Pvs + + self._test_lv_create(vg.LvCreate, + (rs(8, '_lv'), 1024 * 1024 * 4, + dbus.Array([[pv[0], 0, 100]], '(ott)'), -1, {}), vg) + + def test_lv_resize(self): - def test_lv_create_rounding(self): - self._create_lv(size=1024 * 1024 * 2 + 13) + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) - def test_lv_create_thin_pool(self): - self._create_lv(True) + vg = self._vg_create(pv_paths).Vg + lv = self._create_lv(vg=vg) - def test_lv_rename(self): - # Rename a regular LV - lv = self._create_lv() - lv.Lv.Rename('renamed_' + lv.LvCommon.Name, -1, {}) - self.assertEqual(self._refresh(), 0) + for size in [ + lv.LvCommon.SizeBytes + 4194304, + lv.LvCommon.SizeBytes - 4194304, + lv.LvCommon.SizeBytes + 2048, + lv.LvCommon.SizeBytes - 2048]: - def test_lv_thinpool_rename(self): - # Rename a thin pool - tp = self._create_lv(True) - self.assertTrue(THINPOOL_LV_PATH in tp.object_path, - "%s" % (tp.object_path)) + pv_in_use = [i[0] for i in lv.LvCommon.Devices] + # Select a PV in the VG that isn't in use + pv_empty = [p for p in vg.Pvs if p not in pv_in_use] - new_name = 'renamed_' + tp.LvCommon.Name - tp.Lv.Rename(new_name, -1, {}) - tp.update() - self.assertEqual(self._refresh(), 0) - self.assertEqual(new_name, tp.LvCommon.Name) + prev = lv.LvCommon.SizeBytes - # noinspection PyUnresolvedReferences - def test_lv_on_thin_pool_rename(self): - # Rename a LV on a thin Pool + if len(pv_empty): + rc = lv.Lv.Resize( + size, dbus.Array([[pv_empty[0], 0, 100]], '(oii)'), + -1, {}) + else: + rc = lv.Lv.Resize(size, dbus.Array([], '(oii)'), -1, {}) - # This returns a LV with the LV interface, need to get a proxy for - # thinpool interface too - tp = self._create_lv(True) + self.assertEqual(rc, '/') + self.assertEqual(self._refresh(), 0) - thin_path = tp.ThinPool.LvCreate( - rs(10, '_thin_lv'), 1024 * 1024 * 10, -1, {})[0] + lv.update() - lv = ClientProxy(self.bus, thin_path) - rc = lv.Lv.Rename('rename_test' + lv.LvCommon.Name, -1, {}) - self.assertTrue(rc == '/') - self.assertEqual(self._refresh(), 0) + if prev < size: + self.assertTrue(lv.LvCommon.SizeBytes > prev) + else: + # We are testing re-sizing to same size too... + self.assertTrue(lv.LvCommon.SizeBytes <= prev) - def test_lv_remove(self): - lv = self._create_lv().Lv - rc = lv.Remove(-1, {}) - self.assertTrue(rc == '/') - self.assertEqual(self._refresh(), 0) + def test_lv_resize_same(self): + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) - def test_lv_snapshot(self): - lv_p = self._create_lv() - ss_name = 'ss_' + lv_p.LvCommon.Name + vg = self._vg_create(pv_paths).Vg + lv = self._create_lv(vg=vg) - # Test waiting to complete - ss, job = lv_p.Lv.Snapshot(ss_name, 0, -1, {}) - self.assertTrue(ss != '/') - self.assertTrue(job == '/') + with self.assertRaises(dbus.exceptions.DBusException): + lv.Lv.Resize(lv.LvCommon.SizeBytes, dbus.Array([], '(oii)'), -1, {}) - snapshot = ClientProxy(self.bus, ss) - self.assertTrue(snapshot.LvCommon.Name == ss_name) + def test_lv_move(self): + lv = self._create_lv() - self.assertEqual(self._refresh(), 0) + pv_path_move = str(lv.LvCommon.Devices[0][0]) - # Test getting a job returned immediately - rc, job = lv_p.Lv.Snapshot('ss2_' + lv_p.LvCommon.Name, 0, 0, {}) - self.assertTrue(rc == '/') - self.assertTrue(job != '/') - self._wait_for_job(job) + # Test moving a specific LV + job = lv.Lv.Move(pv_path_move, (0, 0), dbus.Array([], '(oii)'), 0, {}) + self._wait_for_job(job) + self.assertEqual(self._refresh(), 0) - self.assertEqual(self._refresh(), 0) + lv.update() + new_pv = str(lv.LvCommon.Devices[0][0]) + self.assertTrue(pv_path_move != new_pv, "%s == %s" % + (pv_path_move, new_pv)) - # noinspection PyUnresolvedReferences - def _wait_for_job(self, j_path): - import time - rc = None - j = ClientProxy(self.bus, j_path).Job + def test_lv_activate_deactivate(self): + lv_p = self._create_lv() + lv_p.update() - while True: - j.update() - if j.Complete: - (ec, error_msg) = j.GetError - self.assertTrue(ec == 0, "%d :%s" % (ec, error_msg)) + lv_p.Lv.Deactivate(0, -1, {}) + lv_p.update() + self.assertFalse(lv_p.LvCommon.Active) + self.assertEqual(self._refresh(), 0) - if ec == 0: - self.assertTrue(j.Percent == 100, "P= %f" % j.Percent) + lv_p.Lv.Activate(0, -1, {}) - rc = j.Result - j.Remove() + lv_p.update() + self.assertTrue(lv_p.LvCommon.Active) + self.assertEqual(self._refresh(), 0) - break + # Try control flags + for i in range(0, 5): + lv_p.Lv.Activate(1 << i, -1, {}) + self.assertTrue(lv_p.LvCommon.Active) + self.assertEqual(self._refresh(), 0) - if j.Wait(1): - j.update() - self.assertTrue(j.Complete) + def test_move(self): + lv = self._create_lv() - return rc + # Test moving without being LV specific + vg = ClientProxy(self.bus, lv.LvCommon.Vg).Vg + pv_to_move = str(lv.LvCommon.Devices[0][0]) + job = vg.Move(pv_to_move, (0, 0), dbus.Array([], '(oii)'), 0, {}) + self._wait_for_job(job) + self.assertEqual(self._refresh(), 0) - def test_lv_create_pv_specific(self): - vg = self._vg_create().Vg + # Test Vg.Move + # TODO Test this more! + vg.update() + lv.update() - pv = vg.Pvs + location = lv.LvCommon.Devices[0][0] - self._test_lv_create(vg.LvCreate, - (rs(8, '_lv'), 1024 * 1024 * 4, - dbus.Array([[pv[0], 0, 100]], '(ott)'), -1, {}), - vg) + dst = None + for p in vg.Pvs: + if p != location: + dst = p - def test_lv_resize(self): + # Fetch the destination + pv = ClientProxy(self.bus, dst).Pv - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) - - vg = self._vg_create(pv_paths).Vg - lv = self._create_lv(vg=vg) - - for size in [lv.LvCommon.SizeBytes + 4194304, - lv.LvCommon.SizeBytes - 4194304, - lv.LvCommon.SizeBytes + 2048, - lv.LvCommon.SizeBytes - 2048, - lv.LvCommon.SizeBytes]: + # Test range, move it to the middle of the new destination and blocking + # blocking for it to complete + job = vg.Move( + location, (0, 0), [(dst, pv.PeCount / 2, 0), ], -1, {}) + self.assertEqual(job, '/') + self.assertEqual(self._refresh(), 0) - pv_in_use = [i[0] for i in lv.LvCommon.Devices] - # Select a PV in the VG that isn't in use - pv_empty = [p for p in vg.Pvs if p not in pv_in_use] + def test_job_handling(self): + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) - prev = lv.LvCommon.SizeBytes + vg_name = rs(8, '_vg') - if len(pv_empty): - rc = lv.Lv.Resize(size, - dbus.Array([[pv_empty[0], 0, 100]], '(oii)'), - -1, {}) - else: - rc = lv.Lv.Resize(size, dbus.Array([], '(oii)'), -1, {}) + # Test getting a job right away + vg_path, vg_job = self.objs[MANAGER_INT][0].Manager.VgCreate( + vg_name, pv_paths, + 0, {}) - self.assertEqual(rc, '/') - self.assertEqual(self._refresh(), 0) + self.assertTrue(vg_path == '/') + self.assertTrue(vg_job and len(vg_job) > 0) - lv.update() + self._wait_for_job(vg_job) - if prev < size: - self.assertTrue(lv.LvCommon.SizeBytes > prev) - else: - # We are testing re-sizing to same size too... - self.assertTrue(lv.LvCommon.SizeBytes <= prev) + def _test_expired_timer(self, num_lvs): + rc = False + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) - def test_lv_move(self): - lv = self._create_lv() + # In small configurations lvm is pretty snappy, so lets create a VG + # add a number of LVs and then remove the VG and all the contained + # LVs which appears to consistently run a little slow. - pv_path_move = str(lv.LvCommon.Devices[0][0]) + vg = self._vg_create(pv_paths).Vg - # Test moving a specific LV - job = lv.Lv.Move(pv_path_move, (0, 0), dbus.Array([], '(oii)'), 0, {}) - self._wait_for_job(job) - self.assertEqual(self._refresh(), 0) + for i in range(0, num_lvs): + obj_path, job = vg.LvCreateLinear( + rs(8, "_lv"), 1024 * 1024 * 4, False, -1, {}) + self.assertTrue(job == '/') - lv.update() - new_pv = str(lv.LvCommon.Devices[0][0]) - self.assertTrue(pv_path_move != new_pv, "%s == %s" % - (pv_path_move, new_pv)) + # Make sure that we are honoring the timeout + start = time.time() - def test_lv_activate_deactivate(self): - lv_p = self._create_lv() - lv_p.update() + remove_job = vg.Remove(1, {}) - lv_p.Lv.Deactivate(0, -1, {}) - lv_p.update() - self.assertFalse(lv_p.LvCommon.Active) - self.assertEqual(self._refresh(), 0) + end = time.time() - lv_p.Lv.Activate(0, -1, {}) + tt_remove = float(end) - float(start) - lv_p.update() - self.assertTrue(lv_p.LvCommon.Active) - self.assertEqual(self._refresh(), 0) + self.assertTrue(tt_remove < 2.0, "remove time %s" % (str(tt_remove))) - # Try control flags - for i in range(0, 5): - lv_p.Lv.Activate(1 << i, -1, {}) - self.assertTrue(lv_p.LvCommon.Active) - self.assertEqual(self._refresh(), 0) + # Depending on how long it took we could finish either way + if remove_job != '/': + # We got a job + result = self._wait_for_job(remove_job) + self.assertTrue(result == '/') + rc = True + else: + # It completed before timer popped + pass - def test_move(self): - lv = self._create_lv() + return rc + + def test_job_handling_timer(self): + + yes = False + + # This may not pass + for i in [48, 64, 128]: + yes = self._test_expired_timer(i) + if yes: + break + print('Attempt (%d) failed, trying again...' % (i)) + + self.assertTrue(yes) + + def test_pv_tags(self): + pvs = [] + + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) + + vg = self._vg_create(pv_paths).Vg + + # Get the PVs + for p in vg.Pvs: + pvs.append(ClientProxy(self.bus, p).Pv) + + for tags_value in [['hello'], ['foo', 'bar']]: + rc = vg.PvTagsAdd(vg.Pvs, tags_value, -1, {}) + self.assertTrue(rc == '/') + + for p in pvs: + p.update() + self.assertTrue(sorted(tags_value) == p.Tags) + + vg.PvTagsDel(vg.Pvs, tags_value, -1, {}) + for p in pvs: + p.update() + self.assertTrue([] == p.Tags) + + def test_vg_tags(self): + vg = self._vg_create().Vg - # Test moving without being LV specific - vg = ClientProxy(self.bus, lv.LvCommon.Vg).Vg - pv_to_move = str(lv.LvCommon.Devices[0][0]) - job = vg.Move(pv_to_move, (0, 0), dbus.Array([], '(oii)'), 0, {}) - self._wait_for_job(job) - self.assertEqual(self._refresh(), 0) + t = ['Testing', 'tags'] - # Test Vg.Move - # TODO Test this more! - vg.update() - lv.update() + vg.TagsAdd(t, -1, {}) + vg.update() + self.assertTrue(t == vg.Tags) + vg.TagsDel(t, -1, {}) + vg.update() + self.assertTrue([] == vg.Tags) - location = lv.LvCommon.Devices[0][0] + def test_lv_tags(self): + vg = self._vg_create().Vg + lv = self._test_lv_create( + vg.LvCreateLinear, + (rs(8, '_lv'), 1024 * 1024 * 4, False, -1, {}), + vg) - dst = None - for p in vg.Pvs: - if p != location: - dst = p + t = ['Testing', 'tags'] - # Fetch the destination - pv = ClientProxy(self.bus, dst).Pv + lv.Lv.TagsAdd(t, -1, {}) + lv.update() + self.assertTrue(t == lv.LvCommon.Tags) + lv.Lv.TagsDel(t, -1, {}) + lv.update() + self.assertTrue([] == lv.LvCommon.Tags) - # Test range, move it to the middle of the new destination and blocking - # blocking for it to complete - job = vg.Move(location, - (0, 0), [(dst, pv.PeCount / 2, 0), ], -1, {}) - self.assertEqual(job, '/') - self.assertEqual(self._refresh(), 0) + def test_vg_allocation_policy_set(self): + vg = self._vg_create().Vg - def test_job_handling(self): - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) + for p in ['anywhere', 'contiguous', 'cling', 'normal']: + rc = vg.AllocationPolicySet(p, -1, {}) + self.assertEqual(rc, '/') + vg.update() - vg_name = rs(8, '_vg') + prop = getattr(vg, 'Alloc' + p.title()) + self.assertTrue(prop) - # Test getting a job right away - vg_path, vg_job = self.objs[MANAGER_INT][0].Manager.VgCreate( - vg_name, pv_paths, - 0, {}) + def test_vg_max_pv(self): + vg = self._vg_create().Vg - self.assertTrue(vg_path == '/') - self.assertTrue(vg_job and len(vg_job) > 0) + # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1280496 + # TODO: Add a test back for larger values here when bug is resolved + for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]: + rc = vg.MaxPvSet(p, -1, {}) + self.assertEqual(rc, '/') + vg.update() + self.assertTrue(vg.MaxPv == p, "Expected %s != Actual %s" % + (str(p), str(vg.MaxPv))) - self._wait_for_job(vg_job) + def test_vg_max_lv(self): + vg = self._vg_create().Vg - def _test_expired_timer(self, num_lvs): - rc = False - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) + # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1280496 + # TODO: Add a test back for larger values here when bug is resolved + for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]: + rc = vg.MaxLvSet(p, -1, {}) + self.assertEqual(rc, '/') + vg.update() + self.assertTrue(vg.MaxLv == p, "Expected %s != Actual %s" % + (str(p), str(vg.MaxLv))) - # In small configurations lvm is pretty snappy, so lets create a VG - # add a number of LVs and then remove the VG and all the contained - # LVs which appears to consistently run a little slow. + def test_vg_uuid_gen(self): + # TODO renable test case when + # https://bugzilla.redhat.com/show_bug.cgi?id=1264169 gets fixed + # This was tested with lvmetad disabled and we passed + print("\nSkipping Vg.UuidGenerate until BZ: 1264169 resolved\n") - vg = self._vg_create(pv_paths).Vg + if False: + vg = self._vg_create().Vg + prev_uuid = vg.Uuid + rc = vg.UuidGenerate(-1, {}) + self.assertEqual(rc, '/') + vg.update() + self.assertTrue(vg.Uuid != prev_uuid, "Expected %s != Actual %s" % + (vg.Uuid, prev_uuid)) - for i in range(0, num_lvs): - obj_path, job = vg.LvCreateLinear(rs(8, "_lv"), - 1024 * 1024 * 4, False, -1, {}) - self.assertTrue(job == '/') + def test_vg_activate_deactivate(self): + vg = self._vg_create().Vg + self._test_lv_create( + vg.LvCreateLinear, + (rs(8, '_lv'), 1024 * 1024 * 4, False, -1, {}), + vg) - # Make sure that we are honoring the timeout - start = time.time() + vg.update() - remove_job = vg.Remove(1, {}) + vg.Deactivate(0, -1, {}) + self.assertEqual(self._refresh(), 0) - end = time.time() + vg.Activate(0, -1, {}) + self.assertEqual(self._refresh(), 0) - tt_remove = float(end) - float(start) + # Try control flags + for i in range(0, 5): + vg.Activate(1 << i, -1, {}) - self.assertTrue(tt_remove < 2.0, "remove time %s" % (str(tt_remove))) + def test_pv_resize(self): - # Depending on how long it took we could finish either way - if remove_job != '/': - # We got a job - result = self._wait_for_job(remove_job) - self.assertTrue(result == '/') - rc = True - else: - # It completed before timer popped - pass + self.assertTrue(len(self.objs[PV_INT]) > 0) + + if len(self.objs[PV_INT]) > 0: + pv = ClientProxy(self.bus, self.objs[PV_INT][0].object_path).Pv + + original_size = pv.SizeBytes + + new_size = original_size / 2 - return rc - - def test_job_handling_timer(self): - - yes = False - - # This may not pass - for i in [48, 64, 128]: - yes = self._test_expired_timer(i) - if yes: - break - print('Attempt (%d) failed, trying again...' % (i)) - - self.assertTrue(yes) - - def test_pv_tags(self): - pvs = [] - - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) - - vg = self._vg_create(pv_paths).Vg - - # Get the PVs - for p in vg.Pvs: - pvs.append(ClientProxy(self.bus, p).Pv) - - for tags_value in [['hello'], ['foo', 'bar']]: - rc = vg.PvTagsAdd(vg.Pvs, tags_value, -1, {}) - self.assertTrue(rc == '/') - - for p in pvs: - p.update() - self.assertTrue(sorted(tags_value) == p.Tags) - - vg.PvTagsDel(vg.Pvs, tags_value, -1, {}) - for p in pvs: - p.update() - self.assertTrue([] == p.Tags) - - def test_vg_tags(self): - vg = self._vg_create().Vg + pv.ReSize(new_size, -1, {}) + self.assertEqual(self._refresh(), 0) + pv.update() - t = ['Testing', 'tags'] + self.assertTrue(pv.SizeBytes != original_size) + pv.ReSize(0, -1, {}) + self.assertEqual(self._refresh(), 0) + pv.update() + self.assertTrue(pv.SizeBytes == original_size) - vg.TagsAdd(t, -1, {}) - vg.update() - self.assertTrue(t == vg.Tags) - vg.TagsDel(t, -1, {}) - vg.update() - self.assertTrue([] == vg.Tags) + def test_pv_allocation(self): - def test_lv_tags(self): - vg = self._vg_create().Vg - lv = self._test_lv_create( - vg.LvCreateLinear, - (rs(8, '_lv'), 1024 * 1024 * 4, False, -1, {}), - vg) + pv_paths = [] + for pp in self.objs[PV_INT]: + pv_paths.append(pp.object_path) - t = ['Testing', 'tags'] + vg = self._vg_create(pv_paths).Vg - lv.Lv.TagsAdd(t, -1, {}) - lv.update() - self.assertTrue(t == lv.LvCommon.Tags) - lv.Lv.TagsDel(t, -1, {}) - lv.update() - self.assertTrue([] == lv.LvCommon.Tags) + pv = ClientProxy(self.bus, vg.Pvs[0]).Pv - def test_vg_allocation_policy_set(self): - vg = self._vg_create().Vg + pv.AllocationEnabled(False, -1, {}) + pv.update() + self.assertFalse(pv.Allocatable) - for p in ['anywhere', 'contiguous', 'cling', 'normal']: - rc = vg.AllocationPolicySet(p, -1, {}) - self.assertEqual(rc, '/') - vg.update() + pv.AllocationEnabled(True, -1, {}) + pv.update() + self.assertTrue(pv.Allocatable) - prop = getattr(vg, 'Alloc' + p.title()) - self.assertTrue(prop) + self.assertEqual(self._refresh(), 0) - def test_vg_max_pv(self): - vg = self._vg_create().Vg + def _get_devices(self): + context = pyudev.Context() + return context.list_devices(subsystem='block', MAJOR='8') - # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1280496 - # TODO: Add a test back for larger values here when bug is resolved - for p in [0, 1, 10, 100, 100, 1024, 2**32 - 1]: - rc = vg.MaxPvSet(p, -1, {}) - self.assertEqual(rc, '/') - vg.update() - self.assertTrue(vg.MaxPv == p, "Expected %s != Actual %s" % - (str(p), str(vg.MaxPv))) + def test_pv_scan(self): + devices = self._get_devices() - def test_vg_max_lv(self): - vg = self._vg_create().Vg + mgr = self._manager().Manager - # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1280496 - # TODO: Add a test back for larger values here when bug is resolved - for p in [0, 1, 10, 100, 100, 1024, 2**32 - 1]: - rc = vg.MaxLvSet(p, -1, {}) - self.assertEqual(rc, '/') - vg.update() - self.assertTrue(vg.MaxLv == p, "Expected %s != Actual %s" % - (str(p), str(vg.MaxLv))) + self.assertEqual(mgr.PvScan(False, True, + dbus.Array([], 's'), + dbus.Array([], '(ii)'), -1, {}), '/') + self.assertEqual(self._refresh(), 0) + self.assertEqual(mgr.PvScan(False, False, + dbus.Array([], 's'), + dbus.Array([], '(ii)'), -1, {}), '/') + self.assertEqual(self._refresh(), 0) - def test_vg_uuid_gen(self): - # TODO renable test case when - # https://bugzilla.redhat.com/show_bug.cgi?id=1264169 gets fixed - # This was tested with lvmetad disabled and we passed - print("\nSkipping Vg.UuidGenerate until BZ: 1264169 resolved\n") + block_path = [] + for d in devices: + block_path.append(d['DEVNAME']) - if False: - vg = self._vg_create().Vg - prev_uuid = vg.Uuid - rc = vg.UuidGenerate(-1, {}) - self.assertEqual(rc, '/') - vg.update() - self.assertTrue(vg.Uuid != prev_uuid, "Expected %s != Actual %s" % - (vg.Uuid, prev_uuid)) + self.assertEqual(mgr.PvScan(False, True, + block_path, + dbus.Array([], '(ii)'), -1, {}), '/') - def test_vg_activate_deactivate(self): - vg = self._vg_create().Vg - self._test_lv_create( - vg.LvCreateLinear, - (rs(8, '_lv'), 1024 * 1024 * 4, False, -1, {}), - vg) + self.assertEqual(self._refresh(), 0) - vg.update() + mm = [] + for d in devices: + mm.append((int(d['MAJOR']), int(d['MINOR']))) - vg.Deactivate(0, -1, {}) - self.assertEqual(self._refresh(), 0) + self.assertEqual(mgr.PvScan(False, True, + block_path, + mm, -1, {}), '/') - vg.Activate(0, -1, {}) - self.assertEqual(self._refresh(), 0) + self.assertEqual(self._refresh(), 0) - # Try control flags - for i in range(0, 5): - vg.Activate(1 << i, -1, {}) + self.assertEqual(mgr.PvScan(False, True, + dbus.Array([], 's'), + mm, -1, {}), '/') - def test_pv_resize(self): + self.assertEqual(self._refresh(), 0) - self.assertTrue(len(self.objs[PV_INT]) > 0) - - if len(self.objs[PV_INT]) > 0: - pv = ClientProxy(self.bus, self.objs[PV_INT][0].object_path).Pv - - original_size = pv.SizeBytes - - new_size = original_size / 2 + @staticmethod + def _write_some_data(device_path, size): + blocks = int(size / 512) + block = bytearray(512) + for i in range(0, 512): + block[i] = i % 255 - pv.ReSize(new_size, -1, {}) - self.assertEqual(self._refresh(), 0) - pv.update() + with open(device_path, mode='wb') as lv: + for i in range(0, blocks): + lv.write(block) - self.assertTrue(pv.SizeBytes != original_size) - pv.ReSize(0, -1, {}) - self.assertEqual(self._refresh(), 0) - pv.update() - self.assertTrue(pv.SizeBytes == original_size) + def test_snapshot_merge(self): + # Create a non-thin LV and merge it + ss_size = 1024 * 1024 * 512 - def test_pv_allocation(self): + lv_p = self._create_lv(size=1024 * 1024 * 1024) + ss_name = lv_p.LvCommon.Name + '_snap' + snapshot_path = lv_p.Lv.Snapshot(ss_name, ss_size, -1, {})[0] + ss = ClientProxy(self.bus, snapshot_path) - pv_paths = [] - for pp in self.objs[PV_INT]: - pv_paths.append(pp.object_path) + # Write some data to snapshot so merge takes some time + TestDbusService._write_some_data(ss.LvCommon.Path, ss_size / 2) - vg = self._vg_create(pv_paths).Vg + job_path = ss.Snapshot.Merge(0, {}) - pv = ClientProxy(self.bus, vg.Pvs[0]).Pv + self.assertTrue(job_path != '/') + self._wait_for_job(job_path) - pv.AllocationEnabled(False, -1, {}) - pv.update() - self.assertFalse(pv.Allocatable) + def test_snapshot_merge_thin(self): + # Create a thin LV, snapshot it and merge it + tp = self._create_lv(True) - pv.AllocationEnabled(True, -1, {}) - pv.update() - self.assertTrue(pv.Allocatable) + thin_path = tp.ThinPool.LvCreate( + rs(10, '_thin_lv'), 1024 * 1024 * 10, -1, {})[0] - self.assertEqual(self._refresh(), 0) + lv_p = ClientProxy(self.bus, thin_path) - def _get_devices(self): - context = pyudev.Context() - return context.list_devices(subsystem='block', MAJOR='8') + ss_name = lv_p.LvCommon.Name + '_snap' + snapshot_path = lv_p.Lv.Snapshot(ss_name, 0, -1, {})[0] + ss = ClientProxy(self.bus, snapshot_path) + job_path = ss.Snapshot.Merge(0, {}) + self.assertTrue(job_path != '/') + self._wait_for_job(job_path) - def test_pv_scan(self): - devices = self._get_devices() + def _create_cache_pool(self): + vg = self._vg_create().Vg - mgr = self._manager().Manager + md = self._create_lv(size=(1024 * 1024 * 8), vg=vg) + data = self._create_lv(size=(1024 * 1024 * 256), vg=vg) - self.assertEqual(mgr.PvScan(False, True, - dbus.Array([], 's'), - dbus.Array([], '(ii)'), -1, {}), '/') - self.assertEqual(self._refresh(), 0) - self.assertEqual(mgr.PvScan(False, False, - dbus.Array([], 's'), - dbus.Array([], '(ii)'), -1, {}), '/') - self.assertEqual(self._refresh(), 0) + cache_pool_path = vg.CreateCachePool( + md.object_path, data.object_path, -1, {})[0] - block_path = [] - for d in devices: - block_path.append(d['DEVNAME']) + cp = ClientProxy(self.bus, cache_pool_path) - self.assertEqual(mgr.PvScan(False, True, - block_path, - dbus.Array([], '(ii)'), -1, {}), '/') + return (vg, cp) - self.assertEqual(self._refresh(), 0) + def test_cache_pool_create(self): - mm = [] - for d in devices: - mm.append((int(d['MAJOR']), int(d['MINOR']))) + vg, cache_pool = self._create_cache_pool() - self.assertEqual(mgr.PvScan(False, True, - block_path, - mm, -1, {}), '/') + self.assertTrue('/com/redhat/lvmdbus1/CachePool' in + cache_pool.object_path) - self.assertEqual(self._refresh(), 0) + def test_cache_lv_create(self): - self.assertEqual(mgr.PvScan(False, True, - dbus.Array([], 's'), - mm, -1, {}), '/') + for destroy_cache in [True, False]: + vg, cache_pool = self._create_cache_pool() - self.assertEqual(self._refresh(), 0) + lv_to_cache = self._create_lv(size=(1024 * 1024 * 1024), vg=vg) - @staticmethod - def _write_some_data(device_path, size): - blocks = int(size / 512) - block = bytearray(512) - for i in range(0, 512): - block[i] = i % 255 + c_lv_path = cache_pool.CachePool.CacheLv( + lv_to_cache.object_path, -1, {})[0] - with open(device_path, mode='wb') as lv: - for i in range(0, blocks): - lv.write(block) + cached_lv = ClientProxy(self.bus, c_lv_path) - def test_snapshot_merge(self): - # Create a non-thin LV and merge it - ss_size = 1024 * 1024 * 512 + uncached_lv_path = \ + cached_lv.CachedLv.DetachCachePool(destroy_cache, -1, {})[0] - lv_p = self._create_lv(size=1024 * 1024 * 1024) - ss_name = lv_p.LvCommon.Name + '_snap' - snapshot_path = lv_p.Lv.Snapshot(ss_name, ss_size, -1, {})[0] - ss = ClientProxy(self.bus, snapshot_path) + self.assertTrue('/com/redhat/lvmdbus1/Lv' in + uncached_lv_path) - # Write some data to snapshot so merge takes some time - TestDbusService._write_some_data(ss.LvCommon.Path, ss_size / 2) + vg.Remove(-1, {}) - job_path = ss.Snapshot.Merge(0, {}) + def test_vg_change(self): + vg_proxy = self._vg_create() + result = vg_proxy.Vg.Change(-1, {'-a': 'ay'}) + self.assertTrue(result == '/') + result = vg_proxy.Vg.Change(-1, {'-a': 'n'}) + self.assertTrue(result == '/') - self.assertTrue(job_path != '/') - self._wait_for_job(job_path) + def _invalid_vg_lv_name_characters(self): + bad_vg_lv_set = set(string.printable) - \ + set(string.ascii_letters + string.digits + '.-_+') + return ''.join(bad_vg_lv_set) - def test_snapshot_merge_thin(self): - # Create a thin LV, snapshot it and merge it - tp = self._create_lv(True) + def test_invalid_names(self): + mgr = self.objs[MANAGER_INT][0].Manager - thin_path = tp.ThinPool.LvCreate( - rs(10, '_thin_lv'), 1024 * 1024 * 10, -1, {})[0] + # Pv device path + with self.assertRaises(dbus.exceptions.DBusException): + mgr.PvCreate("/dev/space in name", -1, {}) - lv_p = ClientProxy(self.bus, thin_path) + # VG Name testing... + # Go through all bad characters + pv_paths = [self.objs[PV_INT][0].object_path] + bad_chars = self._invalid_vg_lv_name_characters() + for c in bad_chars: + with self.assertRaises(dbus.exceptions.DBusException): + mgr.VgCreate("name%s" % (c), pv_paths, -1, {}) - ss_name = lv_p.LvCommon.Name + '_snap' - snapshot_path = lv_p.Lv.Snapshot(ss_name, 0, -1, {})[0] - ss = ClientProxy(self.bus, snapshot_path) - job_path = ss.Snapshot.Merge(0, {}) - self.assertTrue(job_path != '/') - self._wait_for_job(job_path) + # Bad names + for bad in [".", ".."]: + with self.assertRaises(dbus.exceptions.DBusException): + mgr.VgCreate(bad, pv_paths, -1, {}) - def _create_cache_pool(self): - vg = self._vg_create().Vg + # Exceed name length + for i in [128, 1024, 4096]: + with self.assertRaises(dbus.exceptions.DBusException): + mgr.VgCreate('T' * i, pv_paths, -1, {}) - md = self._create_lv(size=(1024 * 1024 * 8), vg=vg) - data = self._create_lv(size=(1024 * 1024 * 256), vg=vg) + # Create a VG and try to create LVs with different bad names + vg_path = mgr.VgCreate("test", pv_paths, -1, {})[0] + vg_proxy = ClientProxy(self.bus, vg_path) - cache_pool_path = vg.CreateCachePool( - md.object_path, data.object_path, -1, {})[0] + for c in bad_chars: + with self.assertRaises(dbus.exceptions.DBusException): + vg_proxy.Vg.LvCreateLinear(rs(8, '_lv') + c, + 1024 * 1024 * 4, False, -1, {}) - cp = ClientProxy(self.bus, cache_pool_path) + for r in ("_cdata", "_cmeta", "_corig", "_mimage", "_mlog", + "_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta", "_vorigin"): + with self.assertRaises(dbus.exceptions.DBusException): + vg_proxy.Vg.LvCreateLinear(rs(8, '_lv') + r, + 1024 * 1024 * 4, False, -1, {}) - return (vg, cp) + for r in ("snapshot", "pvmove"): + with self.assertRaises(dbus.exceptions.DBusException): + vg_proxy.Vg.LvCreateLinear(r + rs(8, '_lv'), + 1024 * 1024 * 4, False, -1, {}) - def test_cache_pool_create(self): + _ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#" - vg, cache_pool = self._create_cache_pool() + def _invalid_tag_characters(self): + bad_tag_ch_set = set(string.printable) - set(self._ALLOWABLE_TAG_CH) + return ''.join(bad_tag_ch_set) - self.assertTrue('/com/redhat/lvmdbus1/CachePool' in - cache_pool.object_path) + def test_invalid_tags(self): + mgr = self.objs[MANAGER_INT][0].Manager + pv_paths = [self.objs[PV_INT][0].object_path] - def test_cache_lv_create(self): + vg_path = mgr.VgCreate("test", pv_paths, -1, {})[0] + vg_proxy = ClientProxy(self.bus, vg_path) - for destroy_cache in [True, False]: - vg, cache_pool = self._create_cache_pool() + for c in self._invalid_tag_characters(): + with self.assertRaises(dbus.exceptions.DBusException): + vg_proxy.Vg.TagsAdd([c], -1, {}) - lv_to_cache = self._create_lv(size=(1024 * 1024 * 1024), vg=vg) + for c in self._invalid_tag_characters(): + with self.assertRaises(dbus.exceptions.DBusException): + vg_proxy.Vg.TagsAdd(["a%sb" % (c)], -1, {}) - c_lv_path = cache_pool.CachePool.CacheLv( - lv_to_cache.object_path, -1, {})[0] + def test_tag_names(self): + mgr = self.objs[MANAGER_INT][0].Manager + pv_paths = [self.objs[PV_INT][0].object_path] - cached_lv = ClientProxy(self.bus, c_lv_path) + vg_path = mgr.VgCreate("test", pv_paths, -1, {})[0] + vg_proxy = ClientProxy(self.bus, vg_path) - uncached_lv_path = \ - cached_lv.CachedLv.DetachCachePool(destroy_cache, -1, {})[0] + for i in range(1, 64): + tag = rs(i, "", self._ALLOWABLE_TAG_CH) + r = vg_proxy.Vg.TagsAdd([tag], -1, {}) + self.assertTrue(r == '/') + vg_proxy.update() - self.assertTrue('/com/redhat/lvmdbus1/Lv' in - uncached_lv_path) + self.assertTrue(tag in vg_proxy.Vg.Tags, "%s not in %s" % + (tag, str(vg_proxy.Vg.Tags))) - vg.Remove(-1, {}) + self.assertEqual(i, len(vg_proxy.Vg.Tags), "%d != %d" % + (i, len(vg_proxy.Vg.Tags))) - def test_vg_change(self): - vg_proxy = self._vg_create() - result = vg_proxy.Vg.Change(-1, {'-a': 'ay'}) - self.assertTrue(result == '/') - result = vg_proxy.Vg.Change(-1, {'-a': 'n'}) - self.assertTrue(result == '/') if __name__ == '__main__': - # Test forking & exec new each time - test_shell = os.getenv('LVM_DBUS_TEST_SHELL', 0) - - set_execution(False) - - if test_shell == 0: - unittest.main(exit=True) - else: - unittest.main(exit=False) - # Test lvm shell - print('\n *** Testing lvm shell *** \n') - set_execution(True) - unittest.main() + # Test forking & exec new each time + test_shell = os.getenv('LVM_DBUS_TEST_SHELL', 0) + + set_execution(False) + + if test_shell == 0: + unittest.main(exit=True) + else: + unittest.main(exit=False) + # Test lvm shell + print('\n *** Testing lvm shell *** \n') + set_execution(True) + unittest.main() |