#!/usr/bin/python
"""
Prepare a disk for use as a Ceph OSD data disk.

The tool wipes the given disk, creates a GPT data partition (and
optionally a journal partition), makes a filesystem on the data
partition, and writes the identifying ``ceph_fsid``, ``fsid`` and
``magic`` files into it.
"""

import argparse
import logging
import os
import os.path
import subprocess
import sys
import tempfile
import uuid


log_name = __name__
if log_name == '__main__':
    log_name = os.path.basename(sys.argv[0])
log = logging.getLogger(log_name)


# NOTE: the docstrings of the exception classes below are used at
# runtime as the leading part of the error message (see
# PrepareError.__str__), so they must stay short and user-presentable.

class PrepareError(Exception):
    """
    OSD preparation error
    """

    def __str__(self):
        # A subclass without its own docstring has ``__doc__`` set to
        # None; fall back to the base class text instead of crashing.
        doc = (self.__doc__ or PrepareError.__doc__).strip()
        return ': '.join([doc] + [str(a) for a in self.args])


class MountError(PrepareError):
    """
    Mounting filesystem failed
    """


class UnmountError(PrepareError):
    """
    Unmounting filesystem failed
    """


def write_one_line(parent, name, text):
    """
    Write a file whose sole contents are a single line.

    Adds a newline.

    The text is first written to a temporary file next to the target
    (``<path>.<pid>.tmp``), synced to disk, and then renamed over the
    final name, so a reader never sees a partially written file.

    :param parent: directory in which to create the file
    :param name: file name inside ``parent``
    :param text: line to write, without the trailing newline
    """
    path = os.path.join(parent, name)
    tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
    with open(tmp, 'w') as f:
        f.write(text + '\n')
        # fsync only covers what has reached the file descriptor, so
        # the Python-level buffer has to be flushed first.
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp, path)


CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'

# GPT partition type GUID given to journal partitions
JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106'


# TODO depend on python2.7
def _check_output(*args, **kwargs):
    """
    Run a command and return what it wrote to stdout.

    Accepts the same arguments as ``subprocess.Popen`` except for
    ``stdout``, which is always a pipe.

    :returns: the captured standard output, as text
    :raises subprocess.CalledProcessError: if the command exits with a
        non-zero status; the captured output is attached as ``output``
    """
    # Callers split the output on '\n', so ask for text rather than
    # bytes unless the caller explicitly chose otherwise.
    kwargs.setdefault('universal_newlines', True)
    process = subprocess.Popen(
        stdout=subprocess.PIPE,
        *args, **kwargs)
    out, _ = process.communicate()
    ret = process.wait()
    if ret:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = args[0]
        raise subprocess.CalledProcessError(ret, cmd, output=out)
    return out


def _check_call_or_fail(args):
    """
    Run a command, turning a non-zero exit status into PrepareError.

    :param args: command and arguments, as a list
    :raises PrepareError: if the command exits with a non-zero status
    """
    try:
        subprocess.check_call(args=args)
    except subprocess.CalledProcessError as e:
        raise PrepareError(e)


def get_conf(cluster, variable):
    """
    Look up a configuration variable with ``ceph-conf``.

    :param cluster: cluster name, passed as ``--cluster=``
    :param variable: name of the variable to look up
    :returns: the first line of the value, or None if the variable is
        not found or is set to an empty value
    :raises PrepareError: if ``ceph-conf`` cannot be executed or exits
        with a status other than 0 or 1
    """
    try:
        p = subprocess.Popen(
            args=[
                'ceph-conf',
                '--cluster={cluster}'.format(
                    cluster=cluster,
                    ),
                '--name=osd.',
                '--lookup',
                variable,
                ],
            stdout=subprocess.PIPE,
            close_fds=True,
            # the value is handled as text below
            universal_newlines=True,
            )
    except OSError as e:
        raise PrepareError('error executing ceph-conf', e)
    (out, _err) = p.communicate()
    ret = p.wait()
    if ret == 1:
        # config entry not found
        return None
    elif ret != 0:
        raise PrepareError('getting variable from configuration failed')
    value = out.split('\n', 1)[0]
    # don't differentiate between "var=" and no var set
    if not value:
        return None
    return value


