diff options
author | Yuri Weinstein <yweinste@redhat.com> | 2021-07-27 08:19:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-27 08:19:15 -0700 |
commit | 6c3babe75efe98a54802997a7bb1328fc9d01d70 (patch) | |
tree | dc518ae2cd683855c2c156bfc16177627773a8a3 | |
parent | b0fcbe73d60b19615c0eda47046e98ce465ae131 (diff) | |
parent | d3e487b8a0e9f7d9cb25e4bccc00889cb608985f (diff) | |
download | ceph-6c3babe75efe98a54802997a7bb1328fc9d01d70.tar.gz |
Merge pull request #42377 from ifed01/wip-ifed-migrate-oct
octopus: ceph-volume: implement bluefs volume migration.
Reviewed-by: Kefu Chai <kchai@redhat.com>
Reviewed-by: Dimitri Savineau <dsavinea@redhat.com>
-rw-r--r-- | doc/man/8/ceph-volume.rst | 67 | ||||
-rw-r--r-- | src/ceph-volume/ceph_volume/api/lvm.py | 12 | ||||
-rw-r--r-- | src/ceph-volume/ceph_volume/devices/lvm/main.py | 4 | ||||
-rw-r--r-- | src/ceph-volume/ceph_volume/devices/lvm/migrate.py | 674 | ||||
-rw-r--r-- | src/ceph-volume/ceph_volume/tests/devices/lvm/test_migrate.py | 1504 | ||||
-rw-r--r-- | src/os/bluestore/bluestore_tool.cc | 38 |
6 files changed, 2285 insertions, 14 deletions
diff --git a/doc/man/8/ceph-volume.rst b/doc/man/8/ceph-volume.rst index b3c70a5567b..9bd51744eaf 100644 --- a/doc/man/8/ceph-volume.rst +++ b/doc/man/8/ceph-volume.rst @@ -15,7 +15,7 @@ Synopsis | **ceph-volume** **inventory** | **ceph-volume** **lvm** [ *trigger* | *create* | *activate* | *prepare* -| *zap* | *list* | *batch*] +| *zap* | *list* | *batch* | *new-wal* | *new-db* | *migrate* ] | **ceph-volume** **simple** [ *trigger* | *scan* | *activate* ] @@ -241,6 +241,71 @@ Positional arguments: ``/path/to/sda1`` or ``/path/to/sda`` for regular devices. +**new-wal** +Attaches the given logical volume to OSD as a WAL. Logical volume +name format is vg/lv. Fails if OSD has already got attached WAL. + +Usage:: + + ceph-volume lvm new-wal --osd-id OSD_ID --osd-fsid OSD_FSID --target TARGET_LV + +Optional arguments: + +* [-h, --help] show the help message and exit + +Required arguments: + +* --osd-id OSD_ID OSD id to attach new WAL to +* --osd-fsid OSD_FSID OSD fsid to attach new WAL to +* --target TARGET_LV logical volume name to attach as WAL + + +**new-db** +Attaches the given logical volume to OSD as a DB. Logical volume +name format is vg/lv. Fails if OSD has already got attached DB. + +Usage:: + + ceph-volume lvm new-db --osd-id OSD_ID --osd-fsid OSD_FSID --target <target lv> + +Optional arguments: + +* [-h, --help] show the help message and exit + +Required arguments: + +* --osd-id OSD_ID OSD id to attach new DB to +* --osd-fsid OSD_FSID OSD fsid to attach new DB to +* --target TARGET_LV logical volume name to attach as DB + +**migrate** + +Moves BlueFS data from source volume(s) to the target one, source volumes +(except the main, i.e. data or block one) are removed on success. LVM volumes +are permitted for Target only, both already attached or new one. In the latter +case it is attached to the OSD replacing one of the source devices. Following +replacement rules apply (in the order of precedence, stop on the first match): + + - if source list has DB volume - target device replaces it. + - if source list has WAL volume - target device replace it. + - if source list has slow volume only - operation is not permitted, + requires explicit allocation via new-db/new-wal command. + +Usage:: + + ceph-volume lvm migrate --osd-id OSD_ID --osd-fsid OSD_FSID --target TARGET_LV --from {data|db|wal} [{data|db|wal} ...] + +Optional arguments: + +* [-h, --help] show the help message and exit + +Required arguments: + +* --osd-id OSD_ID OSD id to perform migration at +* --osd-fsid OSD_FSID OSD fsid to perform migration at +* --target TARGET_LV logical volume to move data to +* --from TYPE_LIST list of source device type names, e.g. --from db wal + simple ------ diff --git a/src/ceph-volume/ceph_volume/api/lvm.py b/src/ceph-volume/ceph_volume/api/lvm.py index 30362f1bd27..e4b932b80e2 100644 --- a/src/ceph-volume/ceph_volume/api/lvm.py +++ b/src/ceph-volume/ceph_volume/api/lvm.py @@ -1134,3 +1134,15 @@ def get_device_lvs(device, name_prefix=''): lvs = _output_parser(stdout, LV_FIELDS) return [Volume(**lv) for lv in lvs if lv['lv_name'] and lv['lv_name'].startswith(name_prefix)] + +def get_lv_by_fullname(full_name): + """ + returns LV by the specified LV's full name (formatted as vg_name/lv_name) + """ + try: + vg_name, lv_name = full_name.split('/') + res_lv = get_first_lv(filters={'lv_name': lv_name, + 'vg_name': vg_name}) + except ValueError: + res_lv = None + return res_lv diff --git a/src/ceph-volume/ceph_volume/devices/lvm/main.py b/src/ceph-volume/ceph_volume/devices/lvm/main.py index 3ef3c1117d2..39947454d57 100644 --- a/src/ceph-volume/ceph_volume/devices/lvm/main.py +++ b/src/ceph-volume/ceph_volume/devices/lvm/main.py @@ -9,6 +9,7 @@ from . import trigger from . import listing from . import zap from . import batch +from . import migrate class LVM(object): @@ -30,6 +31,9 @@ class LVM(object): 'trigger': trigger.Trigger, 'list': listing.List, 'zap': zap.Zap, + 'migrate': migrate.Migrate, + 'new-wal': migrate.NewWAL, + 'new-db': migrate.NewDB, } def __init__(self, argv): diff --git a/src/ceph-volume/ceph_volume/devices/lvm/migrate.py b/src/ceph-volume/ceph_volume/devices/lvm/migrate.py new file mode 100644 index 00000000000..3410dd50879 --- /dev/null +++ b/src/ceph-volume/ceph_volume/devices/lvm/migrate.py @@ -0,0 +1,674 @@ +from __future__ import print_function +import argparse +import logging +import os +from textwrap import dedent +from ceph_volume.util import system, disk, merge_dict +from ceph_volume.util.device import Device +from ceph_volume import decorators, terminal, process +from ceph_volume.api import lvm as api +from ceph_volume.systemd import systemctl + + +logger = logging.getLogger(__name__) +mlogger = terminal.MultiLogger(__name__) + +def get_cluster_name(osd_id, osd_fsid): + """ + From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the + system that match those tag values, then return cluster_name for the first + one. + """ + lv_tags = {} + lv_tags['ceph.osd_id'] = osd_id + lv_tags['ceph.osd_fsid'] = osd_fsid + + lvs = api.get_lvs(tags=lv_tags) + if not lvs: + mlogger.error( + 'Unable to find any LV for source OSD: id:{} fsid:{}'.format( + osd_id, osd_fsid) ) + raise SystemExit('Unexpected error, terminating') + return next(iter(lvs)).tags["ceph.cluster_name"] + +def get_osd_path(osd_id, osd_fsid): + return '/var/lib/ceph/osd/{}-{}'.format( + get_cluster_name(osd_id, osd_fsid), osd_id) + +def find_associated_devices(osd_id, osd_fsid): + """ + From an ``osd_id`` and/or an ``osd_fsid``, filter out all the LVs in the + system that match those tag values, further detect if any partitions are + part of the OSD, and then return the set of LVs and partitions (if any). + """ + lv_tags = {} + lv_tags['ceph.osd_id'] = osd_id + lv_tags['ceph.osd_fsid'] = osd_fsid + + lvs = api.get_lvs(tags=lv_tags) + if not lvs: + mlogger.error( + 'Unable to find any LV for source OSD: id:{} fsid:{}'.format( + osd_id, osd_fsid) ) + raise SystemExit('Unexpected error, terminating') + + devices = set(ensure_associated_lvs(lvs, lv_tags)) + return [(Device(path), type) for path, type in devices if path] + +def ensure_associated_lvs(lvs, lv_tags): + """ + Go through each LV and ensure if backing devices (journal, wal, block) + are LVs or partitions, so that they can be accurately reported. + """ + # look for many LVs for each backing type, because it is possible to + # receive a filtering for osd.1, and have multiple failed deployments + # leaving many journals with osd.1 - usually, only a single LV will be + # returned + + block_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'block'})) + db_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'db'})) + wal_lvs = api.get_lvs(tags=merge_dict(lv_tags, {'ceph.type': 'wal'})) + backing_devices = [(block_lvs, 'block'), (db_lvs, 'db'), + (wal_lvs, 'wal')] + + verified_devices = [] + + for lv in lvs: + # go through each lv and append it, otherwise query `blkid` to find + # a physical device. Do this for each type (journal,db,wal) regardless + # if they have been processed in the previous LV, so that bad devices + # with the same ID can be caught + for ceph_lvs, type in backing_devices: + + if ceph_lvs: + verified_devices.extend([(l.lv_path, type) for l in ceph_lvs]) + continue + + # must be a disk partition, by querying blkid by the uuid we are + # ensuring that the device path is always correct + try: + device_uuid = lv.tags['ceph.{}_uuid'.format(type)] + except KeyError: + # Bluestore will not have ceph.journal_uuid, and Filestore + # will not not have ceph.db_uuid + continue + + osd_device = disk.get_device_from_partuuid(device_uuid) + if not osd_device: + # if the osd_device is not found by the partuuid, then it is + # not possible to ensure this device exists anymore, so skip it + continue + verified_devices.append((osd_device, type)) + + return verified_devices + +class VolumeTagTracker(object): + def __init__(self, devices, target_lv): + self.target_lv = target_lv + self.data_device = self.db_device = self.wal_device = None + for device, type in devices: + if type == 'block': + self.data_device = device + elif type == 'db': + self.db_device = device + elif type == 'wal': + self.wal_device = device + if not self.data_device: + mlogger.error('Data device not found') + raise SystemExit( + "Unexpected error, terminating") + if not self.data_device.is_lv: + mlogger.error('Data device isn\'t LVM') + raise SystemExit( + "Unexpected error, terminating") + + self.old_target_tags = self.target_lv.tags.copy() + self.old_data_tags = ( + self.data_device.lv_api.tags.copy() + if self.data_device.is_lv else None) + self.old_db_tags = ( + self.db_device.lv_api.tags.copy() + if self.db_device and self.db_device.is_lv else None) + self.old_wal_tags = ( + self.wal_device.lv_api.tags.copy() + if self.wal_device and self.wal_device.is_lv else None) + + def update_tags_when_lv_create(self, create_type): + tags = {} + if not self.data_device.is_lv: + mlogger.warning( + 'Data device is not LVM, wouldn\'t update LVM tags') + else: + tags["ceph.{}_uuid".format(create_type)] = self.target_lv.lv_uuid + tags["ceph.{}_device".format(create_type)] = self.target_lv.lv_path + self.data_device.lv_api.set_tags(tags) + + tags = self.data_device.lv_api.tags.copy() + tags["ceph.type"] = create_type + self.target_lv.set_tags(tags) + + aux_dev = None + if create_type == "db" and self.wal_device: + aux_dev = self.wal_device + elif create_type == "wal" and self.db_device: + aux_dev = self.db_device + else: + return + if not aux_dev.is_lv: + mlogger.warning( + '{} device is not LVM, wouldn\'t update LVM tags'.format( + create_type.upper())) + else: + tags = {} + tags["ceph.{}_uuid".format(create_type)] = self.target_lv.lv_uuid + tags["ceph.{}_device".format(create_type)] = self.target_lv.lv_path + aux_dev.lv_api.set_tags(tags) + + def remove_lvs(self, source_devices, target_type): + remaining_devices = [self.data_device, self.db_device, self.wal_device] + + outdated_tags = [] + for device, type in source_devices: + if type == "block" or type == target_type: + continue + remaining_devices.remove(device) + if device.is_lv: + outdated_tags.append("ceph.{}_uuid".format(type)) + outdated_tags.append("ceph.{}_device".format(type)) + device.lv_api.clear_tags() + if len(outdated_tags) > 0: + for d in remaining_devices: + if d and d.is_lv: + d.lv_api.clear_tags(outdated_tags) + + def replace_lvs(self, source_devices, target_type): + remaining_devices = [self.data_device] + if self.db_device: + remaining_devices.append(self.db_device) + if self.wal_device: + remaining_devices.append(self.wal_device) + + outdated_tags = [] + for device, type in source_devices: + if type == "block": + continue + remaining_devices.remove(device) + if device.is_lv: + outdated_tags.append("ceph.{}_uuid".format(type)) + outdated_tags.append("ceph.{}_device".format(type)) + device.lv_api.clear_tags() + + new_tags = {} + new_tags["ceph.{}_uuid".format(target_type)] = self.target_lv.lv_uuid + new_tags["ceph.{}_device".format(target_type)] = self.target_lv.lv_path + + for d in remaining_devices: + if d and d.is_lv: + if len(outdated_tags) > 0: + d.lv_api.clear_tags(outdated_tags) + d.lv_api.set_tags(new_tags) + + if not self.data_device.is_lv: + mlogger.warning( + 'Data device is not LVM, wouldn\'t properly update target LVM tags') + else: + tags = self.data_device.lv_api.tags.copy() + + tags["ceph.type"] = target_type + tags["ceph.{}_uuid".format(target_type)] = self.target_lv.lv_uuid + tags["ceph.{}_device".format(target_type)] = self.target_lv.lv_path + self.target_lv.set_tags(tags) + + def undo(self): + mlogger.info( + 'Undoing lv tag set') + if self.data_device: + if self.old_data_tags: + self.data_device.lv_api.set_tags(self.old_data_tags) + else: + self.data_device.lv_api.clear_tags() + if self.db_device: + if self.old_db_tags: + self.db_device.lv_api.set_tags(self.old_db_tags) + else: + self.db_device.lv_api.clear_tags() + if self.wal_device: + if self.old_wal_tags: + self.wal_device.lv_api.set_tags(self.old_wal_tags) + else: + self.wal_device.lv_api.clear_tags() + if self.old_target_tags: + self.target_lv.set_tags(self.old_target_tags) + else: + self.target_lv.clear_tags() + +class Migrate(object): + + help = 'Migrate BlueFS data from to another LVM device' + + def __init__(self, argv): + self.argv = argv + self.osd_id = None + + def get_source_devices(self, devices, target_type=""): + ret = [] + for device, type in devices: + if type == target_type: + continue + if type == 'block': + if 'data' not in self.args.from_: + continue; + elif type == 'db': + if 'db' not in self.args.from_: + continue; + elif type == 'wal': + if 'wal' not in self.args.from_: + continue; + ret.append([device, type]) + if ret == []: + mlogger.error('Source device list is empty') + raise SystemExit( + 'Unable to migrate to : {}'.format(self.args.target)) + return ret + + # ceph-bluestore-tool uses the following replacement rules + # (in the order of precedence, stop on the first match) + # if source list has DB volume - target device replaces it. + # if source list has WAL volume - target device replace it. + # if source list has slow volume only - operation isn’t permitted, + # requires explicit allocation via new-db/new-wal command.detects which + def get_target_type_by_source(self, devices): + ret = None + for device, type in devices: + if type == 'db': + return 'db' + elif type == 'wal': + ret = 'wal' + return ret + + def get_filename_by_type(self, type): + filename = 'block' + if type == 'db' or type == 'wal': + filename += '.' + type + return filename + + def get_source_args(self, osd_path, devices): + ret = [] + for device, type in devices: + ret = ret + ["--devs-source", os.path.join( + osd_path, self.get_filename_by_type(type))] + return ret + + @decorators.needs_root + def migrate_to_new(self, osd_id, osd_fsid, devices, target_lv): + source_devices = self.get_source_devices(devices) + target_type = self.get_target_type_by_source(source_devices) + if not target_type: + mlogger.error( + "Unable to determine new volume type," + " please use new-db or new-wal command before.") + raise SystemExit( + "Unable to migrate to : {}".format(self.args.target)) + + target_path = target_lv.lv_path + + try: + tag_tracker = VolumeTagTracker(devices, target_lv) + # we need to update lvm tags for all the remaining volumes + # and clear for ones which to be removed + + # ceph-bluestore-tool removes source volume(s) other than block one + # and attaches target one after successful migration + tag_tracker.replace_lvs(source_devices, target_type) + + osd_path = get_osd_path(osd_id, osd_fsid) + source_args = self.get_source_args(osd_path, source_devices) + mlogger.info("Migrate to new, Source: {} Target: {}".format( + source_args, target_path)) + stdout, stderr, exit_code = process.call([ + 'ceph-bluestore-tool', + '--path', + osd_path, + '--dev-target', + target_path, + '--command', + 'bluefs-bdev-migrate'] + + source_args) + if exit_code != 0: + mlogger.error( + 'Failed to migrate device, error code:{}'.format(exit_code)) + raise SystemExit( + 'Failed to migrate to : {}'.format(self.args.target)) + else: + system.chown(os.path.join(osd_path, "block.{}".format( + target_type))) + terminal.success('Migration successful.') + except: + tag_tracker.undo() + raise + + return + + @decorators.needs_root + def migrate_to_existing(self, osd_id, osd_fsid, devices, target_lv): + target_type = target_lv.tags["ceph.type"] + if target_type == "wal": + mlogger.error("Migrate to WAL is not supported") + raise SystemExit( + "Unable to migrate to : {}".format(self.args.target)) + target_filename = self.get_filename_by_type(target_type) + if (target_filename == ""): + mlogger.error( + "Target Logical Volume doesn't have proper volume type " + "(ceph.type LVM tag): {}".format(target_type)) + raise SystemExit( + "Unable to migrate to : {}".format(self.args.target)) + + osd_path = get_osd_path(osd_id, osd_fsid) + source_devices = self.get_source_devices(devices, target_type) + target_path = os.path.join(osd_path, target_filename) + tag_tracker = VolumeTagTracker(devices, target_lv) + + try: + # ceph-bluestore-tool removes source volume(s) other than + # block and target ones after successful migration + tag_tracker.remove_lvs(source_devices, target_type) + source_args = self.get_source_args(osd_path, source_devices) + mlogger.info("Migrate to existing, Source: {} Target: {}".format( + source_args, target_path)) + stdout, stderr, exit_code = process.call([ + 'ceph-bluestore-tool', + '--path', + osd_path, + '--dev-target', + target_path, + '--command', + 'bluefs-bdev-migrate'] + + source_args) + if exit_code != 0: + mlogger.error( + 'Failed to migrate device, error code:{}'.format(exit_code)) + raise SystemExit( + 'Failed to migrate to : {}'.format(self.args.target)) + else: + terminal.success('Migration successful.') + except: + tag_tracker.undo() + raise + + return + + @decorators.needs_root + def migrate_osd(self): + if self.args.osd_id: + osd_is_running = systemctl.osd_is_active(self.args.osd_id) + if osd_is_running: + mlogger.error('OSD is running, stop it with: ' + 'systemctl stop ceph-osd@{}'.format( + self.args.osd_id)) + raise SystemExit( + 'Unable to migrate devices associated with OSD ID: {}' + .format(self.args.osd_id)) + + target_lv = api.get_lv_by_fullname(self.args.target) + if not target_lv: + mlogger.error( + 'Target path "{}" is not a Logical Volume'.formaat( + self.args.target)) + raise SystemExit( + 'Unable to migrate to : {}'.format(self.args.target)) + devices = find_associated_devices(self.args.osd_id, self.args.osd_fsid) + if (not target_lv.used_by_ceph): + self.migrate_to_new(self.args.osd_id, self.args.osd_fsid, + devices, + target_lv) + else: + if (target_lv.tags['ceph.osd_id'] != self.args.osd_id or + target_lv.tags['ceph.osd_fsid'] != self.args.osd_fsid): + mlogger.error( + 'Target Logical Volume isn\'t used by the specified OSD: ' + '{} FSID: {}'.format(self.args.osd_id, + self.args.osd_fsid)) + raise SystemExit( + 'Unable to migrate to : {}'.format(self.args.target)) + + self.migrate_to_existing(self.args.osd_id, self.args.osd_fsid, + devices, + target_lv) + + def parse_argv(self): + sub_command_help = dedent(""" + Moves BlueFS data from source volume(s) to the target one, source + volumes (except the main (i.e. data or block) one) are removed on + success. LVM volumes are permitted for Target only, both already + attached or new logical one. In the latter case it is attached to OSD + replacing one of the source devices. Following replacement rules apply + (in the order of precedence, stop on the first match): + * if source list has DB volume - target device replaces it. + * if source list has WAL volume - target device replace it. + * if source list has slow volume only - operation is not permitted, + requires explicit allocation via new-db/new-wal command. + + Example calls for supported scenarios: + + Moves BlueFS data from main device to LV already attached as DB: + + ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data --target vgname/db + + Moves BlueFS data from shared main device to LV which will be attached + as a new DB: + + ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data --target vgname/new_db + + Moves BlueFS data from DB device to new LV, DB is replaced: + + ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from db --target vgname/new_db + + Moves BlueFS data from main and DB devices to new LV, DB is replaced: + + ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data db --target vgname/new_db + + Moves BlueFS data from main, DB and WAL devices to new LV, WAL is + removed and DB is replaced: + + ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from data db wal --target vgname/new_db + + Moves BlueFS data from main, DB and WAL devices to main device, WAL + and DB are removed: + + ceph-volume lvm migrate --osd-id 1 --osd-fsid <uuid> --from db wal --target vgname/data + + """) + parser = argparse.ArgumentParser( + prog='ceph-volume lvm migrate', + formatter_class=argparse.RawDescriptionHelpFormatter, + description=sub_command_help, + ) + + parser.add_argument( + '--osd-id', + required=True, + help='Specify an OSD ID to detect associated devices for zapping', + ) + + parser.add_argument( + '--osd-fsid', + required=True, + help='Specify an OSD FSID to detect associated devices for zapping', + ) + parser.add_argument( + '--target', + required=True, + help='Specify target Logical Volume (LV) to migrate data to', + ) + parser.add_argument( + '--from', + nargs='*', + dest='from_', + required=True, + choices=['data', 'db', 'wal'], + help='Copy BlueFS data from DB device', + ) + + if len(self.argv) == 0: + print(sub_command_help) + return + self.args = parser.parse_args(self.argv) + + def main(self): + self.parse_argv() + self.migrate_osd() + +class NewVolume(object): + def __init__(self, create_type, argv): + self.create_type = create_type + self.argv = argv + + def make_parser(self, prog, sub_command_help): + parser = argparse.ArgumentParser( + prog=prog, + formatter_class=argparse.RawDescriptionHelpFormatter, + description=sub_command_help, + ) + + parser.add_argument( + '--osd-id', + required=True, + help='Specify an OSD ID to attach new volume to', + ) + + parser.add_argument( + '--osd-fsid', + required=True, + help='Specify an OSD FSIDto attach new volume to', + ) + parser.add_argument( + '--target', + required=True, + help='Specify target Logical Volume (LV) to attach', + ) + return parser + + @decorators.needs_root + def make_new_volume(self, osd_id, osd_fsid, devices, target_lv): + osd_path = get_osd_path(osd_id, osd_fsid) + mlogger.info( + 'Making new volume at {} for OSD: {} ({})'.format( + target_lv.lv_path, osd_id, osd_path)) + tag_tracker = VolumeTagTracker(devices, target_lv) + + try: + tag_tracker.update_tags_when_lv_create(self.create_type) + + stdout, stderr, exit_code = process.call([ + 'ceph-bluestore-tool', + '--path', + osd_path, + '--dev-target', + target_lv.lv_path, + '--command', + 'bluefs-bdev-new-{}'.format(self.create_type) + ]) + if exit_code != 0: + mlogger.error( + 'failed to attach new volume, error code:{}'.format( + exit_code)) + raise SystemExit( + "Failed to attach new volume: {}".format( + self.args.target)) + else: + system.chown(os.path.join(osd_path, "block.{}".format( + self.create_type))) + terminal.success('New volume attached.') + except: + tag_tracker.undo() + raise + return + + @decorators.needs_root + def new_volume(self): + if self.args.osd_id: + osd_is_running = systemctl.osd_is_active(self.args.osd_id) + if osd_is_running: + mlogger.error('OSD ID is running, stop it with:' + ' systemctl stop ceph-osd@{}'.format(self.args.osd_id)) + raise SystemExit( + 'Unable to attach new volume for OSD: {}'.format( + self.args.osd_id)) + + target_lv = api.get_lv_by_fullname(self.args.target) + if not target_lv: + mlogger.error( + 'Target path {} is not a Logical Volume'.format( + self.args.target)) + raise SystemExit( + 'Unable to attach new volume : {}'.format(self.args.target)) + if target_lv.used_by_ceph: + mlogger.error( + 'Target Logical Volume is already used by ceph: {}'.format( + self.args.target)) + raise SystemExit( + 'Unable to attach new volume : {}'.format(self.args.target)) + else: + devices = find_associated_devices(self.args.osd_id, + self.args.osd_fsid) + self.make_new_volume( + self.args.osd_id, + self.args.osd_fsid, + devices, + target_lv) + +class NewWAL(NewVolume): + + help = 'Allocate new WAL volume for OSD at specified Logical Volume' + + def __init__(self, argv): + super(NewWAL, self).__init__("wal", argv) + + def main(self): + sub_command_help = dedent(""" + Attaches the given logical volume to the given OSD as a WAL volume. + Logical volume format is vg/lv. Fails if OSD has already got attached DB. + + Example: + + Attach vgname/lvname as a WAL volume to OSD 1 + + ceph-volume lvm new-wal --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D --target vgname/new_wal + """) + parser = self.make_parser('ceph-volume lvm new-wal', sub_command_help) + + if len(self.argv) == 0: + print(sub_command_help) + return + + self.args = parser.parse_args(self.argv) + + self.new_volume() + +class NewDB(NewVolume): + + help = 'Allocate new DB volume for OSD at specified Logical Volume' + + def __init__(self, argv): + super(NewDB, self).__init__("db", argv) + + def main(self): + sub_command_help = dedent(""" + Attaches the given logical volume to the given OSD as a DB volume. + Logical volume format is vg/lv. Fails if OSD has already got attached DB. + + Example: + + Attach vgname/lvname as a DB volume to OSD 1 + + ceph-volume lvm new-db --osd-id 1 --osd-fsid 55BD4219-16A7-4037-BC20-0F158EFCC83D --target vgname/new_db + """) + + parser = self.make_parser('ceph-volume lvm new-db', sub_command_help) + if len(self.argv) == 0: + print(sub_command_help) + return + self.args = parser.parse_args(self.argv) + + self.new_volume() diff --git a/src/ceph-volume/ceph_volume/tests/devices/lvm/test_migrate.py b/src/ceph-volume/ceph_volume/tests/devices/lvm/test_migrate.py new file mode 100644 index 00000000000..dc429793de4 --- /dev/null +++ b/src/ceph-volume/ceph_volume/tests/devices/lvm/test_migrate.py @@ -0,0 +1,1504 @@ +import pytest +from mock.mock import patch +from ceph_volume import process +from ceph_volume.api import lvm as api +from ceph_volume.devices.lvm import migrate +from ceph_volume.util.device import Device +from ceph_volume.util import system + +class TestGetClusterName(object): + + mock_volumes = [] + def mock_get_lvs(self, *args, **kwargs): + return self.mock_volumes.pop(0) + + def test_cluster_found(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234,ceph.cluster_name=name_of_the_cluster' + vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='', + lv_path='/dev/VolGroup/lv1', lv_tags=tags) + self.mock_volumes = [] + self.mock_volumes.append([vol]) + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = migrate.get_cluster_name(osd_id='0', osd_fsid='1234') + assert "name_of_the_cluster" == result + + def test_cluster_not_found(self, monkeypatch, capsys): + self.mock_volumes = [] + self.mock_volumes.append([]) + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + with pytest.raises(SystemExit) as error: + migrate.get_cluster_name(osd_id='0', osd_fsid='1234') + stdout, stderr = capsys.readouterr() + expected = 'Unexpected error, terminating' + assert expected in str(error.value) + expected = 'Unable to find any LV for source OSD: id:0 fsid:1234' + assert expected in stderr + +class TestFindAssociatedDevices(object): + + mock_volumes = [] + def mock_get_lvs(self, *args, **kwargs): + return self.mock_volumes.pop(0) + + mock_single_volumes = {} + def mock_get_first_lv(self, *args, **kwargs): + p = kwargs['filters']['lv_path'] + return self.mock_single_volumes[p] + + def test_lv_is_matched_id(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234' + vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='', + lv_path='/dev/VolGroup/lv1', lv_tags=tags) + self.mock_volumes = [] + self.mock_volumes.append([vol]) + self.mock_volumes.append([vol]) + self.mock_volumes.append([]) + self.mock_volumes.append([]) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': vol} + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = migrate.find_associated_devices(osd_id='0', osd_fsid='1234') + assert len(result) == 1 + assert result[0][0].abspath == '/dev/VolGroup/lv1' + assert result[0][0].lvs == [vol] + assert result[0][1] == 'block' + + def test_lv_is_matched_id2(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234' + vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=tags) + tags2 = 'ceph.osd_id=0,ceph.journal_uuid=xx,ceph.type=wal,ceph.osd_fsid=1234' + vol2 = api.Volume(lv_name='volume2', lv_uuid='z', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=tags2) + self.mock_volumes = [] + self.mock_volumes.append([vol]) + self.mock_volumes.append([vol]) + self.mock_volumes.append([]) + self.mock_volumes.append([vol2]) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': vol, '/dev/VolGroup/lv2': vol2} + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = migrate.find_associated_devices(osd_id='0', osd_fsid='1234') + assert len(result) == 2 + for d in result: + if d[1] == 'block': + assert d[0].abspath == '/dev/VolGroup/lv1' + assert d[0].lvs == [vol] + elif d[1] == 'wal': + assert d[0].abspath == '/dev/VolGroup/lv2' + assert d[0].lvs == [vol2] + else: + assert False + + def test_lv_is_matched_id3(self, monkeypatch): + tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234' + vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=tags) + tags2 = 'ceph.osd_id=0,ceph.journal_uuid=xx,ceph.type=wal,ceph.osd_fsid=1234' + vol2 = api.Volume(lv_name='volume2', lv_uuid='z', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=tags2) + tags3 = 'ceph.osd_id=0,ceph.journal_uuid=xx,ceph.type=db,ceph.osd_fsid=1234' + vol3 = api.Volume(lv_name='volume3', lv_uuid='z', vg_name='vg', + lv_path='/dev/VolGroup/lv3', lv_tags=tags3) + + self.mock_volumes = [] + self.mock_volumes.append([vol]) + self.mock_volumes.append([vol]) + self.mock_volumes.append([vol3]) + self.mock_volumes.append([vol2]) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': vol, + '/dev/VolGroup/lv2': vol2, + '/dev/VolGroup/lv3': vol3} + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + result = migrate.find_associated_devices(osd_id='0', osd_fsid='1234') + assert len(result) == 3 + for d in result: + if d[1] == 'block': + assert d[0].abspath == '/dev/VolGroup/lv1' + assert d[0].lvs == [vol] + elif d[1] == 'wal': + assert d[0].abspath == '/dev/VolGroup/lv2' + assert d[0].lvs == [vol2] + elif d[1] == 'db': + assert d[0].abspath == '/dev/VolGroup/lv3' + assert d[0].lvs == [vol3] + else: + assert False + + def test_lv_is_not_matched(self, monkeypatch, capsys): + self.mock_volumes = [None] + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0)) + + with pytest.raises(SystemExit) as error: + migrate.find_associated_devices(osd_id='1', osd_fsid='1234') + stdout, stderr = capsys.readouterr() + expected = 'Unexpected error, terminating' + assert expected in str(error.value) + expected = 'Unable to find any LV for source OSD: id:1 fsid:1234' + assert expected in stderr + +class TestVolumeTagTracker(object): + mock_single_volumes = {} + def mock_get_first_lv(self, *args, **kwargs): + p = kwargs['filters']['lv_path'] + return self.mock_single_volumes[p] + + mock_process_input = [] + def mock_process(self, *args, **kwargs): + self.mock_process_input.append(args[0]); + return ('', '', 0) + + def test_init(self, monkeypatch): + source_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234' + source_db_tags = 'ceph.osd_id=0,journal_uuid=x,ceph.type=db, osd_fsid=1234' + source_wal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=wal' + target_tags="ceph.a=1,ceph.b=2,c=3,ceph.d=4" # 'c' to be bypassed + devices=[] + + data_vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=source_db_tags) + wal_vol = api.Volume(lv_name='volume3', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv3', lv_tags=source_wal_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol} + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + data_device = Device(path = '/dev/VolGroup/lv1') + db_device = Device(path = '/dev/VolGroup/lv2') + wal_device = Device(path = '/dev/VolGroup/lv3') + devices.append([data_device, 'block']) + devices.append([db_device, 'db']) + devices.append([wal_device, 'wal']) + + target = api.Volume(lv_name='target_name', lv_tags=target_tags, + lv_path='/dev/VolGroup/lv_target') + t = migrate.VolumeTagTracker(devices, target); + + assert 3 == len(t.old_target_tags) + + assert data_device == t.data_device + assert 4 == len(t.old_data_tags) + assert 'data' == t.old_data_tags['ceph.type'] + + assert db_device == t.db_device + assert 2 == len(t.old_db_tags) + assert 'db' == t.old_db_tags['ceph.type'] + + assert wal_device == t.wal_device + assert 3 == len(t.old_wal_tags) + assert 'wal' == t.old_wal_tags['ceph.type'] + + def test_update_tags_when_lv_create(self, monkeypatch): + source_tags = \ + 'ceph.osd_id=0,ceph.journal_uuid=x,' \ + 'ceph.type=data,ceph.osd_fsid=1234' + source_db_tags = \ + 'ceph.osd_id=0,journal_uuid=x,ceph.type=db,' \ + 'osd_fsid=1234' + + devices=[] + + data_vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=source_db_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol} + + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + data_device = Device(path = '/dev/VolGroup/lv1') + db_device = Device(path = '/dev/VolGroup/lv2') + devices.append([data_device, 'block']) + devices.append([db_device, 'db']) + + target = api.Volume(lv_name='target_name', lv_tags='', + lv_uuid='wal_uuid', + lv_path='/dev/VolGroup/lv_target') + t = migrate.VolumeTagTracker(devices, target); + + self.mock_process_input = [] + t.update_tags_when_lv_create('wal') + + assert 3 == len(self.mock_process_input) + + assert ['lvchange', + '--addtag', 'ceph.wal_uuid=wal_uuid', + '--addtag', 'ceph.wal_device=/dev/VolGroup/lv_target', + '/dev/VolGroup/lv1'] == self.mock_process_input[0] + + assert self.mock_process_input[1].sort() == [ + 'lvchange', + '--addtag', 'ceph.osd_id=0', + '--addtag', 'ceph.journal_uuid=x', + '--addtag', 'ceph.type=wal', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.wal_uuid=wal_uuid', + '--addtag', 'ceph.wal_device=/dev/VolGroup/lv_target', + '/dev/VolGroup/lv_target'].sort() + + assert ['lvchange', + '--addtag', 'ceph.wal_uuid=wal_uuid', + '--addtag', 'ceph.wal_device=/dev/VolGroup/lv_target', + '/dev/VolGroup/lv2'] == self.mock_process_input[2] + + def test_remove_lvs(self, monkeypatch): + source_tags = \ + 'ceph.osd_id=0,ceph.journal_uuid=x,' \ + 'ceph.type=data,ceph.osd_fsid=1234,ceph.wal_uuid=aaaaa' + source_db_tags = \ + 'ceph.osd_id=0,journal_uuid=x,ceph.type=db,' \ + 'osd_fsid=1234,ceph.wal_device=aaaaa' + source_wal_tags = \ + 'ceph.wal_uuid=uuid,ceph.wal_device=device,' \ + 'ceph.osd_id=0,ceph.type=wal' + + devices=[] + + data_vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=source_db_tags) + wal_vol = api.Volume(lv_name='volume3', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv3', lv_tags=source_wal_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol} + + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + data_device = Device(path = '/dev/VolGroup/lv1') + db_device = Device(path = '/dev/VolGroup/lv2') + wal_device = Device(path = '/dev/VolGroup/lv3') + devices.append([data_device, 'block']) + devices.append([db_device, 'db']) + devices.append([wal_device, 'wal']) + + target = api.Volume(lv_name='target_name', lv_tags='', + lv_path='/dev/VolGroup/lv_target') + t = migrate.VolumeTagTracker(devices, target); + + device_to_remove = devices.copy() + + self.mock_process_input = [] + t.remove_lvs(device_to_remove, 'db') + + assert 3 == len(self.mock_process_input) + assert ['lvchange', + '--deltag', 'ceph.wal_uuid=uuid', + '--deltag', 'ceph.wal_device=device', + '--deltag', 'ceph.osd_id=0', + '--deltag', 'ceph.type=wal', + '/dev/VolGroup/lv3'] == self.mock_process_input[0] + assert ['lvchange', + '--deltag', 'ceph.wal_uuid=aaaaa', + '/dev/VolGroup/lv1'] == self.mock_process_input[1] + assert ['lvchange', + '--deltag', 'ceph.wal_device=aaaaa', + '/dev/VolGroup/lv2'] == self.mock_process_input[2] + + def test_replace_lvs(self, monkeypatch): + source_tags = \ + 'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\ + 'ceph.wal_uuid=wal_uuid,ceph.db_device=/dbdevice' + source_db_tags = \ + 'ceph.osd_id=0,ceph.type=db,ceph.osd_fsid=1234' + source_wal_tags = \ + 'ceph.wal_uuid=uuid,ceph.wal_device=device,' \ + 'ceph.osd_id=0,ceph.type=wal' + + devices=[] + + data_vol = api.Volume(lv_name='volume1', lv_uuid='datauuid', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', lv_uuid='dbuuid', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=source_db_tags) + wal_vol = api.Volume(lv_name='volume3', lv_uuid='waluuid', vg_name='vg', + lv_path='/dev/VolGroup/lv3', lv_tags=source_wal_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol} + + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + data_device = Device(path = '/dev/VolGroup/lv1') + db_device = Device(path = '/dev/VolGroup/lv2') + wal_device = Device(path = '/dev/VolGroup/lv3') + devices.append([data_device, 'block']) + devices.append([db_device, 'db']) + devices.append([wal_device, 'wal']) + + target = api.Volume(lv_name='target_name', + lv_uuid='ttt', + lv_tags='ceph.tag_to_remove=aaa', + lv_path='/dev/VolGroup/lv_target') + t = migrate.VolumeTagTracker(devices, target); + + self.mock_process_input = [] + t.replace_lvs(devices, 'db') + + assert 5 == len(self.mock_process_input) + + assert ['lvchange', + '--deltag', 'ceph.osd_id=0', + '--deltag', 'ceph.type=db', + '--deltag', 'ceph.osd_fsid=1234', + '/dev/VolGroup/lv2'] == self.mock_process_input[0] + assert ['lvchange', + '--deltag', 'ceph.wal_uuid=uuid', + '--deltag', 'ceph.wal_device=device', + '--deltag', 'ceph.osd_id=0', + '--deltag', 'ceph.type=wal', + '/dev/VolGroup/lv3'] == self.mock_process_input[1] + assert ['lvchange', + '--deltag', 'ceph.db_device=/dbdevice', + '--deltag', 'ceph.wal_uuid=wal_uuid', + '/dev/VolGroup/lv1'] == self.mock_process_input[2] + + assert ['lvchange', + '--addtag', 'ceph.db_uuid=ttt', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv_target', + '/dev/VolGroup/lv1'] == self.mock_process_input[3] + + assert self.mock_process_input[4].sort() == [ + 'lvchange', + '--addtag', 'ceph.osd_id=0', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.type=db', + '--addtag', 'ceph.db_uuid=ttt', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv_target', + '/dev/VolGroup/lv_target'].sort() + + def test_undo(self, monkeypatch): + source_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.osd_fsid=1234' + source_db_tags = 'ceph.osd_id=0,journal_uuid=x,ceph.type=db, osd_fsid=1234' + source_wal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=wal' + target_tags="" + devices=[] + + data_vol = api.Volume(lv_name='volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv2', lv_tags=source_db_tags) + wal_vol = api.Volume(lv_name='volume3', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/lv3', lv_tags=source_wal_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol} + + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + data_device = Device(path = '/dev/VolGroup/lv1') + db_device = Device(path = '/dev/VolGroup/lv2') + wal_device = Device(path = '/dev/VolGroup/lv3') + devices.append([data_device, 'block']) + devices.append([db_device, 'db']) + devices.append([wal_device, 'wal']) + + target = api.Volume(lv_name='target_name', lv_tags=target_tags, + lv_path='/dev/VolGroup/lv_target') + t = migrate.VolumeTagTracker(devices, target); + + target.tags['ceph.a'] = 'aa'; + target.tags['ceph.b'] = 'bb'; + + data_vol.tags['ceph.journal_uuid'] = 'z'; + + db_vol.tags.pop('ceph.type') + + wal_vol.tags.clear() + + assert 2 == len(target.tags) + assert 4 == len(data_vol.tags) + assert 1 == len(db_vol.tags) + + self.mock_process_input = [] + t.undo() + + assert 0 == len(target.tags) + assert 4 == len(data_vol.tags) + assert 'x' == data_vol.tags['ceph.journal_uuid'] + + assert 2 == len(db_vol.tags) + assert 'db' == db_vol.tags['ceph.type'] + + assert 3 == len(wal_vol.tags) + assert 'wal' == wal_vol.tags['ceph.type'] + + assert 6 == len(self.mock_process_input) + assert 'lvchange' in self.mock_process_input[0] + assert '--deltag' in self.mock_process_input[0] + assert 'ceph.journal_uuid=z' in self.mock_process_input[0] + assert '/dev/VolGroup/lv1' in self.mock_process_input[0] + + assert 'lvchange' in self.mock_process_input[1] + assert '--addtag' in self.mock_process_input[1] + assert 'ceph.journal_uuid=x' in self.mock_process_input[1] + assert '/dev/VolGroup/lv1' in self.mock_process_input[1] + + assert 'lvchange' in self.mock_process_input[2] + assert '--deltag' in self.mock_process_input[2] + assert 'ceph.osd_id=0' in self.mock_process_input[2] + assert '/dev/VolGroup/lv2' in self.mock_process_input[2] + + assert 'lvchange' in self.mock_process_input[3] + assert '--addtag' in self.mock_process_input[3] + assert 'ceph.type=db' in self.mock_process_input[3] + assert '/dev/VolGroup/lv2' in self.mock_process_input[3] + + assert 'lvchange' in self.mock_process_input[4] + assert '--addtag' in self.mock_process_input[4] + assert 'ceph.type=wal' in self.mock_process_input[4] + assert '/dev/VolGroup/lv3' in self.mock_process_input[4] + + assert 'lvchange' in self.mock_process_input[5] + assert '--deltag' in self.mock_process_input[5] + assert 'ceph.a=aa' in self.mock_process_input[5] + assert 'ceph.b=bb' in self.mock_process_input[5] + assert '/dev/VolGroup/lv_target' in self.mock_process_input[5] + +class TestNew(object): + + mock_volume = None + def mock_get_lv_by_fullname(self, *args, **kwargs): + return self.mock_volume + + mock_process_input = [] + def mock_process(self, *args, **kwargs): + self.mock_process_input.append(args[0]); + return ('', '', 0) + + mock_single_volumes = {} + def mock_get_first_lv(self, *args, **kwargs): + p = kwargs['filters']['lv_path'] + return self.mock_single_volumes[p] + + mock_volumes = [] + def mock_get_lvs(self, *args, **kwargs): + return self.mock_volumes.pop(0) + + def test_newdb_non_root(self): + with pytest.raises(Exception) as error: + migrate.NewDB(argv=[ + '--osd-id', '1', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--target', 'vgname/new_db']).main() + expected = 'This command needs to be executed with sudo or as root' + assert expected in str(error.value) + + @patch('os.getuid') + def test_newdb_not_target_lvm(self, m_getuid, capsys): + m_getuid.return_value = 0 + with pytest.raises(SystemExit) as error: + migrate.NewDB(argv=[ + '--osd-id', '1', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--target', 'vgname/new_db']).main() + stdout, stderr = capsys.readouterr() + expected = 'Unable to attach new volume : vgname/new_db' + assert expected in str(error.value) + expected = 'Target path vgname/new_db is not a Logical Volume' + assert expected in stderr + + + @patch('os.getuid') + def test_newdb_already_in_use(self, m_getuid, monkeypatch, capsys): + m_getuid.return_value = 0 + + self.mock_volume = api.Volume(lv_name='volume1', + lv_uuid='y', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags='ceph.osd_id=5') # this results in set used_by_ceph + monkeypatch.setattr(api, 'get_lv_by_fullname', self.mock_get_lv_by_fullname) + + with pytest.raises(SystemExit) as error: + migrate.NewDB(argv=[ + '--osd-id', '1', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--target', 'vgname/new_db']).main() + stdout, stderr = capsys.readouterr() + expected = 'Unable to attach new volume : vgname/new_db' + assert expected in str(error.value) + expected = 'Target Logical Volume is already used by ceph: vgname/new_db' + assert expected in stderr + + @patch('os.getuid') + def test_newdb(self, m_getuid, monkeypatch, capsys): + m_getuid.return_value = 0 + + source_tags = \ + 'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234,'\ + 'ceph.wal_uuid=wal_uuid,ceph.db_device=/dbdevice' + source_wal_tags = \ + 'ceph.wal_uuid=uuid,ceph.wal_device=device,' \ + 'ceph.osd_id=0,ceph.type=wal' + + data_vol = api.Volume(lv_name='volume1', lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='waluuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv3': wal_vol} + + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + self.mock_volume = api.Volume(lv_name='target_volume1', lv_uuid='y', + vg_name='vg', + lv_path='/dev/VolGroup/target_volume', + lv_tags='') + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + #find_associated_devices will call get_lvs() 4 times + # and it this needs results to be arranged that way + self.mock_volumes = [] + self.mock_volumes.append([data_vol, wal_vol]) + self.mock_volumes.append([data_vol]) + self.mock_volumes.append([]) + self.mock_volumes.append([wal_vol]) + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph_cluster') + monkeypatch.setattr(system, 'chown', lambda path: 0) + + migrate.NewDB(argv=[ + '--osd-id', '1', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--target', 'vgname/new_db']).main() + + n = len(self.mock_process_input) + assert n >= 5 + + assert self.mock_process_input[n - 5] == [ + 'lvchange', + '--deltag', 'ceph.db_device=/dbdevice', + '/dev/VolGroup/lv1'] + assert self.mock_process_input[n - 4] == [ + 'lvchange', + '--addtag', 'ceph.db_uuid=y', + '--addtag', 'ceph.db_device=/dev/VolGroup/target_volume', + '/dev/VolGroup/lv1'] + + assert self.mock_process_input[n - 3].sort() == [ + 'lvchange', + '--addtag', 'ceph.wal_uuid=uuid', + '--addtag', 'ceph.osd_id=0', + '--addtag', 'ceph.type=db', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.db_uuid=y', + '--addtag', 'ceph.db_device=/dev/VolGroup/target_volume', + '/dev/VolGroup/target_volume'].sort() + + assert self.mock_process_input[n - 2] == [ + 'lvchange', + '--addtag', 'ceph.db_uuid=y', + '--addtag', 'ceph.db_device=/dev/VolGroup/target_volume', + '/dev/VolGroup/lv3'] + + assert self.mock_process_input[n - 1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/ceph_cluster-1', + '--dev-target', '/dev/VolGroup/target_volume', + '--command', 'bluefs-bdev-new-db'] + + @patch('os.getuid') + def test_newwal(self, m_getuid, monkeypatch, capsys): + m_getuid.return_value = 0 + + source_tags = \ + 'ceph.osd_id=0,ceph.type=data,ceph.osd_fsid=1234' + + data_vol = api.Volume(lv_name='volume1', lv_uuid='datauuid', vg_name='vg', + lv_path='/dev/VolGroup/lv1', lv_tags=source_tags) + + self.mock_single_volumes = {'/dev/VolGroup/lv1': data_vol} + + monkeypatch.setattr(migrate.api, 'get_first_lv', self.mock_get_first_lv) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + self.mock_volume = api.Volume(lv_name='target_volume1', lv_uuid='y', vg_name='vg', + lv_path='/dev/VolGroup/target_volume', + lv_tags='') + monkeypatch.setattr(api, 'get_lv_by_fullname', self.mock_get_lv_by_fullname) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", lambda id: False) + + #find_associated_devices will call get_lvs() 4 times + # and it this needs results to be arranged that way + self.mock_volumes = [] + self.mock_volumes.append([data_vol]) + self.mock_volumes.append([data_vol]) + self.mock_volumes.append([]) + self.mock_volumes.append([]) + + monkeypatch.setattr(migrate.api, 'get_lvs', self.mock_get_lvs) + + monkeypatch.setattr(migrate, 'get_cluster_name', lambda osd_id, osd_fsid: 'cluster') + monkeypatch.setattr(system, 'chown', lambda path: 0) + + migrate.NewWAL(argv=[ + '--osd-id', '2', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--target', 'vgname/new_wal']).main() + + n = len(self.mock_process_input) + assert n >= 3 + + assert self.mock_process_input[n - 3] == [ + 'lvchange', + '--addtag', 'ceph.wal_uuid=y', + '--addtag', 'ceph.wal_device=/dev/VolGroup/target_volume', + '/dev/VolGroup/lv1'] + + assert self.mock_process_input[n - 2].sort() == [ + 'lvchange', + '--addtag', 'ceph.osd_id=0', + '--addtag', 'ceph.type=wal', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.wal_uuid=y', + '--addtag', 'ceph.wal_device=/dev/VolGroup/target_volume', + '/dev/VolGroup/target_volume'].sort() + + assert self.mock_process_input[n - 1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/cluster-2', + '--dev-target', '/dev/VolGroup/target_volume', + '--command', 'bluefs-bdev-new-wal'] + +class TestMigrate(object): + + mock_volume = None + def mock_get_lv_by_fullname(self, *args, **kwargs): + return self.mock_volume + + mock_process_input = [] + def mock_process(self, *args, **kwargs): + self.mock_process_input.append(args[0]); + return ('', '', 0) + + mock_single_volumes = {} + def mock_get_first_lv(self, *args, **kwargs): + p = kwargs['filters']['lv_path'] + return self.mock_single_volumes[p] + + mock_volumes = [] + def mock_get_lvs(self, *args, **kwargs): + return self.mock_volumes.pop(0) + + def test_get_source_devices(self, monkeypatch): + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234' + source_wal_tags = 'ceph.osd_id=2,ceph.type=wal,ceph.osd_fsid=1234' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = api.Volume(lv_name='volume2', lv_uuid='y', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags='ceph.osd_id=5,ceph.osd_type=db') + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + devices.append([Device('/dev/VolGroup/lv3'), 'wal']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--from', 'data', 'wal', + '--target', 'vgname/new_wal']) + m.parse_argv() + res_devices = m.get_source_devices(devices) + + assert 2 == len(res_devices) + assert devices[0] == res_devices[0] + assert devices[2] == res_devices[1] + + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '55BD4219-16A7-4037-BC20-0F158EFCC83D', + '--from', 'db', 'wal', 'data', + '--target', 'vgname/new_wal']) + m.parse_argv() + res_devices = m.get_source_devices(devices) + + assert 3 == len(res_devices) + assert devices[0] == res_devices[0] + assert devices[1] == res_devices[1] + assert devices[2] == res_devices[2] + + + @patch('os.getuid') + def test_migrate_data_db_to_new_db(self, m_getuid, monkeypatch): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = api.Volume(lv_name='volume2_new', lv_uuid='new-db-uuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2_new', + lv_tags='') + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'data', 'db', 'wal', + '--target', 'vgname/new_wal']) + m.main() + + n = len(self.mock_process_input) + assert n >= 5 + + assert self. mock_process_input[n-5] == [ + 'lvchange', + '--deltag', 'ceph.osd_id=2', + '--deltag', 'ceph.type=db', + '--deltag', 'ceph.osd_fsid=1234', + '--deltag', 'ceph.cluster_name=ceph', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '/dev/VolGroup/lv2'] + + assert self. mock_process_input[n-4] == [ + 'lvchange', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '/dev/VolGroup/lv1'] + + assert self. mock_process_input[n-3] == [ + 'lvchange', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv1'] + + assert self. mock_process_input[n-2] == [ + 'lvchange', + '--addtag', 'ceph.osd_id=2', + '--addtag', 'ceph.type=db', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.cluster_name=ceph', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv2_new'] + + assert self. mock_process_input[n-1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/ceph-2', + '--dev-target', '/dev/VolGroup/lv2_new', + '--command', 'bluefs-bdev-migrate', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block.db'] + + @patch('os.getuid') + def test_migrate_data_db_to_new_db_skip_wal(self, m_getuid, monkeypatch): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_wal_tags = 'ceph.osd_id=2,ceph.type=wal,ceph.osd_fsid=1234' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = api.Volume(lv_name='volume2_new', lv_uuid='new-db-uuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2_new', + lv_tags='') + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + devices.append([Device('/dev/VolGroup/lv3'), 'wal']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'data', 'db', + '--target', 'vgname/new_wal']) + m.main() + + n = len(self.mock_process_input) + assert n >= 7 + + assert self. mock_process_input[n-7] == [ + 'lvchange', + '--deltag', 'ceph.osd_id=2', + '--deltag', 'ceph.type=db', + '--deltag', 'ceph.osd_fsid=1234', + '--deltag', 'ceph.cluster_name=ceph', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '/dev/VolGroup/lv2'] + + assert self. mock_process_input[n-6] == [ + 'lvchange', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '/dev/VolGroup/lv1'] + + assert self. mock_process_input[n-5] == [ + 'lvchange', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv1'] + + assert self. mock_process_input[n-4] == [ + 'lvchange', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '/dev/VolGroup/lv3'] + + assert self. mock_process_input[n-3] == [ + 'lvchange', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv3'] + + assert self. mock_process_input[n-2] == [ + 'lvchange', + '--addtag', 'ceph.osd_id=2', + '--addtag', 'ceph.type=db', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.cluster_name=ceph', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv2_new'] + + assert self. mock_process_input[n-1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/ceph-2', + '--dev-target', '/dev/VolGroup/lv2_new', + '--command', 'bluefs-bdev-migrate', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block.db'] + + @patch('os.getuid') + def test_migrate_data_db_wal_to_new_db(self, m_getuid, monkeypatch): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_wal_tags = 'ceph.osd_id=0,ceph.type=wal,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='waluuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = api.Volume(lv_name='volume2_new', lv_uuid='new-db-uuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2_new', + lv_tags='') + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + devices.append([Device('/dev/VolGroup/lv3'), 'wal']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'data', 'db', 'wal', + '--target', 'vgname/new_wal']) + m.main() + + n = len(self.mock_process_input) + assert n >= 6 + + assert self. mock_process_input[n-6] == [ + 'lvchange', + '--deltag', 'ceph.osd_id=2', + '--deltag', 'ceph.type=db', + '--deltag', 'ceph.osd_fsid=1234', + '--deltag', 'ceph.cluster_name=ceph', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '/dev/VolGroup/lv2'] + + assert self. mock_process_input[n-5] == [ + 'lvchange', + '--deltag', 'ceph.osd_id=0', + '--deltag', 'ceph.type=wal', + '--deltag', 'ceph.osd_fsid=1234', + '--deltag', 'ceph.cluster_name=ceph', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '--deltag', 'ceph.wal_uuid=waluuid', + '--deltag', 'ceph.wal_device=wal_dev', + '/dev/VolGroup/lv3'] + + assert self. mock_process_input[n-4] == [ + 'lvchange', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '--deltag', 'ceph.wal_uuid=waluuid', + '--deltag', 'ceph.wal_device=wal_dev', + '/dev/VolGroup/lv1'] + + assert self. mock_process_input[n-3] == [ + 'lvchange', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv1'] + + assert self. mock_process_input[n-2] == [ + 'lvchange', + '--addtag', 'ceph.osd_id=2', + '--addtag', 'ceph.type=db', + '--addtag', 'ceph.osd_fsid=1234', + '--addtag', 'ceph.cluster_name=ceph', + '--addtag', 'ceph.db_uuid=new-db-uuid', + '--addtag', 'ceph.db_device=/dev/VolGroup/lv2_new', + '/dev/VolGroup/lv2_new'] + + assert self. mock_process_input[n-1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/ceph-2', + '--dev-target', '/dev/VolGroup/lv2_new', + '--command', 'bluefs-bdev-migrate', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block.db', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal'] + + @patch('os.getuid') + def test_dont_migrate_data_db_wal_to_new_data(self, + m_getuid, + monkeypatch, + capsys): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = api.Volume(lv_name='volume2_new', lv_uuid='new-db-uuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2_new', + lv_tags='') + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'data', + '--target', 'vgname/new_data']) + + with pytest.raises(SystemExit) as error: + m.main() + stdout, stderr = capsys.readouterr() + expected = 'Unable to migrate to : vgname/new_data' + assert expected in str(error.value) + expected = 'Unable to determine new volume type,' + ' please use new-db or new-wal command before.' + assert expected in stderr + + @patch('os.getuid') + def test_dont_migrate_db_to_wal(self, + m_getuid, + monkeypatch, + capsys): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_wal_tags = 'ceph.osd_id=2,ceph.type=wal,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='waluuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = wal_vol + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + devices.append([Device('/dev/VolGroup/lv3'), 'wal']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'db', + '--target', 'vgname/wal']) + + with pytest.raises(SystemExit) as error: + m.main() + stdout, stderr = capsys.readouterr() + expected = 'Unable to migrate to : vgname/wal' + assert expected in str(error.value) + expected = 'Migrate to WAL is not supported' + assert expected in stderr + + @patch('os.getuid') + def test_migrate_data_db_to_db(self, + m_getuid, + monkeypatch, + capsys): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev' + source_wal_tags = 'ceph.osd_id=2,ceph.type=wal,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='waluuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = db_vol + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + devices.append([Device('/dev/VolGroup/lv3'), 'wal']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'db', 'data', + '--target', 'vgname/db']) + + m.main() + + n = len(self.mock_process_input) + assert n >= 1 + for s in self.mock_process_input: + print(s) + + assert self. mock_process_input[n-1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/ceph-2', + '--dev-target', '/var/lib/ceph/osd/ceph-2/block.db', + '--command', 'bluefs-bdev-migrate', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block'] + + @patch('os.getuid') + def test_migrate_data_wal_to_db(self, + m_getuid, + monkeypatch, + capsys): + m_getuid.return_value = 0 + + source_tags = 'ceph.osd_id=2,ceph.type=data,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + source_db_tags = 'ceph.osd_id=2,ceph.type=db,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + source_wal_tags = 'ceph.osd_id=2,ceph.type=wal,ceph.osd_fsid=1234,' \ + 'ceph.cluster_name=ceph,ceph.db_uuid=dbuuid,ceph.db_device=db_dev,' \ + 'ceph.wal_uuid=waluuid,ceph.wal_device=wal_dev' + + data_vol = api.Volume(lv_name='volume1', + lv_uuid='datauuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv1', + lv_tags=source_tags) + db_vol = api.Volume(lv_name='volume2', + lv_uuid='dbuuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv2', + lv_tags=source_db_tags) + + wal_vol = api.Volume(lv_name='volume3', + lv_uuid='waluuid', + vg_name='vg', + lv_path='/dev/VolGroup/lv3', + lv_tags=source_wal_tags) + + self.mock_single_volumes = { + '/dev/VolGroup/lv1': data_vol, + '/dev/VolGroup/lv2': db_vol, + '/dev/VolGroup/lv3': wal_vol, + } + monkeypatch.setattr(migrate.api, 'get_first_lv', + self.mock_get_first_lv) + + self.mock_volume = db_vol + monkeypatch.setattr(api, 'get_lv_by_fullname', + self.mock_get_lv_by_fullname) + + self.mock_process_input = [] + monkeypatch.setattr(process, 'call', self.mock_process) + + devices = [] + devices.append([Device('/dev/VolGroup/lv1'), 'block']) + devices.append([Device('/dev/VolGroup/lv2'), 'db']) + devices.append([Device('/dev/VolGroup/lv3'), 'wal']) + + monkeypatch.setattr(migrate, 'find_associated_devices', + lambda osd_id, osd_fsid: devices) + + monkeypatch.setattr("ceph_volume.systemd.systemctl.osd_is_active", + lambda id: False) + + monkeypatch.setattr(migrate, 'get_cluster_name', + lambda osd_id, osd_fsid: 'ceph') + monkeypatch.setattr(system, 'chown', lambda path: 0) + m = migrate.Migrate(argv=[ + '--osd-id', '2', + '--osd-fsid', '1234', + '--from', 'db', 'data', 'wal', + '--target', 'vgname/db']) + + m.main() + + n = len(self.mock_process_input) + assert n >= 1 + for s in self.mock_process_input: + print(s) + + assert self. mock_process_input[n-4] == [ + 'lvchange', + '--deltag', 'ceph.osd_id=2', + '--deltag', 'ceph.type=wal', + '--deltag', 'ceph.osd_fsid=1234', + '--deltag', 'ceph.cluster_name=ceph', + '--deltag', 'ceph.db_uuid=dbuuid', + '--deltag', 'ceph.db_device=db_dev', + '--deltag', 'ceph.wal_uuid=waluuid', + '--deltag', 'ceph.wal_device=wal_dev', + '/dev/VolGroup/lv3'] + assert self. mock_process_input[n-3] == [ + 'lvchange', + '--deltag', 'ceph.wal_uuid=waluuid', + '--deltag', 'ceph.wal_device=wal_dev', + '/dev/VolGroup/lv1'] + assert self. mock_process_input[n-2] == [ + 'lvchange', + '--deltag', 'ceph.wal_uuid=waluuid', + '--deltag', 'ceph.wal_device=wal_dev', + '/dev/VolGroup/lv2'] + assert self. mock_process_input[n-1] == [ + 'ceph-bluestore-tool', + '--path', '/var/lib/ceph/osd/ceph-2', + '--dev-target', '/var/lib/ceph/osd/ceph-2/block.db', + '--command', 'bluefs-bdev-migrate', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block', + '--devs-source', '/var/lib/ceph/osd/ceph-2/block.wal'] diff --git a/src/os/bluestore/bluestore_tool.cc b/src/os/bluestore/bluestore_tool.cc index 2dc1016c38e..2b671484039 100644 --- a/src/os/bluestore/bluestore_tool.cc +++ b/src/os/bluestore/bluestore_tool.cc @@ -676,6 +676,7 @@ int main(int argc, char **argv) parse_devices(cct.get(), devs, &cur_devs_map, &has_db, &has_wal); + const char* rlpath = nullptr; if (has_db && has_wal) { cerr << "can't allocate new device, both WAL and DB exist" << std::endl; @@ -689,24 +690,35 @@ int main(int argc, char **argv) << std::endl; exit(EXIT_FAILURE); } else if(!dev_target.empty() && - realpath(dev_target.c_str(), target_path) == nullptr) { + (rlpath = realpath(dev_target.c_str(), target_path)) == nullptr) { cerr << "failed to retrieve absolute path for " << dev_target << ": " << cpp_strerror(errno) << std::endl; exit(EXIT_FAILURE); } - // Create either DB or WAL volume - int r = EXIT_FAILURE; - if (need_db && cct->_conf->bluestore_block_db_size == 0) { - cerr << "DB size isn't specified, " - "please set Ceph bluestore-block-db-size config parameter " - << std::endl; - } else if (!need_db && cct->_conf->bluestore_block_wal_size == 0) { - cerr << "WAL size isn't specified, " - "please set Ceph bluestore-block-wal-size config parameter " - << std::endl; - } else { + // Attach either DB or WAL volume, create if needed + struct stat st; + int r = -1; + if (rlpath != nullptr) { + r = ::stat(rlpath, &st); + } + // check if we need additional size specification + if (r == -1 || (r == 0 && S_ISREG(st.st_mode) && st.st_size == 0)) { + r = 0; + if (need_db && cct->_conf->bluestore_block_db_size == 0) { + cerr << "Might need DB size specification, " + "please set Ceph bluestore-block-db-size config parameter " + << std::endl; + r = EXIT_FAILURE; + } else if (!need_db && cct->_conf->bluestore_block_wal_size == 0) { + cerr << "Might need WAL size specification, " + "please set Ceph bluestore-block-wal-size config parameter " + << std::endl; + r = EXIT_FAILURE; + } + } + if (r == 0) { BlueStore bluestore(cct.get(), path); r = bluestore.add_new_bluefs_device( need_db ? BlueFS::BDEV_NEWDB : BlueFS::BDEV_NEWWAL, @@ -719,8 +731,8 @@ int main(int argc, char **argv) << cpp_strerror(r) << std::endl; } - return r; } + return r; } else if (action == "bluefs-bdev-migrate") { map<string, int> cur_devs_map; set<int> src_dev_ids; |