#!/usr/bin/python import argparse import errno import logging import os import os.path import re import subprocess import sys import tempfile log_name = __name__ if log_name == '__main__': log_name = os.path.basename(sys.argv[0]) log = logging.getLogger(log_name) class ActivateError(Exception): """ OSD activation error """ def __str__(self): doc = self.__doc__.strip() return ': '.join([doc] + [str(a) for a in self.args]) class BadMagicError(ActivateError): """ Does not look like a Ceph OSD, or incompatible version """ class TruncatedLineError(ActivateError): """ Line is truncated """ class TooManyLinesError(ActivateError): """ Too many lines """ class FilesystemTypeError(ActivateError): """ Cannot discover filesystem type """ class MountError(ActivateError): """ Mounting filesystem failed """ class UnmountError(ActivateError): """ Unmounting filesystem failed """ def maybe_mkdir(*a, **kw): try: os.mkdir(*a, **kw) except OSError, e: if e.errno == errno.EEXIST: pass else: raise def must_be_one_line(line): if line[-1:] != '\n': raise TruncatedLineError(line) line = line[:-1] if '\n' in line: raise TooManyLinesError(line) return line def read_one_line(parent, name): """ Read a file whose sole contents are a single line. Strips the newline. :return: Contents of the line, or None if file did not exist. """ path = os.path.join(parent, name) try: line = file(path, 'rb').read() except IOError as e: if e.errno == errno.ENOENT: return None else: raise try: line = must_be_one_line(line) except (TruncatedLineError, TooManyLinesError) as e: raise ActivateError('File is corrupt: {path}: {msg}'.format( path=path, msg=e, )) return line def write_one_line(parent, name, text): """ Write a file whose sole contents are a single line. Adds a newline. """ path = os.path.join(parent, name) tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) with file(tmp, 'wb') as f: f.write(text + '\n') os.fsync(f.fileno()) os.rename(tmp, path) CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' def check_osd_magic(path): """ Check that this path has the Ceph OSD magic. :raises: BadMagicError if this does not look like a Ceph OSD data dir. """ magic = read_one_line(path, 'magic') if magic is None: # probably not mkfs'ed yet raise BadMagicError(path) if magic != CEPH_OSD_ONDISK_MAGIC: raise BadMagicError(path) def check_osd_id(osd_id): """ Ensures osd id is numeric. """ if not re.match(r'^[0-9]+$', osd_id): raise ActivateError('osd id is not numeric') def get_osd_id(path): osd_id = read_one_line(path, 'whoami') if osd_id is not None: check_osd_id(osd_id) return osd_id # TODO depend on python2.7 def _check_output(*args, **kwargs): process = subprocess.Popen( stdout=subprocess.PIPE, *args, **kwargs) out, _ = process.communicate() ret = process.wait() if ret: cmd = kwargs.get("args") if cmd is None: cmd = args[0] raise subprocess.CalledProcessError(ret, cmd, output=out) return out def allocate_osd_id( cluster, fsid, keyring, ): log.debug('Allocating OSD id...') try: osd_id = _check_output( args=[ 'ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'osd', 'create', '--concise', fsid, ], ) except subprocess.CalledProcessError as e: raise ActivateError('ceph osd create failed', e) osd_id = must_be_one_line(osd_id) check_osd_id(osd_id) return osd_id def mkfs( path, cluster, osd_id, fsid, keyring, ): monmap = os.path.join(path, 'activate.monmap') subprocess.check_call( args=[ 'ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'mon', 'getmap', '-o', monmap, ], ) subprocess.check_call( args=[ 'ceph-osd', '--cluster', cluster, '--mkfs', '--mkkey', '-i', osd_id, '--monmap', monmap, '--osd-data', path, '--osd-journal', os.path.join(path, 'journal'), '--osd-uuid', fsid, '--keyring', os.path.join(path, 'keyring'), ], ) # TODO ceph-osd --mkfs removes the monmap file? # os.unlink(monmap) def auth_key( path, cluster, osd_id, keyring, ): subprocess.check_call( args=[ 'ceph', '--cluster', cluster, '--name', 'client.bootstrap-osd', '--keyring', keyring, 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), '-i', os.path.join(path, 'keyring'), 'osd', 'allow *', 'mon', 'allow rwx', ], ) def move_mount( path, cluster, osd_id, ): log.debug('Moving mount to final location...') parent = '/var/lib/ceph/osd' osd_data = os.path.join( parent, '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), ) maybe_mkdir(osd_data) subprocess.check_call( args=[ 'mount', '--move', '--', path, osd_data, ], ) def upstart_start( cluster, osd_id, ): log.debug('Starting service...') subprocess.check_call( args=[ 'initctl', # use emit, not start, because start would fail if the # instance was already running 'emit', # since the daemon starting doesn't guarantee much about # the service being operational anyway, don't bother # waiting for it '--no-wait', '--', 'ceph-osd', 'cluster={cluster}'.format(cluster=cluster), 'id={osd_id}'.format(osd_id=osd_id), ], ) def detect_fstype( dev, ): fstype = _check_output( args=[ 'blkid', # we don't want stale cached results '-p', '-s', 'TYPE', '-o' 'value', '--', dev, ], ) fstype = must_be_one_line(fstype) return fstype def get_conf(cluster, variable): try: p = subprocess.Popen( args=[ 'ceph-conf', '--cluster={cluster}'.format( cluster=cluster, ), '--name=osd.', '--lookup', variable, ], stdout=subprocess.PIPE, close_fds=True, ) except OSError as e: raise ActivateError('error executing ceph-conf', e) (out, _err) = p.communicate() ret = p.wait() if ret == 1: # config entry not found return None elif ret != 0: raise ActivateError('getting variable from configuration failed') value = out.split('\n', 1)[0] # don't differentiate between "var=" and no var set if not value: return None return value MOUNT_OPTIONS = dict( btrfs='noatime,user_subvol_rm_allowed', # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll # delay a moment before removing it fully because we did have some # issues with ext4 before the xatts-in-leveldb work, and it seemed # that user_xattr helped ext4='noatime,user_xattr', xfs='noatime', ) def mount( dev, fstype, options, ): # pick best-of-breed mount options based on fs type if options is None: options = MOUNT_OPTIONS.get(fstype, '') # mount path = tempfile.mkdtemp( prefix='mnt.', dir='/var/lib/ceph/tmp', ) try: subprocess.check_call( args=[ 'mount', '-o', options, '--', dev, path, ], ) except subprocess.CalledProcessError as e: try: os.rmdir(path) except (OSError, IOError): pass raise MountError(e) return path def unmount( path, ): try: subprocess.check_call( args=[ 'umount', '--', path, ], ) except subprocess.CalledProcessError as e: raise UnmountError(e) def activate( path, activate_key_template, do_mount, ): if do_mount: try: fstype = detect_fstype(dev=path) except (subprocess.CalledProcessError, TruncatedLineError, TooManyLinesError) as e: raise FilesystemTypeError( 'device {dev}'.format(dev=path), e, ) mount_options = get_conf( # TODO always using mount options from cluster=ceph for # now; see http://tracker.newdream.net/issues/3253 cluster='ceph', variable='osd_fs_mount_options_{fstype}'.format( fstype=fstype, ), ) path = mount(dev=path, fstype=fstype, options=mount_options) try: check_osd_magic(path) ceph_fsid = read_one_line(path, 'ceph_fsid') if ceph_fsid is None: raise ActivateError('No cluster uuid assigned.') log.debug('Cluster uuid is %s', ceph_fsid) # TODO use ceph_fsid to find the right cluster cluster = 'ceph' log.debug('Cluster name is %s', cluster) fsid = read_one_line(path, 'fsid') if fsid is None: raise ActivateError('No OSD uuid assigned.') log.debug('OSD uuid is %s', fsid) keyring = activate_key_template.format(cluster=cluster) osd_id = get_osd_id(path) if osd_id is None: osd_id = allocate_osd_id( cluster=cluster, fsid=fsid, keyring=keyring, ) write_one_line(path, 'whoami', osd_id) log.debug('OSD id is %s', osd_id) if not os.path.exists(os.path.join(path, 'ready')): log.debug('Initializing OSD...') # re-running mkfs is safe, so just run until it completes mkfs( path=path, cluster=cluster, osd_id=osd_id, fsid=fsid, keyring=keyring, ) if not os.path.exists(os.path.join(path, 'active')): log.debug('Authorizing OSD key...') auth_key( path=path, cluster=cluster, osd_id=osd_id, keyring=keyring, ) write_one_line(path, 'active', 'ok') # check if the disk is already active active = False src_dev = os.stat(path).st_dev try: dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( cluster=cluster, osd_id=osd_id)).st_dev if src_dev == dst_dev: active = True except: pass if active: log.debug('OSD already mounted') unmount(path) else: move_mount( path=path, cluster=cluster, osd_id=osd_id, ) except: unmount(path) finally: if do_mount: # if we created a temp dir to mount it, remove it os.rmdir(path) upstart_start( cluster=cluster, osd_id=osd_id, ) def parse_args(): parser = argparse.ArgumentParser( description='Activate a Ceph OSD', ) parser.add_argument( '-v', '--verbose', action='store_true', default=None, help='be more verbose', ) parser.add_argument( '--mount', action='store_true', default=None, help='mount the device first', ) parser.add_argument( '--activate-key', metavar='PATH', help='bootstrap-osd keyring path template (%(default)s)', dest='activate_key_template', ) parser.add_argument( 'path', metavar='PATH', help='path to OSD data directory, or block device if using --mount', ) parser.set_defaults( activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', # we want to hold on to this, for later prog=parser.prog, ) args = parser.parse_args() return args def main(): args = parse_args() loglevel = logging.INFO if args.verbose: loglevel = logging.DEBUG logging.basicConfig( level=loglevel, ) try: activate( path=args.path, activate_key_template=args.activate_key_template, do_mount=args.mount, ) except ActivateError as e: print >>sys.stderr, '{prog}: {msg}'.format( prog=args.prog, msg=e, ) sys.exit(1) if __name__ == '__main__': main()