def get_conf_with_default(cluster, variable):
    """
    Get a config value that is known to the C++ code.

    This will fail if called on variables that are not defined in
    common config options.

    :param cluster: cluster name, passed as ``--cluster=``
    :param variable: name of the variable to look up
    :returns: the first line printed by
        ``ceph-osd --show-config-value=<variable>``
    :raises PrepareError: if ``ceph-osd`` exits with a non-zero status
    """
    try:
        out = _check_output(
            args=[
                'ceph-osd',
                '--cluster={cluster}'.format(
                    cluster=cluster,
                    ),
                '--show-config-value={variable}'.format(
                    variable=variable,
                    ),
                ],
            close_fds=True,
            )
    except subprocess.CalledProcessError as e:
        raise PrepareError(
            'getting variable from configuration failed',
            e,
            )

    value = out.split('\n', 1)[0]
    return value


def get_fsid(cluster):
    """
    Return the ``fsid`` configured for the given cluster.

    :raises PrepareError: if no ``fsid`` is set in the configuration
    """
    fsid = get_conf(cluster=cluster, variable='fsid')
    if fsid is None:
        raise PrepareError('getting cluster uuid from configuration failed')
    return fsid


DEFAULT_FS_TYPE = 'xfs'

# default mount options, by filesystem type
MOUNT_OPTIONS = dict(
    btrfs='noatime,user_subvol_rm_allowed',
    ext4='noatime,user_xattr',
    xfs='noatime',
    )

# extra mkfs arguments, by filesystem type
MKFS_ARGS = dict(
    btrfs=[
        '-m', 'single',
        '-l', '32768',
        '-n', '32768',
        ],
    xfs=[
        # xfs insists on not overwriting previous fs; even if we wipe
        # partition table, we often recreate it exactly the same way,
        # so we'll see ghosts of filesystems past
        '-f',
        '-i', 'size=2048',
        ],
    )


def mount(
    dev,
    fstype,
    options,
    ):
    """
    Mount a device on a fresh temporary directory.

    :param dev: block device to mount
    :param fstype: filesystem type; only used to pick default options
    :param options: mount options, or None to use the defaults from
        ``MOUNT_OPTIONS`` for ``fstype``
    :returns: path of the newly created mount point
    :raises MountError: if ``mount`` exits with a non-zero status
    """
    # pick best-of-breed mount options based on fs type
    if options is None:
        options = MOUNT_OPTIONS.get(fstype, '')

    # mount
    path = tempfile.mkdtemp(
        prefix='mnt.',
        dir='/var/lib/ceph/tmp',
        )
    try:
        subprocess.check_call(
            args=[
                'mount',
                '-o', options,
                '--',
                dev,
                path,
                ],
            )
    except subprocess.CalledProcessError as e:
        # best-effort cleanup of the unused mount point; the mount
        # failure is the error worth reporting
        try:
            os.rmdir(path)
        except (OSError, IOError):
            pass
        raise MountError(e)

    return path


def unmount(
    path,
    ):
    """
    Unmount a filesystem and remove its mount point directory.

    :param path: mount point, as returned by ``mount``
    :raises UnmountError: if ``umount`` exits with a non-zero status
    """
    try:
        subprocess.check_call(
            args=[
                'umount',
                '--',
                path,
                ],
            )
    except subprocess.CalledProcessError as e:
        raise UnmountError(e)

    os.rmdir(path)


