#!/usr/bin/python import argparse import errno import fcntl import logging import os import os.path import platform import re import subprocess import stat import sys import tempfile import uuid """ Prepare: - create GPT partition - mark the partition with the ceph type uuid - create a file system - mark the fs as ready for ceph consumption - entire data disk is used (one big partition) - a new partition is added to the journal disk (so it can be easily shared) - triggered by administrator or ceph-deploy, e.g. 'ceph-disk [journal disk] Activate: - mount the volume in a temp loation - allocate an osd id (if needed) - remount in the correct location /var/lib/ceph/osd/$cluster-$id - start ceph-osd - triggered by udev when it sees the OSD gpt partition type - triggered by admin 'ceph-disk activate ' - triggered on ceph service startup with 'ceph-disk activate-all' We rely on /dev/disk/by-partuuid to find partitions by their UUID; this is what the journal symlink inside the osd data volume normally points to. activate-all relies on /dev/disk/by-parttype-uuid/$typeuuid.$uuid to find all partitions. We install special udev rules to create these links. udev triggers 'ceph-disk activate ' or 'ceph-disk activate-journal ' based on the partition type. On old distros (e.g., RHEL6), the blkid installed does not recognized GPT partition metadata and the /dev/disk/by-partuuid etc. links aren't present. We have a horrible hack in the form of ceph-disk-udev that parses gparted output to create the symlinks above and triggers the 'ceph-disk activate' etc commands that udev normally would do if it knew the GPT partition type. """ CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106' DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106' OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d' DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d' TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be' DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be' DEFAULT_FS_TYPE = 'xfs' MOUNT_OPTIONS = dict( btrfs='noatime,user_subvol_rm_allowed', # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll # delay a moment before removing it fully because we did have some # issues with ext4 before the xatts-in-leveldb work, and it seemed # that user_xattr helped ext4='noatime,user_xattr', xfs='noatime', ) MKFS_ARGS = dict( btrfs=[ '-m', 'single', '-l', '32768', '-n', '32768', ], xfs=[ # xfs insists on not overwriting previous fs; even if we wipe # partition table, we often recreate it exactly the same way, # so we'll see ghosts of filesystems past '-f', '-i', 'size=2048', ], ) INIT_SYSTEMS = [ 'upstart', 'sysvinit', 'systemd', 'auto', ] # Nuke the TERM variable to avoid confusing any subprocesses we call. # For example, libreadline will print weird control sequences for some # TERM values. if 'TERM' in os.environ: del os.environ['TERM'] LOG_NAME = __name__ if LOG_NAME == '__main__': LOG_NAME = os.path.basename(sys.argv[0]) LOG = logging.getLogger(LOG_NAME) ###### lock ######## class filelock(object): def __init__(self, fn): self.fn = fn self.fd = None def acquire(self): assert not self.fd self.fd = file(self.fn, 'w') fcntl.lockf(self.fd, fcntl.LOCK_EX) def release(self): assert self.fd fcntl.lockf(self.fd, fcntl.LOCK_UN) self.fd = None prepare_lock = filelock('/var/lib/ceph/tmp/ceph-disk.prepare.lock') activate_lock = filelock('/var/lib/ceph/tmp/ceph-disk.activate.lock') ###### exceptions ######## class Error(Exception): """ Error """ def __str__(self): doc = self.__doc__.strip() return ': '.join([doc] + [str(a) for a in self.args]) class MountError(Error): """ Mounting filesystem failed """ class UnmountError(Error): """ Unmounting filesystem failed """ class BadMagicError(Error): """ Does not look like a Ceph OSD, or incompatible version """ class TruncatedLineError(Error): """ Line is truncated """ class TooManyLinesError(Error): """ Too many lines """ class FilesystemTypeError(Error): """ Cannot discover filesystem type """ ####### utils def maybe_mkdir(*a, **kw): """ Creates a new directory if it doesn't exist, removes existing symlink before creating the directory. """ # remove any symlink, if it is there.. if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode): LOG.debug('Removing old symlink at %s', *a) os.unlink(*a) try: os.mkdir(*a, **kw) except OSError, e: if e.errno == errno.EEXIST: pass else: raise # a device "name" is something like # sdb # cciss!c0d1 def get_dev_name(path): """ get device name from path. e.g., /dev/sda -> sdas, /dev/cciss/c0d1 -> cciss!c0d1 """ assert path.startswith('/dev/') base = path[5:] return base.replace('/', '!') # a device "path" is something like # /dev/sdb # /dev/cciss/c0d1 def get_dev_path(name): """ get a path (/dev/...) from a name (cciss!c0d1) """ return '/dev/' + name.replace('!', '/') def get_dev_relpath(name): """ get a relative path to /dev from a name (cciss!c0d1) """ return name.replace('!', '/') def get_partition_dev(dev, pnum): """ get the device name for a partition assume that partitions are named like the base dev, with a number, and optionally some intervening characters (like 'p'). e.g., sda 1 -> sda1 cciss/c0d1 1 -> cciss!c0d1p1 """ name = get_dev_name(os.path.realpath(dev)) partname = None for f in os.listdir(os.path.join('/sys/block', name)): if f.startswith(name) and f.endswith(str(pnum)): # we want the shortest name that starts with the base name and ends with the partition number if not partname or len(f) < len(partname): partname = f if partname: return get_dev_path(partname) else: raise Error('partition %d for %s does not appear to exist' % (pnum, dev)) def list_all_partitions(): """ Return a list of devices and partitions """ dev_part_list = {} for name in os.listdir('/sys/block'): if not os.path.exists(os.path.join('/sys/block', name, 'device')): continue dev_part_list[name] = list_partitions(name) return dev_part_list def list_partitions(basename): """ Return a list of partitions on the given device name """ partitions = [] for name in os.listdir(os.path.join('/sys/block', basename)): if name.startswith(basename): partitions.append(name) return partitions def is_partition(dev): """ Check whether a given device path is a partition or a full disk. """ dev = os.path.realpath(dev) if not stat.S_ISBLK(os.lstat(dev).st_mode): raise Error('not a block device', dev) name = get_dev_name(dev) if os.path.exists(os.path.join('/sys/block', name)): return False # make sure it is a partition of something else for basename in os.listdir('/sys/block'): if os.path.exists(os.path.join('/sys/block', basename, name)): return True raise Error('not a disk or partition', dev) def is_mounted(dev): """ Check if the given device is mounted. """ dev = os.path.realpath(dev) with file('/proc/mounts', 'rb') as proc_mounts: for line in proc_mounts: fields = line.split() if len(fields) < 3: continue mounts_dev = fields[0] path = fields[1] if mounts_dev.startswith('/') and os.path.exists(mounts_dev): mounts_dev = os.path.realpath(mounts_dev) if mounts_dev == dev: return path return None def is_held(dev): """ Check if a device is held by another device (e.g., a dm-crypt mapping) """ assert os.path.exists(dev) dev = os.path.realpath(dev) base = get_dev_name(dev) # full disk? directory = '/sys/block/{base}/holders'.format(base=base) if os.path.exists(directory): return os.listdir(directory) # partition? part = base while len(base): directory = '/sys/block/{base}/{part}/holders'.format(part=part, base=base) if os.path.exists(directory): return os.listdir(directory) base = base[:-1] return [] def verify_not_in_use(dev): """ Verify if a given device (path) is in use (e.g. mounted or in use by device-mapper). :raises: Error if device is in use. """ assert os.path.exists(dev) if is_partition(dev): if is_mounted(dev): raise Error('Device is mounted', dev) holders = is_held(dev) if holders: raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders)) else: basename = get_dev_name(os.path.realpath(dev)) for partname in list_partitions(basename): partition = get_dev_path(partname) if is_mounted(partition): raise Error('Device is mounted', partition) holders = is_held(partition) if holders: raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % partition, ','.join(holders)) def must_be_one_line(line): """ Checks if given line is really one single line. :raises: TruncatedLineError or TooManyLinesError :return: Content of the line, or None if line isn't valid. """ if line[-1:] != '\n': raise TruncatedLineError(line) line = line[:-1] if '\n' in line: raise TooManyLinesError(line) return line def read_one_line(parent, name): """ Read a file whose sole contents are a single line. Strips the newline. :return: Contents of the line, or None if file did not exist. """ path = os.path.join(parent, name) try: line = file(path, 'rb').read() except IOError as e: if e.errno == errno.ENOENT: return None else: raise try: line = must_be_one_line(line) except (TruncatedLineError, TooManyLinesError) as e: raise Error('File is corrupt: {path}: {msg}'.format( path=path, msg=e, )) return line def write_one_line(parent, name, text): """ Write a file whose sole contents are a single line. Adds a newline. """ path = os.path.join(parent, name) tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) with file(tmp, 'wb') as tmp_file: tmp_file.write(text + '\n') os.fsync(tmp_file.fileno()) os.rename(tmp, path) def check_osd_magic(path): """ Check that this path has the Ceph OSD magic. :raises: BadMagicError if this does not look like a Ceph OSD data dir. """ magic = read_one_line(path, 'magic') if magic is None: # probably not mkfs'ed yet raise BadMagicError(path) if magic != CEPH_OSD_ONDISK_MAGIC: raise BadMagicError(path) def check_osd_id(osd_id): """ Ensures osd id is numeric. """ if not re.match(r'^[0-9]+$', osd_id): raise Error('osd id is not numeric') def allocate_osd_id( cluster, fsid, keyring, ): """ Accocates an OSD id on the given cluster. :raises: Error if the call to allocate the OSD id fails. :return: The allocated OSD id. """ LOG.debug('Allocating OSD id...') try: osd_id = _check_output( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'osd', 'create', '--concise', fsid, ], ) except subprocess.CalledProcessError as e: raise Error('ceph osd create failed', e) osd_id = must_be_one_line(osd_id) check_osd_id(osd_id) return osd_id def get_osd_id(path): """ Gets the OSD id of the OSD at the given path. """ osd_id = read_one_line(path, 'whoami') if osd_id is not None: check_osd_id(osd_id) return osd_id def _check_output(*args, **kwargs): process = subprocess.Popen( stdout=subprocess.PIPE, *args, **kwargs) out, _ = process.communicate() ret = process.wait() if ret: cmd = kwargs.get("args") if cmd is None: cmd = args[0] #raise subprocess.CalledProcessError(ret, cmd, output=out) error = subprocess.CalledProcessError(ret, cmd) error.output = out raise error return out def get_conf(cluster, variable): """ Get the value of the given configuration variable from the cluster. :raises: Error if call to ceph-conf fails. :return: The variable value or None. """ try: process = subprocess.Popen( args=[ '/usr/bin/ceph-conf', '--cluster={cluster}'.format( cluster=cluster, ), '--name=osd.', '--lookup', variable, ], stdout=subprocess.PIPE, close_fds=True, ) except OSError as e: raise Error('error executing ceph-conf', e) (out, _err) = process.communicate() ret = process.wait() if ret == 1: # config entry not found return None elif ret != 0: raise Error('getting variable from configuration failed') value = str(out).split('\n', 1)[0] # don't differentiate between "var=" and no var set if not value: return None return value def get_conf_with_default(cluster, variable): """ Get a config value that is known to the C++ code. This will fail if called on variables that are not defined in common config options. """ try: out = _check_output( args=[ 'ceph-osd', '--cluster={cluster}'.format( cluster=cluster, ), '--show-config-value={variable}'.format( variable=variable, ), ], close_fds=True, ) except subprocess.CalledProcessError as e: raise Error( 'getting variable from configuration failed', e, ) value = str(out).split('\n', 1)[0] return value def get_fsid(cluster): """ Get the fsid of the cluster. :return: The fsid or raises Error. """ fsid = get_conf(cluster=cluster, variable='fsid') if fsid is None: raise Error('getting cluster uuid from configuration failed') return fsid def get_or_create_dmcrypt_key( _uuid, key_dir, ): """ Get path to dmcrypt key or create a new key file. :return: Path to the dmcrypt key file. """ path = os.path.join(key_dir, _uuid) # already have it? if os.path.exists(path): return path # make a new key try: if not os.path.exists(key_dir): os.makedirs(key_dir) with file('/dev/urandom', 'rb') as i: key = i.read(256) with file(path, 'wb') as key_file: key_file.write(key) return path except: raise Error('unable to read or create dm-crypt key', path) def dmcrypt_map( rawdev, keypath, _uuid, ): """ Maps a device to a dmcrypt device. :return: Path to the dmcrypt device. """ dev = '/dev/mapper/'+ _uuid args = [ 'cryptsetup', '--key-file', keypath, '--key-size', '256', 'create', _uuid, rawdev, ] try: subprocess.check_call(args) return dev except subprocess.CalledProcessError as e: raise Error('unable to map device', rawdev, e) def dmcrypt_unmap( _uuid ): """ Removes the dmcrypt device with the given UUID. """ args = [ 'cryptsetup', 'remove', _uuid ] try: subprocess.check_call(args) except subprocess.CalledProcessError as e: raise Error('unable to unmap device', _uuid, e) def mount( dev, fstype, options, ): """ Mounts a device with given filessystem type and mount options to a tempfile path under /var/lib/ceph/tmp. """ # pick best-of-breed mount options based on fs type if options is None: options = MOUNT_OPTIONS.get(fstype, '') # mount path = tempfile.mkdtemp( prefix='mnt.', dir='/var/lib/ceph/tmp', ) try: LOG.debug('Mounting %s on %s with options %s', dev, path, options) subprocess.check_call( args=[ 'mount', '-t', fstype, '-o', options, '--', dev, path, ], ) except subprocess.CalledProcessError as e: try: os.rmdir(path) except (OSError, IOError): pass raise MountError(e) return path def unmount( path, ): """ Unmount and removes the given mount point. """ try: LOG.debug('Unmounting %s', path) subprocess.check_call( args=[ '/bin/umount', '--', path, ], ) except subprocess.CalledProcessError as e: raise UnmountError(e) os.rmdir(path) ########################################### def get_free_partition_index(dev): """ Get the next free partition index on a given device. :return: Index number (> 1 if there is already a partition on the device) or 1 if there is no partition table. """ try: lines = _check_output( args=[ 'parted', '--machine', '--', dev, 'print', ], ) except subprocess.CalledProcessError as e: print 'cannot read partition index; assume it isn\'t present\n (Error: %s)' % e return 1 if not lines: raise Error('parted failed to output anything') lines = str(lines).splitlines(True) # work around buggy libreadline(?) library in rhel/centos. idiot_prefix = '\x1b\x5b\x3f\x31\x30\x33\x34\x68'; if lines[0].startswith(idiot_prefix): lines[0] = lines[0][8:] if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']: raise Error('weird parted units', lines[0]) del lines[0] if not lines[0].startswith('/dev/'): raise Error('weird parted disk entry', lines[0]) del lines[0] seen = set() for line in lines: idx, _ = line.split(':', 1) idx = int(idx) seen.add(idx) num = 1 while num in seen: num += 1 return num def zap(dev): """ Destroy the partition table and content of a given disk. """ try: LOG.debug('Zapping partition table on %s', dev) # try to wipe out any GPT partition table backups. sgdisk # isn't too thorough. lba_size = 4096 size = 33 * lba_size with file(dev, 'wb') as dev_file: dev_file.seek(-size, os.SEEK_END) dev_file.write(size*'\0') subprocess.check_call( args=[ 'sgdisk', '--zap-all', '--clear', '--mbrtogpt', '--', dev, ], ) except subprocess.CalledProcessError as e: raise Error(e) def prepare_journal_dev( data, journal, journal_size, journal_uuid, journal_dm_keypath, ): if is_partition(journal): LOG.debug('Journal %s is a partition', journal) LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') return (journal, None, None) ptype = JOURNAL_UUID if journal_dm_keypath: ptype = DMCRYPT_JOURNAL_UUID # it is a whole disk. create a partition! num = None if journal == data: # we're sharing the disk between osd data and journal; # make journal be partition number 2, so it's pretty num = 2 journal_part = '{num}:0:{size}M'.format( num=num, size=journal_size, ) else: # sgdisk has no way for me to say "whatever is the next # free index number" when setting type guids etc, so we # need to awkwardly look up the next free number, and then # fix that in the call -- and hope nobody races with us; # then again nothing guards the partition table from races # anyway num = get_free_partition_index(dev=journal) journal_part = '{num}:0:+{size}M'.format( num=num, size=journal_size, ) LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') try: LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal) subprocess.check_call( args=[ 'sgdisk', '--new={part}'.format(part=journal_part), '--change-name={num}:ceph journal'.format(num=num), '--partition-guid={num}:{journal_uuid}'.format( num=num, journal_uuid=journal_uuid, ), '--typecode={num}:{uuid}'.format( num=num, uuid=ptype, ), '--', journal, ], ) subprocess.call( args=[ # wait for udev event queue to clear 'udevadm', 'settle', ], ) journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( journal_uuid=journal_uuid, ) journal_dmcrypt = None if journal_dm_keypath: journal_dmcrypt = journal_symlink journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid) LOG.debug('Journal is GPT partition %s', journal_symlink) return (journal_symlink, journal_dmcrypt, journal_uuid) except subprocess.CalledProcessError as e: raise Error(e) def prepare_journal_file( journal, journal_size): if not os.path.exists(journal): LOG.debug('Creating journal file %s with size %dM', journal, journal_size) with file(journal, 'wb') as journal_file: journal_file.truncate(journal_size * 1048576) # FIXME: should we resize an existing journal file? LOG.debug('Journal is file %s', journal) LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') return (journal, None, None) def prepare_journal( data, journal, journal_size, journal_uuid, force_file, force_dev, journal_dm_keypath, ): if journal is None: if force_dev: raise Error('Journal is unspecified; not a block device') return (None, None, None) if not os.path.exists(journal): if force_dev: raise Error('Journal does not exist; not a block device', journal) return prepare_journal_file(journal, journal_size) jmode = os.stat(journal).st_mode if stat.S_ISREG(jmode): if force_dev: raise Error('Journal is not a block device', journal) return prepare_journal_file(journal, journal_size) if stat.S_ISBLK(jmode): if force_file: raise Error('Journal is not a regular file', journal) return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath) raise Error('Journal %s is neither a block device nor regular file', journal) def adjust_symlink(target, path): create = True if os.path.lexists(path): try: mode = os.lstat(path).st_mode if stat.S_ISREG(mode): LOG.debug('Removing old file %s', path) os.unlink(path) elif stat.S_ISLNK(mode): old = os.readlink(path) if old != target: LOG.debug('Removing old symlink %s -> %s', path, old) os.unlink(path) else: create = False except: raise Error('unable to remove (or adjust) old file (symlink)', path) if create: LOG.debug('Creating symlink %s -> %s', path, target) try: os.symlink(target, path) except: raise Error('unable to create symlink %s -> %s' % (path, target)) def prepare_dir( path, journal, cluster_uuid, osd_uuid, journal_uuid, journal_dmcrypt = None, ): LOG.debug('Preparing osd data dir %s', path) if osd_uuid is None: osd_uuid = str(uuid.uuid4()) if journal is not None: # we're using an external journal; point to it here adjust_symlink(journal, os.path.join(path, 'journal')) if journal_dmcrypt is not None: adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt')) else: try: os.unlink(os.path.join(path, 'journal_dmcrypt')) except OSError: pass write_one_line(path, 'ceph_fsid', cluster_uuid) write_one_line(path, 'fsid', osd_uuid) write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC) if journal_uuid is not None: # i.e., journal is a tagged partition write_one_line(path, 'journal_uuid', journal_uuid) def prepare_dev( data, journal, fstype, mkfs_args, mount_options, cluster_uuid, osd_uuid, journal_uuid, journal_dmcrypt, osd_dm_keypath, ): """ Prepare a data/journal combination to be used for an OSD. The ``magic`` file is written last, so it's presence is a reliable indicator of the whole sequence having completed. WARNING: This will unconditionally overwrite anything given to it. """ ptype_tobe = TOBE_UUID ptype_osd = OSD_UUID if osd_dm_keypath: ptype_tobe = DMCRYPT_TOBE_UUID ptype_osd = DMCRYPT_OSD_UUID rawdev = None if is_partition(data): LOG.debug('OSD data device %s is a partition', data) rawdev = data else: LOG.debug('Creating osd partition on %s', data) try: subprocess.check_call( args=[ 'sgdisk', '--largest-new=1', '--change-name=1:ceph data', '--partition-guid=1:{osd_uuid}'.format( osd_uuid=osd_uuid, ), '--typecode=1:%s' % ptype_tobe, '--', data, ], ) subprocess.call( args=[ # wait for udev event queue to clear 'udevadm', 'settle', ], ) except subprocess.CalledProcessError as e: raise Error(e) rawdev = get_partition_dev(data, 1) dev = None if osd_dm_keypath: dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid) else: dev = rawdev try: args = [ 'mkfs', '-t', fstype, ] if mkfs_args is not None: args.extend(mkfs_args.split()) if fstype == 'xfs': args.extend(['-f']) # always force else: args.extend(MKFS_ARGS.get(fstype, [])) args.extend([ '--', dev, ]) try: LOG.debug('Creating %s fs on %s', fstype, dev) subprocess.check_call(args=args) except subprocess.CalledProcessError as e: raise Error(e) #remove whitespaces from mount_options if mount_options is not None: mount_options = "".join(mount_options.split()) path = mount(dev=dev, fstype=fstype, options=mount_options) try: prepare_dir( path=path, journal=journal, cluster_uuid=cluster_uuid, osd_uuid=osd_uuid, journal_uuid=journal_uuid, journal_dmcrypt=journal_dmcrypt, ) finally: unmount(path) finally: if rawdev != dev: dmcrypt_unmap(osd_uuid) if not is_partition(data): try: subprocess.check_call( args=[ 'sgdisk', '--typecode=1:%s' % ptype_osd, '--', data, ], ) except subprocess.CalledProcessError as e: raise Error(e) def main_prepare(args): journal_dm_keypath = None osd_dm_keypath = None try: prepare_lock.acquire() if not os.path.exists(args.data): raise Error('data path does not exist', args.data) # in use? dmode = os.stat(args.data).st_mode if stat.S_ISBLK(dmode): verify_not_in_use(args.data) if args.journal and os.path.exists(args.journal): jmode = os.stat(args.journal).st_mode if stat.S_ISBLK(jmode): verify_not_in_use(args.journal) if args.zap_disk is not None: if stat.S_ISBLK(dmode) and not is_partition(args.data): zap(args.data) else: raise Error('not full block device; cannot zap', args.data) if args.cluster_uuid is None: args.cluster_uuid = get_fsid(cluster=args.cluster) if args.cluster_uuid is None: raise Error( 'must have fsid in config or pass --cluster-uuid=', ) if args.fs_type is None: args.fs_type = get_conf( cluster=args.cluster, variable='osd_mkfs_type', ) if args.fs_type is None: args.fs_type = get_conf( cluster=args.cluster, variable='osd_fs_type', ) if args.fs_type is None: args.fs_type = DEFAULT_FS_TYPE mkfs_args = get_conf( cluster=args.cluster, variable='osd_mkfs_options_{fstype}'.format( fstype=args.fs_type, ), ) if mkfs_args is None: mkfs_args = get_conf( cluster=args.cluster, variable='osd_fs_mkfs_options_{fstype}'.format( fstype=args.fs_type, ), ) mount_options = get_conf( cluster=args.cluster, variable='osd_mount_options_{fstype}'.format( fstype=args.fs_type, ), ) if mount_options is None: mount_options = get_conf( cluster=args.cluster, variable='osd_fs_mount_options_{fstype}'.format( fstype=args.fs_type, ), ) journal_size = get_conf_with_default( cluster=args.cluster, variable='osd_journal_size', ) journal_size = int(journal_size) # colocate journal with data? if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None: LOG.info('Will colocate journal with data on %s', args.data) args.journal = args.data if args.journal_uuid is None: args.journal_uuid = str(uuid.uuid4()) if args.osd_uuid is None: args.osd_uuid = str(uuid.uuid4()) # dm-crypt keys? if args.dmcrypt: journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir) osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir) # prepare journal (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal( data=args.data, journal=args.journal, journal_size=journal_size, journal_uuid=args.journal_uuid, force_file=args.journal_file, force_dev=args.journal_dev, journal_dm_keypath=journal_dm_keypath, ) # prepare data if stat.S_ISDIR(dmode): if args.data_dev: raise Error('data path is not a block device', args.data) prepare_dir( path=args.data, journal=journal_symlink, cluster_uuid=args.cluster_uuid, osd_uuid=args.osd_uuid, journal_uuid=journal_uuid, journal_dmcrypt=journal_dmcrypt, ) elif stat.S_ISBLK(dmode): if args.data_dir: raise Error('data path is not a directory', args.data) prepare_dev( data=args.data, journal=journal_symlink, fstype=args.fs_type, mkfs_args=mkfs_args, mount_options=mount_options, cluster_uuid=args.cluster_uuid, osd_uuid=args.osd_uuid, journal_uuid=journal_uuid, journal_dmcrypt=journal_dmcrypt, osd_dm_keypath=osd_dm_keypath, ) else: raise Error('not a dir or block device', args.data) prepare_lock.release() if stat.S_ISBLK(dmode): # try to make sure the kernel refreshes the table. note # that if this gets ebusy, we are probably racing with # udev because it already updated it.. ignore failure here. LOG.debug('Calling partprobe on prepared device %s', args.data) subprocess.call( args=[ 'partprobe', args.data, ], ) except Error as e: if journal_dm_keypath: os.unlink(journal_dm_keypath) if osd_dm_keypath: os.unlink(osd_dm_keypath) prepare_lock.release() raise e ########################### def mkfs( path, cluster, osd_id, fsid, keyring, ): monmap = os.path.join(path, 'activate.monmap') subprocess.check_call( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'mon', 'getmap', '-o', monmap, ], ) subprocess.check_call( args=[ '/usr/bin/ceph-osd', '--cluster', cluster, '--mkfs', '--mkkey', '-i', osd_id, '--monmap', monmap, '--osd-data', path, '--osd-journal', os.path.join(path, 'journal'), '--osd-uuid', fsid, '--keyring', os.path.join(path, 'keyring'), ], ) # TODO ceph-osd --mkfs removes the monmap file? # os.unlink(monmap) def auth_key( path, cluster, osd_id, keyring, ): try: # try dumpling+ cap scheme subprocess.check_call( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), '-i', os.path.join(path, 'keyring'), 'osd', 'allow *', 'mon', 'allow profile osd', ], ) except subprocess.CalledProcessError as err: if err.errno == errno.EACCES: # try old cap scheme subprocess.check_call( args=[ '/usr/bin/ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), '-i', os.path.join(path, 'keyring'), 'osd', 'allow *', 'mon', 'allow rwx', ], ) else: raise def move_mount( dev, path, cluster, osd_id, fstype, mount_options, ): LOG.debug('Moving mount to final location...') parent = '/var/lib/ceph/osd' osd_data = os.path.join( parent, '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), ) maybe_mkdir(osd_data) # pick best-of-breed mount options based on fs type if mount_options is None: mount_options = MOUNT_OPTIONS.get(fstype, '') # we really want to mount --move, but that is not supported when # the parent mount is shared, as it is by default on RH, Fedora, # and probably others. Also, --bind doesn't properly manipulate # /etc/mtab, which *still* isn't a symlink to /proc/mounts despite # this being 2013. Instead, mount the original device at the final # location. subprocess.check_call( args=[ '/bin/mount', '-o', mount_options, '--', dev, osd_data, ], ) subprocess.check_call( args=[ '/bin/umount', '-l', # lazy, in case someone else is peeking at the # wrong moment '--', path, ], ) def start_daemon( cluster, osd_id, ): LOG.debug('Starting %s osd.%s...', cluster, osd_id) path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id) # upstart? try: if os.path.exists(os.path.join(path,'upstart')): subprocess.check_call( args=[ '/sbin/initctl', # use emit, not start, because start would fail if the # instance was already running 'emit', # since the daemon starting doesn't guarantee much about # the service being operational anyway, don't bother # waiting for it '--no-wait', '--', 'ceph-osd', 'cluster={cluster}'.format(cluster=cluster), 'id={osd_id}'.format(osd_id=osd_id), ], ) elif os.path.exists(os.path.join(path, 'sysvinit')): if os.path.exists('/usr/sbin/service'): svc = '/usr/sbin/service' else: svc = '/sbin/service' subprocess.check_call( args=[ svc, 'ceph', 'start', 'osd.{osd_id}'.format(osd_id=osd_id), ], ) else: raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format( cluster=cluster, osd_id=osd_id, )) except subprocess.CalledProcessError as e: raise Error('ceph osd start failed', e) def detect_fstype( dev, ): fstype = _check_output( args=[ '/sbin/blkid', # we don't want stale cached results '-p', '-s', 'TYPE', '-o' 'value', '--', dev, ], ) fstype = must_be_one_line(fstype) return fstype def mount_activate( dev, activate_key_template, init, ): try: fstype = detect_fstype(dev=dev) except (subprocess.CalledProcessError, TruncatedLineError, TooManyLinesError) as e: raise FilesystemTypeError( 'device {dev}'.format(dev=dev), e, ) # TODO always using mount options from cluster=ceph for # now; see http://tracker.newdream.net/issues/3253 mount_options = get_conf( cluster='ceph', variable='osd_mount_options_{fstype}'.format( fstype=fstype, ), ) if mount_options is None: mount_options = get_conf( cluster='ceph', variable='osd_fs_mount_options_{fstype}'.format( fstype=fstype, ), ) #remove whitespaces from mount_options if mount_options is not None: mount_options = "".join(mount_options.split()) path = mount(dev=dev, fstype=fstype, options=mount_options) osd_id = None cluster = None try: (osd_id, cluster) = activate(path, activate_key_template, init) # check if the disk is already active, or if something else is already # mounted there active = False other = False src_dev = os.stat(path).st_dev try: dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id)).st_dev if src_dev == dst_dev: active = True else: parent_dev = os.stat('/var/lib/ceph/osd').st_dev if dst_dev != parent_dev: other = True elif os.listdir('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id, )): other = True except OSError: pass if active: LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id)) unmount(path) elif other: raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id)) else: move_mount( dev=dev, path=path, cluster=cluster, osd_id=osd_id, fstype=fstype, mount_options=mount_options, ) return (cluster, osd_id) except: LOG.error('Failed to activate') unmount(path) raise finally: # remove our temp dir if os.path.exists(path): os.rmdir(path) def activate_dir( path, activate_key_template, init, ): if not os.path.exists(path): raise Error( 'directory %s does not exist' % path ) (osd_id, cluster) = activate(path, activate_key_template, init) canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id) if path != canonical: # symlink it from the proper location create = True if os.path.lexists(canonical): old = os.readlink(canonical) if old != path: LOG.debug('Removing old symlink %s -> %s', canonical, old) try: os.unlink(canonical) except: raise Error('unable to remove old symlink %s', canonical) else: create = False if create: LOG.debug('Creating symlink %s -> %s', canonical, path) try: os.symlink(path, canonical) except: raise Error('unable to create symlink %s -> %s', canonical, path) return (cluster, osd_id) def find_cluster_by_uuid(_uuid): """ Find a cluster name by searching /etc/ceph/*.conf for a conf file with the right uuid. """ no_fsid = [] if not os.path.exists('/etc/ceph'): return None for conf_file in os.listdir('/etc/ceph'): if not conf_file.endswith('.conf'): continue cluster = conf_file[:-5] fsid = get_conf(cluster, 'fsid') if fsid is None: no_fsid.append(cluster) elif fsid == _uuid: return cluster # be tolerant of /etc/ceph/ceph.conf without an fsid defined. if len(no_fsid) == 1 and no_fsid[0] == 'ceph': LOG.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway') return 'ceph' return None def activate( path, activate_key_template, init, ): try: check_osd_magic(path) ceph_fsid = read_one_line(path, 'ceph_fsid') if ceph_fsid is None: raise Error('No cluster uuid assigned.') LOG.debug('Cluster uuid is %s', ceph_fsid) cluster = find_cluster_by_uuid(ceph_fsid) if cluster is None: raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid) LOG.debug('Cluster name is %s', cluster) fsid = read_one_line(path, 'fsid') if fsid is None: raise Error('No OSD uuid assigned.') LOG.debug('OSD uuid is %s', fsid) keyring = activate_key_template.format(cluster=cluster) osd_id = get_osd_id(path) if osd_id is None: osd_id = allocate_osd_id( cluster=cluster, fsid=fsid, keyring=keyring, ) write_one_line(path, 'whoami', osd_id) LOG.debug('OSD id is %s', osd_id) if not os.path.exists(os.path.join(path, 'ready')): LOG.debug('Initializing OSD...') # re-running mkfs is safe, so just run until it completes mkfs( path=path, cluster=cluster, osd_id=osd_id, fsid=fsid, keyring=keyring, ) if init is not None: if init == 'auto': conf_val = get_conf( cluster=cluster, variable='init' ) if conf_val is not None: init = conf_val else: (distro, release, codename) = platform.dist() if distro == 'Ubuntu': init = 'upstart' else: init = 'sysvinit' LOG.debug('Marking with init system %s', init) with file(os.path.join(path, init), 'w'): pass # remove markers for others, just in case. for other in INIT_SYSTEMS: if other != init: try: os.unlink(os.path.join(path, other)) except OSError: pass if not os.path.exists(os.path.join(path, 'active')): LOG.debug('Authorizing OSD key...') auth_key( path=path, cluster=cluster, osd_id=osd_id, keyring=keyring, ) write_one_line(path, 'active', 'ok') LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path) return (osd_id, cluster) except: raise def main_activate(args): cluster = None osd_id = None if not os.path.exists(args.path): raise Error('%s does not exist', args.path) if is_suppressed(args.path): LOG.info('suppressed activate request on %s', args.path) return activate_lock.acquire() try: mode = os.stat(args.path).st_mode if stat.S_ISBLK(mode): (cluster, osd_id) = mount_activate( dev=args.path, activate_key_template=args.activate_key_template, init=args.mark_init, ) elif stat.S_ISDIR(mode): (cluster, osd_id) = activate_dir( path=args.path, activate_key_template=args.activate_key_template, init=args.mark_init, ) else: raise Error('%s is not a directory or block device', args.path) start_daemon( cluster=cluster, osd_id=osd_id, ) activate_lock.release() except: activate_lock.release() ########################### def get_journal_osd_uuid(path): if not os.path.exists(path): raise Error('%s does not exist', path) mode = os.stat(path).st_mode if not stat.S_ISBLK(mode): raise Error('%s is not a block device', path) try: out = _check_output( args=[ 'ceph-osd', '-i', '0', # this is ignored '--get-journal-uuid', '--osd-journal', path, ], close_fds=True, ) except subprocess.CalledProcessError as e: raise Error( 'failed to get osd uuid/fsid from journal', e, ) value = str(out).split('\n', 1)[0] LOG.debug('Journal %s has OSD UUID %s', path, value) return value def main_activate_journal(args): if not os.path.exists(args.dev): raise Error('%s does not exist', args.dev) cluster = None osd_id = None osd_uuid = None activate_lock.acquire() try: osd_uuid = get_journal_osd_uuid(args.dev) path = os.path.join('/dev/disk/by-partuuid/', osd_uuid.lower()) (cluster, osd_id) = mount_activate( dev=path, activate_key_template=args.activate_key_template, init=args.mark_init, ) start_daemon( cluster=cluster, osd_id=osd_id, ) activate_lock.release() except: activate_lock.release() raise ########################### def main_activate_all(args): dir = '/dev/disk/by-parttypeuuid' LOG.debug('Scanning %s', dir) if not os.path.exists(dir): return err = False for name in os.listdir(dir): if name.find('.') < 0: continue (tag, uuid) = name.split('.') if tag == OSD_UUID: path = os.path.join(dir, name) LOG.info('Activating %s', path) activate_lock.acquire() try: (cluster, osd_id) = mount_activate( dev=path, activate_key_template=args.activate_key_template, init=args.mark_init, ) start_daemon( cluster=cluster, osd_id=osd_id, ) except Exception as e: print >> sys.stderr, '{prog}: {msg}'.format( prog=args.prog, msg=e, ) err = True finally: activate_lock.release() if err: raise Error('One or more partitions failed to activate') ########################### def is_swap(dev): dev = os.path.realpath(dev) with file('/proc/swaps', 'rb') as proc_swaps: for line in proc_swaps.readlines()[1:]: fields = line.split() if len(fields) < 3: continue swaps_dev = fields[0] if swaps_dev.startswith('/') and os.path.exists(swaps_dev): swaps_dev = os.path.realpath(swaps_dev) if swaps_dev == dev: return True return False def get_oneliner(base, name): path = os.path.join(base, name) if os.path.isfile(path): with open(path, 'r') as _file: return _file.readline().rstrip() return None def get_dev_fs(dev): fscheck = subprocess.Popen( [ 'blkid', '-s', 'TYPE', dev ], stdout = subprocess.PIPE, stderr=subprocess.PIPE).stdout.read() if 'TYPE' in fscheck: fstype = fscheck.split()[1].split('"')[1] return fstype else: return None def get_partition_type(part): (base, partnum) = re.match('(\D+)(\d+)', part).group(1, 2) sgdisk = subprocess.Popen( [ 'sgdisk', '-p', base, ], stdout = subprocess.PIPE, stderr = subprocess.PIPE).stdout.read() for line in sgdisk.splitlines(): m = re.search('\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line) if m is not None: num = m.group(1) if num != partnum: continue return m.group(2) return None def get_partition_uuid(dev): (base, partnum) = re.match('(\D+)(\d+)', dev).group(1, 2) out = subprocess.Popen( [ 'sgdisk', '-i', partnum, base ], stdout = subprocess.PIPE, stderr = subprocess.PIPE).stdout.read() for line in out.splitlines(): m = re.match('Partition unique GUID: (\S+)', line) if m: return m.group(1).lower() return None def more_osd_info(path, uuid_map): desc = [] ceph_fsid = get_oneliner(path, 'ceph_fsid') if ceph_fsid: cluster = find_cluster_by_uuid(ceph_fsid) if cluster: desc.append('cluster ' + cluster) else: desc.append('unknown cluster ' + ceph_fsid) who = get_oneliner(path, 'whoami') if who: desc.append('osd.%s' % who) journal_uuid = get_oneliner(path, 'journal_uuid') if journal_uuid: journal_uuid = journal_uuid.lower() if journal_uuid in uuid_map: desc.append('journal %s' % uuid_map[journal_uuid]) return desc def list_dev(dev, uuid_map, journal_map): ptype = 'unknown' prefix = '' if is_partition(dev): ptype = get_partition_type(dev) prefix = ' ' fs_type = get_dev_fs(dev) path = is_mounted(dev) desc = [] if ptype == 'ceph data': if path: desc.append('active') desc.extend(more_osd_info(path, uuid_map)) elif fs_type: try: tpath = mount(dev=dev, fstype=fs_type, options='') if tpath: try: magic = get_oneliner(tpath, 'magic') if magic is not None: desc.append('prepared') desc.extend(more_osd_info(tpath, uuid_map)) finally: unmount(tpath) except MountError: pass if desc: desc = ['ceph data'] + desc else: desc = ['ceph data', 'unprepared'] elif ptype == 'ceph journal': desc.append('ceph journal') part_uuid = get_partition_uuid(dev) if part_uuid and part_uuid in journal_map: desc.append('for %s' % journal_map[part_uuid]) else: if is_swap(dev): desc.append('swap') else: desc.append('other') if fs_type: desc.append(fs_type) elif ptype: desc.append(ptype) if path: desc.append('mounted on %s' % path) print '%s%s %s' % (prefix, dev, ', '.join(desc)) def main_list(args): partmap = list_all_partitions() uuid_map = {} journal_map = {} for base, parts in sorted(partmap.iteritems()): for p in parts: dev = get_dev_path(p) part_uuid = get_partition_uuid(dev) if part_uuid: uuid_map[part_uuid] = dev ptype = get_partition_type(dev) if ptype == 'ceph data': fs_type = get_dev_fs(dev) try: tpath = mount(dev=dev, fstype=fs_type, options='') try: journal_uuid = get_oneliner(tpath, 'journal_uuid') if journal_uuid: journal_map[journal_uuid.lower()] = dev finally: unmount(tpath) except MountError: pass for base, parts in sorted(partmap.iteritems()): if parts: print '%s :' % get_dev_path(base) for p in sorted(parts): list_dev(get_dev_path(p), uuid_map, journal_map) else: list_dev(get_dev_path(base), uuid_map, journal_map) ########################### # # Mark devices that we want to suppress activates on with a # file like # # /var/lib/ceph/tmp/suppress-activate.sdb # # where the last bit is the sanitized device name (/dev/X without the # /dev/ prefix) and the is_suppress() check matches a prefix. That # means suppressing sdb will stop activate on sdb1, sdb2, etc. # SUPPRESS_PREFIX = '/var/lib/ceph/tmp/suppress-activate.' def is_suppressed(path): disk = os.path.realpath(path) try: if not disk.startswith('/dev/') or not stat.S_ISBLK(os.lstat(path).st_mode): return False base = get_dev_name(disk) while len(base): if os.path.exists(SUPPRESS_PREFIX + base): return True base = base[:-1] except: return False def set_suppress(path): disk = os.path.realpath(path) if not os.path.exists(disk): raise Error('does not exist', path) if not stat.S_ISBLK(os.lstat(path).st_mode): raise Error('not a block device', path) base = get_dev_name(disk) with file(SUPPRESS_PREFIX + base, 'w') as f: pass LOG.info('set suppress flag on %s', base) def unset_suppress(path): disk = os.path.realpath(path) if not os.path.exists(disk): raise Error('does not exist', path) if not stat.S_ISBLK(os.lstat(path).st_mode): raise Error('not a block device', path) assert disk.startswith('/dev/') base = get_dev_name(disk) fn = SUPPRESS_PREFIX + base if not os.path.exists(fn): raise Error('not marked as suppressed', path) try: os.unlink(fn) LOG.info('unset suppress flag on %s', base) except OSError as e: raise Error('failed to unsuppress', e) def main_suppress(args): set_suppress(args.path) def main_unsuppress(args): unset_suppress(args.path) def main_zap(args): for dev in args.dev: zap(dev) ########################### def parse_args(): parser = argparse.ArgumentParser( 'ceph-disk', ) parser.add_argument( '-v', '--verbose', action='store_true', default=None, help='be more verbose', ) parser.set_defaults( # we want to hold on to this, for later prog=parser.prog, cluster='ceph', ) subparsers = parser.add_subparsers( title='subcommands', description='valid subcommands', help='sub-command help', ) prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD') prepare_parser.add_argument( '--cluster', metavar='NAME', help='cluster name to assign this disk to', ) prepare_parser.add_argument( '--cluster-uuid', metavar='UUID', help='cluster uuid to assign this disk to', ) prepare_parser.add_argument( '--osd-uuid', metavar='UUID', help='unique OSD uuid to assign this disk to', ) prepare_parser.add_argument( '--journal-uuid', metavar='UUID', help='unique uuid to assign to the journal', ) prepare_parser.add_argument( '--fs-type', help='file system type to use (e.g. "ext4")', ) prepare_parser.add_argument( '--zap-disk', action='store_true', default=None, help='destroy the partition table (and content) of a disk', ) prepare_parser.add_argument( '--data-dir', action='store_true', default=None, help='verify that DATA is a dir', ) prepare_parser.add_argument( '--data-dev', action='store_true', default=None, help='verify that DATA is a block device', ) prepare_parser.add_argument( '--journal-file', action='store_true', default=None, help='verify that JOURNAL is a file', ) prepare_parser.add_argument( '--journal-dev', action='store_true', default=None, help='verify that JOURNAL is a block device', ) prepare_parser.add_argument( '--dmcrypt', action='store_true', default=None, help='encrypt DATA and/or JOURNAL devices with dm-crypt', ) prepare_parser.add_argument( '--dmcrypt-key-dir', metavar='KEYDIR', default='/etc/ceph/dmcrypt-keys', help='directory where dm-crypt keys are stored', ) prepare_parser.add_argument( 'data', metavar='DATA', help='path to OSD data (a disk block device or directory)', ) prepare_parser.add_argument( 'journal', metavar='JOURNAL', nargs='?', help=('path to OSD journal disk block device;' + ' leave out to store journal in file'), ) prepare_parser.set_defaults( func=main_prepare, ) activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD') activate_parser.add_argument( '--mount', action='store_true', default=None, help='mount a block device [deprecated, ignored]', ) activate_parser.add_argument( '--activate-key', metavar='PATH', help='bootstrap-osd keyring path template (%(default)s)', dest='activate_key_template', ) activate_parser.add_argument( '--mark-init', metavar='INITSYSTEM', help='init system to manage this dir', default='auto', choices=INIT_SYSTEMS, ) activate_parser.add_argument( 'path', metavar='PATH', nargs='?', help='path to block device or directory', ) activate_parser.set_defaults( activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', func=main_activate, ) activate_journal_parser = subparsers.add_parser('activate-journal', help='Activate an OSD via its journal device') activate_journal_parser.add_argument( 'dev', metavar='DEV', help='path to journal block device', ) activate_journal_parser.add_argument( '--activate-key', metavar='PATH', help='bootstrap-osd keyring path template (%(default)s)', dest='activate_key_template', ) activate_journal_parser.add_argument( '--mark-init', metavar='INITSYSTEM', help='init system to manage this dir', default='auto', choices=INIT_SYSTEMS, ) activate_journal_parser.set_defaults( activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', func=main_activate_journal, ) activate_all_parser = subparsers.add_parser('activate-all', help='Activate all tagged OSD partitions') activate_all_parser.add_argument( '--activate-key', metavar='PATH', help='bootstrap-osd keyring path template (%(default)s)', dest='activate_key_template', ) activate_all_parser.add_argument( '--mark-init', metavar='INITSYSTEM', help='init system to manage this dir', default='auto', choices=INIT_SYSTEMS, ) activate_all_parser.set_defaults( activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', func=main_activate_all, ) list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs') list_parser.set_defaults( func=main_list, ) suppress_parser = subparsers.add_parser('suppress-activate', help='Suppress activate on a device (prefix)') suppress_parser.add_argument( 'path', metavar='PATH', nargs='?', help='path to block device or directory', ) suppress_parser.set_defaults( func=main_suppress, ) unsuppress_parser = subparsers.add_parser('unsuppress-activate', help='Stop suppressing activate on a device (prefix)') unsuppress_parser.add_argument( 'path', metavar='PATH', nargs='?', help='path to block device or directory', ) unsuppress_parser.set_defaults( func=main_unsuppress, ) zap_parser = subparsers.add_parser('zap', help='Zap/erase/destroy a device\'s partition table (and contents)') zap_parser.add_argument( 'dev', metavar='DEV', nargs='*', help='path to block device', ) zap_parser.set_defaults( func=main_zap, ) args = parser.parse_args() return args def main(): args = parse_args() loglevel = logging.INFO if args.verbose: loglevel = logging.DEBUG logging.basicConfig( level=loglevel, ) try: args.func(args) except Error as e: print >> sys.stderr, '{prog}: {msg}'.format( prog=args.prog, msg=e, ) sys.exit(1) if __name__ == '__main__': main()