def get_free_partition_index(dev):
    """
    Return the lowest partition number not yet in use on a device.

    The partition table is read with ``parted --machine``.  If parted
    exits with an error, the table is assumed to be absent and 1 is
    returned.

    :param dev: path of the block device to inspect
    :returns: the smallest positive integer not used as a partition
        number
    :raises PrepareError: if the parted output cannot be understood
    """
    try:
        output = _check_output(
            args=[
                'parted',
                '--machine',
                '--',
                dev,
                'print',
                ],
            )
    except subprocess.CalledProcessError:
        log.warning('cannot read partition index; assume it isn\'t present')
        return 1

    if not output:
        raise PrepareError('parted failed to output anything')
    lines = output.splitlines(True)

    # first line names the units in use
    if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']:
        raise PrepareError('weird parted units', lines[0])
    del lines[0]

    # second line describes the disk itself
    if not lines:
        raise PrepareError('weird parted disk entry', '')
    if not lines[0].startswith('/dev/'):
        raise PrepareError('weird parted disk entry', lines[0])
    del lines[0]

    # remaining lines are one partition each, "<number>:<details>"
    seen = set()
    for line in lines:
        if not line.strip():
            continue
        try:
            idx, _ = line.split(':', 1)
            seen.add(int(idx))
        except ValueError:
            raise PrepareError('weird parted partition entry', line)

    num = 1
    while num in seen:
        num += 1
    return num


def prepare(
    disk,
    journal,
    journal_size,
    fstype,
    mkfs_args,
    mount_options,
    cluster_uuid,
    ):
    """
    Prepare a disk to be used as an OSD data disk.

    The ``magic`` file is written last, so its presence is a reliable
    indicator of the whole sequence having completed.

    WARNING: This will unconditionally overwrite anything given to
    it.

    :param disk: path of the block device to hold the OSD data
    :param journal: path of the block device to hold the journal, or
        None to not create a journal partition; may be the same as
        ``disk``
    :param journal_size: size of the journal partition, in the ``M``
        units understood by sgdisk
    :param fstype: filesystem type to create on the data partition
    :param mkfs_args: extra mkfs arguments as one whitespace-separated
        string, or None
    :param mount_options: mount options, or None for the defaults
    :param cluster_uuid: cluster fsid, written to ``ceph_fsid``
    :raises PrepareError: if any of the external commands fails
    """

    # this kills the crab
    _check_call_or_fail([
        'sgdisk',
        '--zap-all',
        '--clear',
        '--mbrtogpt',
        '--',
        disk,
        ])

    osd_uuid = str(uuid.uuid4())

    # store the partition uuid iff using external journal
    journal_uuid = None

    if journal is not None:
        journal_uuid = str(uuid.uuid4())

        if journal == disk:
            # we're sharing the disk between osd data and journal;
            # make journal be partition number 2, so it's pretty; put
            # journal at end of free space so partitioning tools don't
            # reorder them suddenly
            num = 2
            journal_part = '{num}:-{size}M:0'.format(
                num=num,
                size=journal_size,
                )
        else:
            # sgdisk has no way for me to say "whatever is the next
            # free index number" when setting type guids etc, so we
            # need to awkwardly look up the next free number, and then
            # fix that in the call -- and hope nobody races with us;
            # then again nothing guards the partition table from races
            # anyway
            num = get_free_partition_index(dev=journal)
            journal_part = '{num}:0:+{size}M'.format(
                num=num,
                size=journal_size,
                )

        _check_call_or_fail([
            'sgdisk',
            '--new={part}'.format(part=journal_part),
            '--change-name={num}:ceph journal'.format(num=num),
            '--partition-guid={num}:{journal_uuid}'.format(
                num=num,
                journal_uuid=journal_uuid,
                ),
            '--typecode={num}:{uuid}'.format(
                num=num,
                uuid=JOURNAL_UUID,
                ),
            '--',
            journal,
            ])
        _check_call_or_fail([
            # also make sure the kernel refreshes the new table
            'partprobe',
            journal,
            ])

    # The data partition gets one type code now and a different one at
    # the very end, once the filesystem has been fully populated.
    _check_call_or_fail([
        'sgdisk',
        '--largest-new=1',
        '--change-name=1:ceph data',
        '--partition-guid=1:{osd_uuid}'.format(
            osd_uuid=osd_uuid,
            ),
        '--typecode=1:89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be',
        '--',
        disk,
        ])
    _check_call_or_fail([
        # also make sure the kernel refreshes the new table
        'partprobe',
        disk,
        ])

    # NOTE(review): this assumes the first partition is named by
    # appending "1" to the disk path -- confirm for device names that
    # use a separator before the partition number.
    dev = '{disk}1'.format(disk=disk)
    args = [
        'mkfs',
        '--type={fstype}'.format(fstype=fstype),
        ]
    args.extend(MKFS_ARGS.get(fstype, []))
    if mkfs_args is not None:
        args.extend(mkfs_args.split())
    args.extend([
        '--',
        dev,
        ])
    _check_call_or_fail(args)

    path = mount(dev=dev, fstype=fstype, options=mount_options)
    try:
        if journal_uuid is not None:
            # we're using an external journal; point to it here
            os.symlink(
                '/dev/disk/by-partuuid/{journal_uuid}'.format(
                    journal_uuid=journal_uuid,
                    ),
                os.path.join(path, 'journal'),
                )
        write_one_line(path, 'ceph_fsid', cluster_uuid)
        write_one_line(path, 'fsid', osd_uuid)
        write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC)
    finally:
        unmount(path)

    _check_call_or_fail([
        'sgdisk',
        '--typecode=1:4fbd7e29-9d25-41b8-afd0-062c0ceff05d',
        '--',
        disk,
        ])


def parse_args():
    """
    Parse the command line (``sys.argv``).

    :returns: the parsed arguments; besides the options it carries
        ``prog`` (the program name, for error messages) and a default
        ``cluster`` of ``'ceph'``
    """
    parser = argparse.ArgumentParser(
        description='Prepare a disk for a Ceph OSD',
        )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true', default=None,
        help='be more verbose',
        )
    parser.add_argument(
        '--cluster',
        metavar='NAME',
        help='cluster name to assign this disk to',
        )
    parser.add_argument(
        '--cluster-uuid',
        metavar='UUID',
        help='cluster uuid to assign this disk to',
        )
    parser.add_argument(
        '--fs-type',
        help='file system type to use (e.g. "ext4")',
        )
    parser.add_argument(
        'disk',
        metavar='DISK',
        help='path to OSD data disk block device',
        )
    parser.add_argument(
        'journal',
        metavar='JOURNAL',
        nargs='?',
        help=('path to OSD journal disk block device;'
              + ' leave out to store journal in file'),
        )
    parser.set_defaults(
        # we want to hold on to this, for later
        prog=parser.prog,
        cluster='ceph',
        )
    args = parser.parse_args()
    return args


def main():
    """
    Command-line entry point.

    Fills in anything not given on the command line from the cluster
    configuration, then prepares the disk.  A PrepareError is reported
    on stderr and turned into exit status 1.
    """
    args = parse_args()

    loglevel = logging.INFO
    if args.verbose:
        loglevel = logging.DEBUG

    logging.basicConfig(
        level=loglevel,
        )

    try:
        if args.cluster_uuid is None:
            args.cluster_uuid = get_fsid(cluster=args.cluster)
            # get_fsid raises rather than returning None; this check
            # is kept as a safety net
            if args.cluster_uuid is None:
                raise PrepareError(
                    'must have fsid in config or pass --cluster-uuid=',
                    )

        if args.fs_type is None:
            args.fs_type = get_conf(
                cluster=args.cluster,
                variable='osd_fs_type',
                )
            if args.fs_type is None:
                args.fs_type = DEFAULT_FS_TYPE

        mkfs_args = get_conf(
            cluster=args.cluster,
            variable='osd_fs_mkfs_arguments_{fstype}'.format(
                fstype=args.fs_type,
                ),
            )

        mount_options = get_conf(
            cluster=args.cluster,
            variable='osd_fs_mount_options_{fstype}'.format(
                fstype=args.fs_type,
                ),
            )

        journal_size = get_conf_with_default(
            cluster=args.cluster,
            variable='osd_journal_size',
            )
        journal_size = int(journal_size)

        prepare(
            disk=args.disk,
            journal=args.journal,
            journal_size=journal_size,
            fstype=args.fs_type,
            mkfs_args=mkfs_args,
            mount_options=mount_options,
            cluster_uuid=args.cluster_uuid,
            )
    except PrepareError as e:
        sys.stderr.write('{prog}: {msg}\n'.format(
            prog=args.prog,
            msg=e,
            ))
        sys.exit(1)


if __name__ == '__main__':
    main()