diff options
author | Sage Weil <sage@inktank.com> | 2013-05-01 12:57:43 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-01 12:57:43 -0700 |
commit | 15e6544f0892e8472f436149d15fc30257496c0b (patch) | |
tree | f578cb0f1b3616fa8e25915c9b81218e774bfc4b | |
parent | fdc05346177a60f064fe351ca81c6078cd065179 (diff) | |
parent | 418cff585bf6fc292d260a464369d82f7cdb3b79 (diff) | |
download | ceph-15e6544f0892e8472f436149d15fc30257496c0b.tar.gz |
Merge remote-tracking branch 'gh/bobtail-deploy' into bobtail-next
-rw-r--r-- | Makefile.am | 3 | ||||
-rw-r--r-- | ceph.spec.in | 17 | ||||
-rw-r--r-- | debian/ceph.dirs | 1 | ||||
-rw-r--r-- | debian/ceph.install | 8 | ||||
-rw-r--r-- | debian/ceph.postinst | 1 | ||||
-rw-r--r-- | debian/ceph.prerm | 5 | ||||
-rw-r--r-- | debian/control | 2 | ||||
-rwxr-xr-x | debian/rules | 3 | ||||
-rw-r--r-- | src/Makefile.am | 12 | ||||
-rwxr-xr-x | src/ceph-create-keys | 12 | ||||
-rwxr-xr-x | src/ceph-disk | 1925 | ||||
-rwxr-xr-x | src/ceph-disk-activate | 587 | ||||
-rwxr-xr-x | src/ceph-disk-prepare | 532 | ||||
-rw-r--r-- | src/ceph_common.sh | 59 | ||||
-rw-r--r-- | src/init-ceph.in | 1 | ||||
-rw-r--r-- | src/upstart/ceph-hotplug.conf | 11 | ||||
-rw-r--r-- | udev/95-ceph-osd.rules | 21 |
17 files changed, 2056 insertions, 1144 deletions
diff --git a/Makefile.am b/Makefile.am index 3f4231438ad..adeb4e57728 100644 --- a/Makefile.am +++ b/Makefile.am @@ -9,7 +9,8 @@ EXTRA_DIST += \ src/test/run-cli-tests-maybe-unset-ccache \ src/test/cli \ src/test/downloads \ - udev/50-rbd.rules + udev/50-rbd.rules \ + udev/95-ceph-osd.rules all-local: diff --git a/ceph.spec.in b/ceph.spec.in index 7aae6fefedb..0ec887eab92 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -24,6 +24,11 @@ Source0: http://ceph.com/download/%{name}-%{version}.tar.bz2 Requires: librbd1 = %{version}-%{release} Requires: librados2 = %{version}-%{release} Requires: libcephfs1 = %{version}-%{release} +Requires: python +Requires: cryptsetup +Requires: gptfdisk +Requires: parted +Requires: util-linux Requires(post): binutils BuildRoot: %{_tmppath}/%{name}-%{version}-build BuildRequires: gcc-c++ @@ -285,6 +290,10 @@ mkdir -p $RPM_BUILD_ROOT/usr/share/java mv $RPM_BUILD_ROOT/usr/lib64/libcephfs.jar $RPM_BUILD_ROOT/usr/share/java/. mv $RPM_BUILD_ROOT/usr/lib64/libcephfs-test.jar $RPM_BUILD_ROOT/usr/share/java/. +# udev rules +install -D -m 644 udev/50-rbd.rules $RPM_BUILD_ROOT/lib/udev/rules.d/50-rbd.rules +install -D -m 644 udev/95-ceph-osd.rules $RPM_BUILD_ROOT/lib/udev/rules.d/95-ceph-osd.rules + %clean rm -rf $RPM_BUILD_ROOT @@ -382,9 +391,11 @@ fi %{_libdir}/rados-classes/libcls_lock.so* %{_libdir}/rados-classes/libcls_kvs.so* %{_libdir}/rados-classes/libcls_refcount.so* -/sbin/ceph-disk-activate -/sbin/ceph-disk-prepare -/sbin/ceph-create-keys +%{_sbindir}/ceph-disk +%{_sbindir}/ceph-disk-activate +%{_sbindir}/ceph-disk-prepare +%{_sbindir}/ceph-create-keys +/lib/udev/rules.d/95-ceph-osd.rules ################################################################################# %files fuse diff --git a/debian/ceph.dirs b/debian/ceph.dirs index b9b8a21816f..ca7a880636c 100644 --- a/debian/ceph.dirs +++ b/debian/ceph.dirs @@ -5,3 +5,4 @@ var/lib/ceph/mon var/lib/ceph/osd var/lib/ceph/mds var/lib/ceph/bootstrap-osd +var/lib/ceph/bootstrap-mds diff --git a/debian/ceph.install b/debian/ceph.install index da097b24c86..82df2bacc21 100644 --- a/debian/ceph.install +++ b/debian/ceph.install @@ -6,9 +6,10 @@ usr/bin/ceph-run usr/bin/ceph-mon usr/bin/ceph-osd usr/bin/ceph-debugpack -sbin/ceph-disk-prepare usr/sbin/ -sbin/ceph-disk-activate usr/sbin/ -sbin/ceph-create-keys usr/sbin/ +usr/sbin/ceph-disk +usr/sbin/ceph-disk-prepare +usr/sbin/ceph-disk-activate +usr/sbin/ceph-create-keys sbin/mkcephfs usr/lib/ceph/ceph_common.sh usr/lib/rados-classes/* @@ -24,3 +25,4 @@ usr/share/man/man8/monmaptool.8 usr/share/man/man8/ceph-clsinfo.8 usr/share/man/man8/ceph-debugpack.8 etc/bash_completion.d/ceph +lib/udev/rules.d/95-ceph-osd.rules diff --git a/debian/ceph.postinst b/debian/ceph.postinst index 1f9469d8f6c..4edbf10d93b 100644 --- a/debian/ceph.postinst +++ b/debian/ceph.postinst @@ -27,6 +27,7 @@ set -e case "$1" in configure) rm -f /etc/init/ceph.conf + start ceph-all || : ;; abort-upgrade|abort-remove|abort-deconfigure) : diff --git a/debian/ceph.prerm b/debian/ceph.prerm new file mode 100644 index 00000000000..159a96e33c3 --- /dev/null +++ b/debian/ceph.prerm @@ -0,0 +1,5 @@ +#!/bin/sh + +stop ceph-all || : + +exit 0
\ No newline at end of file diff --git a/debian/control b/debian/control index 2ad36d94acb..27e36f09733 100644 --- a/debian/control +++ b/debian/control @@ -12,7 +12,7 @@ Standards-Version: 3.9.3 Package: ceph Architecture: linux-any Depends: ${shlibs:Depends}, ${misc:Depends}, sdparm | hdparm, binutils, ceph-common, uuid-runtime, python, xfsprogs -Recommends: ceph-mds, librados2, librbd1, btrfs-tools, gdisk, parted +Recommends: ceph-mds, librados2, librbd1, btrfs-tools, gdisk, parted, cryptsetup-bin Description: distributed storage and file system Ceph is a distributed storage system designed to provide excellent performance, reliability, and scalability. diff --git a/debian/rules b/debian/rules index 8f0b4ef2762..260f27fcd68 100755 --- a/debian/rules +++ b/debian/rules @@ -90,6 +90,7 @@ install: build $(MAKE) DESTDIR=$(DESTDIR) install sed -i "/dependency_libs/ s/'.*'/''/" `find . -name '*.la'` install -D -m 644 udev/50-rbd.rules $(DESTDIR)/lib/udev/rules.d/50-rbd.rules + install -D -m 644 udev/95-ceph-osd.rules $(DESTDIR)/lib/udev/rules.d/95-ceph-osd.rules # Add here commands to install the package into debian/testpack. # Build architecture-independent files here. @@ -127,6 +128,8 @@ binary-arch: build install # per package, so do this ourselves install -d -m0755 debian/ceph/etc/init install -m0644 src/upstart/ceph*.conf debian/ceph/etc/init + install -d -m0755 debian/ceph-mds/etc/init + mv debian/ceph/etc/init/ceph-mds* debian/ceph-mds/etc/init install -d -m0755 debian/radosgw/etc/init install -m0644 src/upstart/radosgw*.conf debian/radosgw/etc/init dh_installman -a diff --git a/src/Makefile.am b/src/Makefile.am index 50267c3b188..7eabd62b121 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -26,11 +26,17 @@ bin_PROGRAMS = # like bin_PROGRAMS, but these targets are only built for debug builds bin_DEBUGPROGRAMS = sbin_PROGRAMS = -sbin_SCRIPTS = \ +# like sbin_SCRIPTS but can be used to install to e.g. /usr/sbin +ceph_sbindir = $(prefix)$(sbindir) +ceph_sbin_SCRIPTS = \ + ceph-disk \ ceph-disk-prepare \ ceph-disk-activate \ - ceph-create-keys \ + ceph-create-keys + +sbin_SCRIPTS = \ mount.fuse.ceph + bin_SCRIPTS = ceph-run $(srcdir)/ceph-clsinfo ceph-debugpack ceph-rbdnamer dist_bin_SCRIPTS = # C/C++ tests to build will be appended to this @@ -1093,13 +1099,13 @@ EXTRA_DIST += \ $(srcdir)/upstart/ceph-osd.conf \ $(srcdir)/upstart/ceph-osd-all.conf \ $(srcdir)/upstart/ceph-osd-all-starter.conf \ - $(srcdir)/upstart/ceph-hotplug.conf \ $(srcdir)/upstart/ceph-mds.conf \ $(srcdir)/upstart/ceph-mds-all.conf \ $(srcdir)/upstart/ceph-mds-all-starter.conf \ $(srcdir)/upstart/radosgw.conf \ $(srcdir)/upstart/radosgw-all.conf \ $(srcdir)/upstart/radosgw-all-starter.conf \ + ceph-disk \ ceph-disk-prepare \ ceph-disk-activate \ ceph-create-keys \ diff --git a/src/ceph-create-keys b/src/ceph-create-keys index 438e51d3076..272bb3ec6ef 100755 --- a/src/ceph-create-keys +++ b/src/ceph-create-keys @@ -190,6 +190,7 @@ def main(): wait_for_quorum(cluster=args.cluster, mon_id=args.id) get_key(cluster=args.cluster, mon_id=args.id) + bootstrap_key( cluster=args.cluster, type_='osd', @@ -203,6 +204,17 @@ def main(): ), ) + bootstrap_key( + cluster=args.cluster, + type_='mds', + caps=dict( + mon=[ + r'allow command auth get-or-create * osd allow\ * mds allow mon allow\ rwx', + 'allow command mon getmap', + ], + ), + ) + if __name__ == '__main__': main() diff --git a/src/ceph-disk b/src/ceph-disk new file mode 100755 index 00000000000..de76f3c8c52 --- /dev/null +++ b/src/ceph-disk @@ -0,0 +1,1925 @@ +#!/usr/bin/python + +import argparse +import errno +import logging +import os +import os.path +import platform +import re +import subprocess +import stat +import sys +import tempfile +import uuid + +CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' + +JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106' +DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106' +OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d' +DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d' +TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be' +DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be' + +DEFAULT_FS_TYPE = 'xfs' + +MOUNT_OPTIONS = dict( + btrfs='noatime,user_subvol_rm_allowed', + # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll + # delay a moment before removing it fully because we did have some + # issues with ext4 before the xatts-in-leveldb work, and it seemed + # that user_xattr helped + ext4='noatime,user_xattr', + xfs='noatime', + ) + +MKFS_ARGS = dict( + btrfs=[ + '-m', 'single', + '-l', '32768', + '-n', '32768', + ], + xfs=[ + # xfs insists on not overwriting previous fs; even if we wipe + # partition table, we often recreate it exactly the same way, + # so we'll see ghosts of filesystems past + '-f', + '-i', 'size=2048', + ], + ) + +INIT_SYSTEMS = [ + 'upstart', + 'sysvinit', + 'systemd', + 'auto', + ] + + +LOG_NAME = __name__ +if LOG_NAME == '__main__': + LOG_NAME = os.path.basename(sys.argv[0]) +LOG = logging.getLogger(LOG_NAME) + + +###### exceptions ######## + +class Error(Exception): + """ + Error + """ + + def __str__(self): + doc = self.__doc__.strip() + return ': '.join([doc] + [str(a) for a in self.args]) + +class MountError(Error): + """ + Mounting filesystem failed + """ + +class UnmountError(Error): + """ + Unmounting filesystem failed + """ + +class BadMagicError(Error): + """ + Does not look like a Ceph OSD, or incompatible version + """ + +class TruncatedLineError(Error): + """ + Line is truncated + """ + +class TooManyLinesError(Error): + """ + Too many lines + """ + +class FilesystemTypeError(Error): + """ + Cannot discover filesystem type + """ + + +####### utils + + +def maybe_mkdir(*a, **kw): + """ + Creates a new directory if it doesn't exist, removes + existing symlink before creating the directory. + """ + # remove any symlink, if it is there.. + if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode): + LOG.debug('Removing old symlink at %s', *a) + os.unlink(*a) + try: + os.mkdir(*a, **kw) + except OSError, e: + if e.errno == errno.EEXIST: + pass + else: + raise + + +def list_all_partitions(): + """ + Return a list of devices and partitions + """ + dev_part_list = {} + for name in os.listdir('/dev/disk/by-path'): + target = os.readlink(os.path.join('/dev/disk/by-path', name)) + dev = target.split('/')[-1] + #print "name %s target %s dev %s" % (name, target, dev) + (baser) = re.search('(.*)-part\d+$', name) + if baser is not None: + basename = baser.group(1) + #print 'basename %s' % basename + base = os.readlink(os.path.join('/dev/disk/by-path', basename)).split('/')[-1] + if base not in dev_part_list: + dev_part_list[base] = [] + dev_part_list[base].append(dev) + else: + if dev not in dev_part_list: + dev_part_list[dev] = [] + return dev_part_list + + +def list_partitions(disk): + """ + Return a list of partitions on the given device + """ + disk = os.path.realpath(disk) + assert not is_partition(disk) + assert disk.startswith('/dev/') + base = disk[5:] + partitions = [] + with file('/proc/partitions', 'rb') as proc_partitions: + for line in proc_partitions.read().split('\n')[2:]: + fields = re.split('\s+', line) + if len(fields) < 5: + continue + name = fields [4] + if name != base and name.startswith(base): + partitions.append('/dev/' + name) + return partitions + + +def is_partition(dev): + """ + Check whether a given device is a partition or a full disk. + """ + dev = os.path.realpath(dev) + if not stat.S_ISBLK(os.lstat(dev).st_mode): + raise Error('not a block device', dev) + + # we can't tell just from the name of the device if it is a + # partition or not. look in the by-path dir and see if the + # referring symlink ends in -partNNN. + name = dev.split('/')[-1] + for name in os.listdir('/dev/disk/by-path'): + target = os.readlink(os.path.join('/dev/disk/by-path', name)) + cdev = target.split('/')[-1] + if '/dev/' + cdev != dev: + continue + (baser) = re.search('(.*)-part\d+$', name) + if baser is not None: + return True + else: + return False + + # hrm, don't know... + return False + + +def is_mounted(dev): + """ + Check if the given device is mounted. + """ + dev = os.path.realpath(dev) + with file('/proc/mounts', 'rb') as proc_mounts: + for line in proc_mounts: + fields = line.split() + if len(fields) < 3: + continue + mounts_dev = fields[0] + path = fields[1] + if mounts_dev.startswith('/') and os.path.exists(mounts_dev): + mounts_dev = os.path.realpath(mounts_dev) + if mounts_dev == dev: + return path + return None + + +def is_held(dev): + """ + Check if a device is held by another device (e.g., a dm-crypt mapping) + """ + assert os.path.exists(dev) + dev = os.path.realpath(dev) + base = dev[5:] + disk = base + while disk[-1].isdigit(): + disk = disk[:-1] + directory = '/sys/block/{disk}/{base}/holders'.format(disk=disk, base=base) + if not os.path.exists(directory): + return [] + return os.listdir(directory) + + +def verify_not_in_use(dev): + """ + Verify if a given device (path) is in use (e.g. mounted or + in use by device-mapper). + + :raises: Error if device is in use. + """ + assert os.path.exists(dev) + if is_partition(dev): + if is_mounted(dev): + raise Error('Device is mounted', dev) + holders = is_held(dev) + if holders: + raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders)) + else: + for partition in list_partitions(dev): + if is_mounted(partition): + raise Error('Device is mounted', partition) + holders = is_held(partition) + if holders: + raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % partition, ','.join(holders)) + + +def must_be_one_line(line): + """ + Checks if given line is really one single line. + + :raises: TruncatedLineError or TooManyLinesError + :return: Content of the line, or None if line isn't valid. + """ + if line[-1:] != '\n': + raise TruncatedLineError(line) + line = line[:-1] + if '\n' in line: + raise TooManyLinesError(line) + return line + + +def read_one_line(parent, name): + """ + Read a file whose sole contents are a single line. + + Strips the newline. + + :return: Contents of the line, or None if file did not exist. + """ + path = os.path.join(parent, name) + try: + line = file(path, 'rb').read() + except IOError as e: + if e.errno == errno.ENOENT: + return None + else: + raise + + try: + line = must_be_one_line(line) + except (TruncatedLineError, TooManyLinesError) as e: + raise Error('File is corrupt: {path}: {msg}'.format( + path=path, + msg=e, + )) + return line + + +def write_one_line(parent, name, text): + """ + Write a file whose sole contents are a single line. + + Adds a newline. + """ + path = os.path.join(parent, name) + tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) + with file(tmp, 'wb') as tmp_file: + tmp_file.write(text + '\n') + os.fsync(tmp_file.fileno()) + os.rename(tmp, path) + + +def check_osd_magic(path): + """ + Check that this path has the Ceph OSD magic. + + :raises: BadMagicError if this does not look like a Ceph OSD data + dir. + """ + magic = read_one_line(path, 'magic') + if magic is None: + # probably not mkfs'ed yet + raise BadMagicError(path) + if magic != CEPH_OSD_ONDISK_MAGIC: + raise BadMagicError(path) + + +def check_osd_id(osd_id): + """ + Ensures osd id is numeric. + """ + if not re.match(r'^[0-9]+$', osd_id): + raise Error('osd id is not numeric') + + +def allocate_osd_id( + cluster, + fsid, + keyring, + ): + """ + Accocates an OSD id on the given cluster. + + :raises: Error if the call to allocate the OSD id fails. + :return: The allocated OSD id. + """ + + LOG.debug('Allocating OSD id...') + try: + osd_id = _check_output( + args=[ + '/usr/bin/ceph', + '--cluster', cluster, + '--name', 'client.bootstrap-osd', + '--keyring', keyring, + 'osd', 'create', '--concise', + fsid, + ], + ) + except subprocess.CalledProcessError as e: + raise Error('ceph osd create failed', e) + osd_id = must_be_one_line(osd_id) + check_osd_id(osd_id) + return osd_id + + +def get_osd_id(path): + """ + Gets the OSD id of the OSD at the given path. + """ + osd_id = read_one_line(path, 'whoami') + if osd_id is not None: + check_osd_id(osd_id) + return osd_id + + +def _check_output(*args, **kwargs): + process = subprocess.Popen( + stdout=subprocess.PIPE, + *args, **kwargs) + out, _ = process.communicate() + ret = process.wait() + if ret: + cmd = kwargs.get("args") + if cmd is None: + cmd = args[0] + #raise subprocess.CalledProcessError(ret, cmd, output=out) + error = subprocess.CalledProcessError(ret, cmd) + error.output = out + raise error + return out + + +def get_conf(cluster, variable): + """ + Get the value of the given configuration variable from the + cluster. + + :raises: Error if call to ceph-conf fails. + :return: The variable value or None. + """ + try: + process = subprocess.Popen( + args=[ + '/usr/bin/ceph-conf', + '--cluster={cluster}'.format( + cluster=cluster, + ), + '--name=osd.', + '--lookup', + variable, + ], + stdout=subprocess.PIPE, + close_fds=True, + ) + except OSError as e: + raise Error('error executing ceph-conf', e) + (out, _err) = process.communicate() + ret = process.wait() + if ret == 1: + # config entry not found + return None + elif ret != 0: + raise Error('getting variable from configuration failed') + value = str(out).split('\n', 1)[0] + # don't differentiate between "var=" and no var set + if not value: + return None + return value + + +def get_conf_with_default(cluster, variable): + """ + Get a config value that is known to the C++ code. + + This will fail if called on variables that are not defined in + common config options. + """ + try: + out = _check_output( + args=[ + 'ceph-osd', + '--cluster={cluster}'.format( + cluster=cluster, + ), + '--show-config-value={variable}'.format( + variable=variable, + ), + ], + close_fds=True, + ) + except subprocess.CalledProcessError as e: + raise Error( + 'getting variable from configuration failed', + e, + ) + + value = str(out).split('\n', 1)[0] + return value + + +def get_fsid(cluster): + """ + Get the fsid of the cluster. + + :return: The fsid or raises Error. + """ + fsid = get_conf(cluster=cluster, variable='fsid') + if fsid is None: + raise Error('getting cluster uuid from configuration failed') + return fsid + + +def get_or_create_dmcrypt_key( + _uuid, + key_dir, + ): + """ + Get path to dmcrypt key or create a new key file. + + :return: Path to the dmcrypt key file. + """ + path = os.path.join(key_dir, _uuid) + + # already have it? + if os.path.exists(path): + return path + + # make a new key + try: + if not os.path.exists(key_dir): + os.makedirs(key_dir) + with file('/dev/urandom', 'rb') as i: + key = i.read(256) + with file(path, 'wb') as key_file: + key_file.write(key) + return path + except: + raise Error('unable to read or create dm-crypt key', path) + + +def dmcrypt_map( + rawdev, + keypath, + _uuid, + ): + """ + Maps a device to a dmcrypt device. + + :return: Path to the dmcrypt device. + """ + dev = '/dev/mapper/'+ _uuid + args = [ + 'cryptsetup', + '--key-file', + keypath, + '--key-size', '256', + 'create', + _uuid, + rawdev, + ] + try: + subprocess.check_call(args) + return dev + + except subprocess.CalledProcessError as e: + raise Error('unable to map device', rawdev, e) + + +def dmcrypt_unmap( + _uuid + ): + """ + Removes the dmcrypt device with the given UUID. + """ + args = [ + 'cryptsetup', + 'remove', + _uuid + ] + + try: + subprocess.check_call(args) + + except subprocess.CalledProcessError as e: + raise Error('unable to unmap device', _uuid, e) + + +def mount( + dev, + fstype, + options, + ): + """ + Mounts a device with given filessystem type and + mount options to a tempfile path under /var/lib/ceph/tmp. + """ + # pick best-of-breed mount options based on fs type + if options is None: + options = MOUNT_OPTIONS.get(fstype, '') + + # mount + path = tempfile.mkdtemp( + prefix='mnt.', + dir='/var/lib/ceph/tmp', + ) + try: + LOG.debug('Mounting %s on %s with options %s', dev, path, options) + subprocess.check_call( + args=[ + 'mount', + '-o', options, + '--', + dev, + path, + ], + ) + except subprocess.CalledProcessError as e: + try: + os.rmdir(path) + except (OSError, IOError): + pass + raise MountError(e) + + return path + + +def unmount( + path, + ): + """ + Unmount and removes the given mount point. + """ + try: + LOG.debug('Unmounting %s', path) + subprocess.check_call( + args=[ + '/bin/umount', + '--', + path, + ], + ) + except subprocess.CalledProcessError as e: + raise UnmountError(e) + + os.rmdir(path) + + +########################################### + + +def get_free_partition_index(dev): + """ + Get the next free partition index on a given device. + + :return: Index number (> 1 if there is already a partition on the device) + or 1 if there is no partition table. + """ + try: + lines = _check_output( + args=[ + 'parted', + '--machine', + '--', + dev, + 'print', + ], + ) + except subprocess.CalledProcessError as e: + print 'cannot read partition index; assume it isn\'t present\n (Error: %s)' % e + return 1 + + if not lines: + raise Error('parted failed to output anything') + lines = str(lines).splitlines(True) + + if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']: + raise Error('weird parted units', lines[0]) + del lines[0] + + if not lines[0].startswith('/dev/'): + raise Error('weird parted disk entry', lines[0]) + del lines[0] + + seen = set() + for line in lines: + idx, _ = line.split(':', 1) + idx = int(idx) + seen.add(idx) + + num = 1 + while num in seen: + num += 1 + return num + + +def zap(dev): + """ + Destroy the partition table and content of a given disk. + """ + try: + LOG.debug('Zapping partition table on %s', dev) + + # try to wipe out any GPT partition table backups. sgdisk + # isn't too thorough. + lba_size = 4096 + size = 33 * lba_size + with file(dev, 'wb') as dev_file: + dev_file.seek(-size, os.SEEK_END) + dev_file.write(size*'\0') + + subprocess.check_call( + args=[ + 'sgdisk', + '--zap-all', + '--clear', + '--mbrtogpt', + '--', + dev, + ], + ) + except subprocess.CalledProcessError as e: + raise Error(e) + + +def prepare_journal_dev( + data, + journal, + journal_size, + journal_uuid, + journal_dm_keypath, + ): + + if is_partition(journal): + LOG.debug('Journal %s is a partition', journal) + LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') + return (journal, None, None) + + ptype = JOURNAL_UUID + if journal_dm_keypath: + ptype = DMCRYPT_JOURNAL_UUID + + # it is a whole disk. create a partition! + num = None + if journal == data: + # we're sharing the disk between osd data and journal; + # make journal be partition number 2, so it's pretty + num = 2 + journal_part = '{num}:0:{size}M'.format( + num=num, + size=journal_size, + ) + else: + # sgdisk has no way for me to say "whatever is the next + # free index number" when setting type guids etc, so we + # need to awkwardly look up the next free number, and then + # fix that in the call -- and hope nobody races with us; + # then again nothing guards the partition table from races + # anyway + num = get_free_partition_index(dev=journal) + journal_part = '{num}:0:+{size}M'.format( + num=num, + size=journal_size, + ) + LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') + + try: + LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal) + subprocess.check_call( + args=[ + 'sgdisk', + '--new={part}'.format(part=journal_part), + '--change-name={num}:ceph journal'.format(num=num), + '--partition-guid={num}:{journal_uuid}'.format( + num=num, + journal_uuid=journal_uuid, + ), + '--typecode={num}:{uuid}'.format( + num=num, + uuid=ptype, + ), + '--', + journal, + ], + ) + subprocess.call( + args=[ + # wait for udev event queue to clear + 'udevadm', + 'settle', + '--timeout=10', + ], + ) + subprocess.check_call( + args=[ + # also make sure the kernel refreshes the new table + 'partprobe', + journal, + ], + ) + + journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( + journal_uuid=journal_uuid, + ) + + journal_dmcrypt = None + if journal_dm_keypath: + journal_dmcrypt = journal_symlink + journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid) + + LOG.debug('Journal is GPT partition %s', journal_symlink) + return (journal_symlink, journal_dmcrypt, journal_uuid) + + except subprocess.CalledProcessError as e: + raise Error(e) + + +def prepare_journal_file( + journal, + journal_size): + + if not os.path.exists(journal): + LOG.debug('Creating journal file %s with size %dM', journal, journal_size) + with file(journal, 'wb') as journal_file: + journal_file.truncate(journal_size * 1048576) + + # FIXME: should we resize an existing journal file? + + LOG.debug('Journal is file %s', journal) + LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data') + return (journal, None, None) + + +def prepare_journal( + data, + journal, + journal_size, + journal_uuid, + force_file, + force_dev, + journal_dm_keypath, + ): + + if journal is None: + if force_dev: + raise Error('Journal is unspecified; not a block device') + return (None, None, None) + + if not os.path.exists(journal): + if force_dev: + raise Error('Journal does not exist; not a block device', journal) + return prepare_journal_file(journal, journal_size) + + jmode = os.stat(journal).st_mode + if stat.S_ISREG(jmode): + if force_dev: + raise Error('Journal is not a block device', journal) + return prepare_journal_file(journal, journal_size) + + if stat.S_ISBLK(jmode): + if force_file: + raise Error('Journal is not a regular file', journal) + return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath) + + raise Error('Journal %s is neither a block device nor regular file', journal) + + +def adjust_symlink(target, path): + create = True + if os.path.lexists(path): + try: + mode = os.lstat(path).st_mode + if stat.S_ISREG(mode): + LOG.debug('Removing old file %s', path) + os.unlink(path) + elif stat.S_ISLNK(mode): + old = os.readlink(path) + if old != target: + LOG.debug('Removing old symlink %s -> %s', path, old) + os.unlink(path) + else: + create = False + except: + raise Error('unable to remove (or adjust) old file (symlink)', path) + if create: + LOG.debug('Creating symlink %s -> %s', path, target) + try: + os.symlink(target, path) + except: + raise Error('unable to create symlink %s -> %s' % (path, target)) + +def prepare_dir( + path, + journal, + cluster_uuid, + osd_uuid, + journal_uuid, + journal_dmcrypt = None, + ): + LOG.debug('Preparing osd data dir %s', path) + + if osd_uuid is None: + osd_uuid = str(uuid.uuid4()) + + if journal is not None: + # we're using an external journal; point to it here + adjust_symlink(journal, os.path.join(path, 'journal')) + + if journal_dmcrypt is not None: + adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt')) + else: + try: + os.unlink(os.path.join(path, 'journal_dmcrypt')) + except OSError: + pass + + write_one_line(path, 'ceph_fsid', cluster_uuid) + write_one_line(path, 'fsid', osd_uuid) + write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC) + + if journal_uuid is not None: + # i.e., journal is a tagged partition + write_one_line(path, 'journal_uuid', journal_uuid) + +def prepare_dev( + data, + journal, + fstype, + mkfs_args, + mount_options, + cluster_uuid, + osd_uuid, + journal_uuid, + journal_dmcrypt, + osd_dm_keypath, + ): + """ + Prepare a data/journal combination to be used for an OSD. + + The ``magic`` file is written last, so it's presence is a reliable + indicator of the whole sequence having completed. + + WARNING: This will unconditionally overwrite anything given to + it. + """ + + ptype_tobe = TOBE_UUID + ptype_osd = OSD_UUID + if osd_dm_keypath: + ptype_tobe = DMCRYPT_TOBE_UUID + ptype_osd = DMCRYPT_OSD_UUID + + rawdev = None + if is_partition(data): + LOG.debug('OSD data device %s is a partition', data) + rawdev = data + else: + LOG.debug('Creating osd partition on %s', data) + try: + subprocess.check_call( + args=[ + 'sgdisk', + '--largest-new=1', + '--change-name=1:ceph data', + '--partition-guid=1:{osd_uuid}'.format( + osd_uuid=osd_uuid, + ), + '--typecode=1:%s' % ptype_tobe, + '--', + data, + ], + ) + subprocess.call( + args=[ + # wait for udev event queue to clear + 'udevadm', + 'settle', + '--timeout=10', + ], + ) + subprocess.check_call( + args=[ + # also make sure the kernel refreshes the new table + 'partprobe', + data, + ], + ) + except subprocess.CalledProcessError as e: + raise Error(e) + + rawdev = '{data}1'.format(data=data) + + dev = None + if osd_dm_keypath: + dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid) + else: + dev = rawdev + + try: + args = [ + 'mkfs', + '-t', + fstype, + ] + if mkfs_args is not None: + args.extend(mkfs_args.split()) + if fstype == 'xfs': + args.extend(['-f']) # always force + else: + args.extend(MKFS_ARGS.get(fstype, [])) + args.extend([ + '--', + dev, + ]) + try: + LOG.debug('Creating %s fs on %s', fstype, dev) + subprocess.check_call(args=args) + except subprocess.CalledProcessError as e: + raise Error(e) + + #remove whitespaces from mount_options + if mount_options is not None: + mount_options = "".join(mount_options.split()) + + path = mount(dev=dev, fstype=fstype, options=mount_options) + + try: + prepare_dir( + path=path, + journal=journal, + cluster_uuid=cluster_uuid, + osd_uuid=osd_uuid, + journal_uuid=journal_uuid, + journal_dmcrypt=journal_dmcrypt, + ) + finally: + unmount(path) + finally: + if rawdev != dev: + dmcrypt_unmap(osd_uuid) + + if not is_partition(data): + try: + subprocess.check_call( + args=[ + 'sgdisk', + '--typecode=1:%s' % ptype_osd, + '--', + data, + ], + ) + subprocess.call( + args=[ + # wait for udev event queue to clear + 'udevadm', + 'settle', + '--timeout=10', + ], + ) + subprocess.check_call( + args=[ + # also make sure the kernel refreshes the new table + 'partprobe', + data, + ], + ) + except subprocess.CalledProcessError as e: + raise Error(e) + + +def main_prepare(args): + journal_dm_keypath = None + osd_dm_keypath = None + + try: + if not os.path.exists(args.data): + raise Error('data path does not exist', args.data) + + # in use? + dmode = os.stat(args.data).st_mode + if stat.S_ISBLK(dmode): + verify_not_in_use(args.data) + + if args.journal and os.path.exists(args.journal): + jmode = os.stat(args.journal).st_mode + if stat.S_ISBLK(jmode): + verify_not_in_use(args.journal) + + if args.zap_disk is not None: + if stat.S_ISBLK(dmode) and not is_partition(args.data): + zap(args.data) + else: + raise Error('not full block device; cannot zap', args.data) + + if args.cluster_uuid is None: + args.cluster_uuid = get_fsid(cluster=args.cluster) + if args.cluster_uuid is None: + raise Error( + 'must have fsid in config or pass --cluster--uuid=', + ) + + if args.fs_type is None: + args.fs_type = get_conf( + cluster=args.cluster, + variable='osd_mkfs_type', + ) + if args.fs_type is None: + args.fs_type = get_conf( + cluster=args.cluster, + variable='osd_fs_type', + ) + if args.fs_type is None: + args.fs_type = DEFAULT_FS_TYPE + + mkfs_args = get_conf( + cluster=args.cluster, + variable='osd_mkfs_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + if mkfs_args is None: + mkfs_args = get_conf( + cluster=args.cluster, + variable='osd_fs_mkfs_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + + mount_options = get_conf( + cluster=args.cluster, + variable='osd_mount_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + if mount_options is None: + mount_options = get_conf( + cluster=args.cluster, + variable='osd_fs_mount_options_{fstype}'.format( + fstype=args.fs_type, + ), + ) + + journal_size = get_conf_with_default( + cluster=args.cluster, + variable='osd_journal_size', + ) + journal_size = int(journal_size) + + # colocate journal with data? + if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None: + LOG.info('Will colocate journal with data on %s', args.data) + args.journal = args.data + + if args.journal_uuid is None: + args.journal_uuid = str(uuid.uuid4()) + if args.osd_uuid is None: + args.osd_uuid = str(uuid.uuid4()) + + # dm-crypt keys? + if args.dmcrypt: + journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir) + osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir) + + # prepare journal + (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal( + data=args.data, + journal=args.journal, + journal_size=journal_size, + journal_uuid=args.journal_uuid, + force_file=args.journal_file, + force_dev=args.journal_dev, + journal_dm_keypath=journal_dm_keypath, + ) + + # prepare data + if stat.S_ISDIR(dmode): + if args.data_dev: + raise Error('data path is not a block device', args.data) + prepare_dir( + path=args.data, + journal=journal_symlink, + cluster_uuid=args.cluster_uuid, + osd_uuid=args.osd_uuid, + journal_uuid=journal_uuid, + journal_dmcrypt=journal_dmcrypt, + ) + elif stat.S_ISBLK(dmode): + if args.data_dir: + raise Error('data path is not a directory', args.data) + prepare_dev( + data=args.data, + journal=journal_symlink, + fstype=args.fs_type, + mkfs_args=mkfs_args, + mount_options=mount_options, + cluster_uuid=args.cluster_uuid, + osd_uuid=args.osd_uuid, + journal_uuid=journal_uuid, + journal_dmcrypt=journal_dmcrypt, + osd_dm_keypath=osd_dm_keypath, + ) + else: + raise Error('not a dir or block device', args.data) + + except Error as e: + if journal_dm_keypath: + os.unlink(journal_dm_keypath) + if osd_dm_keypath: + os.unlink(osd_dm_keypath) + raise e + + +########################### + + +def mkfs( + path, + cluster, + osd_id, + fsid, + keyring, + ): + monmap = os.path.join(path, 'activate.monmap') + subprocess.check_call( + args=[ + '/usr/bin/ceph', + '--cluster', cluster, + '--name', 'client.bootstrap-osd', + '--keyring', keyring, + 'mon', 'getmap', '-o', monmap, + ], + ) + + subprocess.check_call( + args=[ + '/usr/bin/ceph-osd', + '--cluster', cluster, + '--mkfs', + '--mkkey', + '-i', osd_id, + '--monmap', monmap, + '--osd-data', path, + '--osd-journal', os.path.join(path, 'journal'), + '--osd-uuid', fsid, + '--keyring', os.path.join(path, 'keyring'), + ], + ) + # TODO ceph-osd --mkfs removes the monmap file? + # os.unlink(monmap) + + +def auth_key( + path, + cluster, + osd_id, + keyring, + ): + subprocess.check_call( + args=[ + '/usr/bin/ceph', + '--cluster', cluster, + '--name', 'client.bootstrap-osd', + '--keyring', keyring, + 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), + '-i', os.path.join(path, 'keyring'), + 'osd', 'allow *', + 'mon', 'allow rwx', + ], + ) + + +def move_mount( + path, + cluster, + osd_id, + ): + LOG.debug('Moving mount to final location...') + parent = '/var/lib/ceph/osd' + osd_data = os.path.join( + parent, + '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), + ) + maybe_mkdir(osd_data) + subprocess.check_call( + args=[ + '/bin/mount', + '--move', + '--', + path, + osd_data, + ], + ) + + +def start_daemon( + cluster, + osd_id, + ): + LOG.debug('Starting %s osd.%s...', cluster, osd_id) + + path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( + cluster=cluster, osd_id=osd_id) + + # upstart? + try: + if os.path.exists(os.path.join(path,'upstart')): + subprocess.check_call( + args=[ + '/sbin/initctl', + # use emit, not start, because start would fail if the + # instance was already running + 'emit', + # since the daemon starting doesn't guarantee much about + # the service being operational anyway, don't bother + # waiting for it + '--no-wait', + '--', + 'ceph-osd', + 'cluster={cluster}'.format(cluster=cluster), + 'id={osd_id}'.format(osd_id=osd_id), + ], + ) + elif os.path.exists(os.path.join(path, 'sysvinit')): + subprocess.check_call( + args=[ + '/usr/sbin/service', + 'ceph', + 'start', + 'osd.{osd_id}'.format(osd_id=osd_id), + ], + ) + else: + raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format( + cluster=cluster, + osd_id=osd_id, + )) + except subprocess.CalledProcessError as e: + raise Error('ceph osd start failed', e) + +def detect_fstype( + dev, + ): + fstype = _check_output( + args=[ + '/sbin/blkid', + # we don't want stale cached results + '-p', + '-s', 'TYPE', + '-o' 'value', + '--', + dev, + ], + ) + fstype = must_be_one_line(fstype) + return fstype + + +def mount_activate( + dev, + activate_key_template, + init, + ): + + try: + fstype = detect_fstype(dev=dev) + except (subprocess.CalledProcessError, + TruncatedLineError, + TooManyLinesError) as e: + raise FilesystemTypeError( + 'device {dev}'.format(dev=dev), + e, + ) + + # TODO always using mount options from cluster=ceph for + # now; see http://tracker.newdream.net/issues/3253 + mount_options = get_conf( + cluster='ceph', + variable='osd_mount_options_{fstype}'.format( + fstype=fstype, + ), + ) + + if mount_options is None: + mount_options = get_conf( + cluster='ceph', + variable='osd_fs_mount_options_{fstype}'.format( + fstype=fstype, + ), + ) + + #remove whitespaces from mount_options + if mount_options is not None: + mount_options = "".join(mount_options.split()) + + path = mount(dev=dev, fstype=fstype, options=mount_options) + + osd_id = None + cluster = None + try: + (osd_id, cluster) = activate(path, activate_key_template, init) + + # check if the disk is already active, or if something else is already + # mounted there + active = False + other = False + src_dev = os.stat(path).st_dev + try: + dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( + cluster=cluster, + osd_id=osd_id)).st_dev + if src_dev == dst_dev: + active = True + else: + parent_dev = os.stat('/var/lib/ceph/osd').st_dev + if dst_dev != parent_dev: + other = True + except OSError: + pass + if active: + LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id)) + unmount(path) + elif other: + raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id)) + else: + move_mount( + path=path, + cluster=cluster, + osd_id=osd_id, + ) + return (cluster, osd_id) + + except: + LOG.error('Failed to activate') + unmount(path) + raise + finally: + # remove our temp dir + if os.path.exists(path): + os.rmdir(path) + + +def activate_dir( + path, + activate_key_template, + init, + ): + + if not os.path.exists(path): + raise Error( + 'directory %s does not exist' % path + ) + + (osd_id, cluster) = activate(path, activate_key_template, init) + canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format( + cluster=cluster, + osd_id=osd_id) + if path != canonical: + # symlink it from the proper location + create = True + if os.path.lexists(canonical): + old = os.readlink(canonical) + if old != path: + LOG.debug('Removing old symlink %s -> %s', canonical, old) + try: + os.unlink(canonical) + except: + raise Error('unable to remove old symlink %s', canonical) + else: + create = False + if create: + LOG.debug('Creating symlink %s -> %s', canonical, path) + try: + os.symlink(path, canonical) + except: + raise Error('unable to create symlink %s -> %s', canonical, path) + + return (cluster, osd_id) + + +def find_cluster_by_uuid(_uuid): + """ + Find a cluster name by searching /etc/ceph/*.conf for a conf file + with the right uuid. + """ + no_fsid = [] + if not os.path.exists('/etc/ceph'): + return None + for conf_file in os.listdir('/etc/ceph'): + if not conf_file.endswith('.conf'): + continue + cluster = conf_file[:-5] + fsid = get_conf(cluster, 'fsid') + if fsid is None: + no_fsid.append(cluster) + elif fsid == _uuid: + return cluster + # be tolerant of /etc/ceph/ceph.conf without an fsid defined. + if len(no_fsid) == 1 and no_fsid[0] == 'ceph': + LOG.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway') + return 'ceph' + return None + +def activate( + path, + activate_key_template, + init, + ): + + try: + check_osd_magic(path) + + ceph_fsid = read_one_line(path, 'ceph_fsid') + if ceph_fsid is None: + raise Error('No cluster uuid assigned.') + LOG.debug('Cluster uuid is %s', ceph_fsid) + + cluster = find_cluster_by_uuid(ceph_fsid) + if cluster is None: + raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid) + LOG.debug('Cluster name is %s', cluster) + + fsid = read_one_line(path, 'fsid') + if fsid is None: + raise Error('No OSD uuid assigned.') + LOG.debug('OSD uuid is %s', fsid) + + keyring = activate_key_template.format(cluster=cluster) + + osd_id = get_osd_id(path) + if osd_id is None: + osd_id = allocate_osd_id( + cluster=cluster, + fsid=fsid, + keyring=keyring, + ) + write_one_line(path, 'whoami', osd_id) + LOG.debug('OSD id is %s', osd_id) + + if not os.path.exists(os.path.join(path, 'ready')): + LOG.debug('Initializing OSD...') + # re-running mkfs is safe, so just run until it completes + mkfs( + path=path, + cluster=cluster, + osd_id=osd_id, + fsid=fsid, + keyring=keyring, + ) + + if init is not None: + if init == 'auto': + conf_val = get_conf( + cluster=cluster, + variable='init' + ) + if conf_val is not None: + init = conf_val + else: + (distro, release, codename) = platform.dist() + if distro == 'Ubuntu': + init = 'upstart' + else: + init = 'sysvinit' + + LOG.debug('Marking with init system %s', init) + with file(os.path.join(path, init), 'w'): + pass + + # remove markers for others, just in case. + for other in INIT_SYSTEMS: + if other != init: + try: + os.unlink(os.path.join(path, other)) + except OSError: + pass + + if not os.path.exists(os.path.join(path, 'active')): + LOG.debug('Authorizing OSD key...') + auth_key( + path=path, + cluster=cluster, + osd_id=osd_id, + keyring=keyring, + ) + write_one_line(path, 'active', 'ok') + LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path) + return (osd_id, cluster) + except: + raise + + + +def main_activate(args): + cluster = None + osd_id = None + + if not os.path.exists(args.path): + raise Error('%s does not exist', args.path) + + mode = os.stat(args.path).st_mode + if stat.S_ISBLK(mode): + (cluster, osd_id) = mount_activate( + dev=args.path, + activate_key_template=args.activate_key_template, + init=args.mark_init, + ) + elif stat.S_ISDIR(mode): + (cluster, osd_id) = activate_dir( + path=args.path, + activate_key_template=args.activate_key_template, + init=args.mark_init, + ) + else: + raise Error('%s is not a directory or block device', args.path) + + start_daemon( + cluster=cluster, + osd_id=osd_id, + ) + + + +########################### + +def is_swap(dev): + dev = os.path.realpath(dev) + with file('/proc/swaps', 'rb') as proc_swaps: + for line in proc_swaps.readlines()[1:]: + fields = line.split() + if len(fields) < 3: + continue + swaps_dev = fields[0] + if swaps_dev.startswith('/') and os.path.exists(swaps_dev): + swaps_dev = os.path.realpath(swaps_dev) + if swaps_dev == dev: + return True + return False + +def get_oneliner(base, name): + path = os.path.join(base, name) + if os.path.isfile(path): + with open(path, 'r') as _file: + return _file.readline().rstrip() + return None + +def get_dev_fs(dev): + fscheck = subprocess.Popen( + [ + 'blkid', + '-s', + 'TYPE', + dev + ], + stdout = subprocess.PIPE, + stderr=subprocess.PIPE).stdout.read() + if 'TYPE' in fscheck: + fstype = fscheck.split()[1].split('"')[1] + return fstype + else: + return None + +def get_partition_type(part): + (base, partnum) = re.match('(\D+)(\d+)', part).group(1, 2) + sgdisk = subprocess.Popen( + [ + 'sgdisk', + '-p', + base, + ], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE).stdout.read() + for line in sgdisk.splitlines(): + m = re.search('\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line) + if m is not None: + num = m.group(1) + if num != partnum: + continue + return m.group(2) + return None + +def get_partition_uuid(dev): + (base, partnum) = re.match('(\D+)(\d+)', dev).group(1, 2) + out = subprocess.Popen( + [ 'sgdisk', '-i', partnum, base ], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE).stdout.read() + for line in out.splitlines(): + m = re.match('Partition unique GUID: (\S+)', line) + if m: + return m.group(1).lower() + return None + +def more_osd_info(path, uuid_map): + desc = [] + ceph_fsid = get_oneliner(path, 'ceph_fsid') + if ceph_fsid: + cluster = find_cluster_by_uuid(ceph_fsid) + if cluster: + desc.append('cluster ' + cluster) + else: + desc.append('unknown cluster ' + ceph_fsid) + + who = get_oneliner(path, 'whoami') + if who: + desc.append('osd.%s' % who) + + journal_uuid = get_oneliner(path, 'journal_uuid') + if journal_uuid: + journal_uuid = journal_uuid.lower() + if journal_uuid in uuid_map: + desc.append('journal %s' % uuid_map[journal_uuid]) + + return desc + + +def list_dev(dev, uuid_map, journal_map): + ptype = 'unknown' + prefix = '' + if is_partition(dev): + ptype = get_partition_type(dev) + prefix = ' ' + fs_type = get_dev_fs(dev) + path = is_mounted(dev) + + desc = [] + if ptype == 'ceph data': + if path: + desc.append('active') + desc.extend(more_osd_info(path, uuid_map)) + elif fs_type: + try: + tpath = mount(dev=dev, fstype=fs_type, options='') + if tpath: + try: + magic = get_oneliner(tpath, 'magic') + if magic is not None: + desc.append('prepared') + desc.extend(more_osd_info(tpath, uuid_map)) + finally: + unmount(tpath) + except MountError: + pass + if desc: + desc = ['ceph data'] + desc + else: + desc = ['ceph data', 'unprepared'] + elif ptype == 'ceph journal': + desc.append('ceph journal') + part_uuid = get_partition_uuid(dev) + if part_uuid and part_uuid in journal_map: + desc.append('for %s' % journal_map[part_uuid]) + else: + if is_swap(dev): + desc.append('swap') + else: + desc.append('other') + if fs_type: + desc.append(fs_type) + elif ptype: + desc.append(ptype) + if path: + desc.append('mounted on %s' % path) + + print '%s%s %s' % (prefix, dev, ', '.join(desc)) + + + +def main_list(args): + partmap = list_all_partitions() + + uuid_map = {} + journal_map = {} + for base, parts in sorted(partmap.iteritems()): + for p in parts: + dev = '/dev/' + p + part_uuid = get_partition_uuid(dev) + if part_uuid: + uuid_map[part_uuid] = dev + ptype = get_partition_type(dev) + if ptype == 'ceph data': + fs_type = get_dev_fs(dev) + try: + tpath = mount(dev=dev, fstype=fs_type, options='') + try: + journal_uuid = get_oneliner(tpath, 'journal_uuid') + if journal_uuid: + journal_map[journal_uuid.lower()] = dev + finally: + unmount(tpath) + except MountError: + pass + + for base, parts in sorted(partmap.iteritems()): + if parts: + print '/dev/%s :' % base + for p in sorted(parts): + list_dev('/dev/' + p, uuid_map, journal_map) + else: + list_dev('/dev/' + base, uuid_map, journal_map) + + +########################### + + +def parse_args(): + parser = argparse.ArgumentParser( + 'ceph-disk', + ) + parser.add_argument( + '-v', '--verbose', + action='store_true', default=None, + help='be more verbose', + ) + parser.set_defaults( + # we want to hold on to this, for later + prog=parser.prog, + cluster='ceph', + ) + + subparsers = parser.add_subparsers( + title='subcommands', + description='valid subcommands', + help='sub-command help', + ) + + prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD') + prepare_parser.add_argument( + '--cluster', + metavar='NAME', + help='cluster name to assign this disk to', + ) + prepare_parser.add_argument( + '--cluster-uuid', + metavar='UUID', + help='cluster uuid to assign this disk to', + ) + prepare_parser.add_argument( + '--osd-uuid', + metavar='UUID', + help='unique OSD uuid to assign this disk to', + ) + prepare_parser.add_argument( + '--journal-uuid', + metavar='UUID', + help='unique uuid to assign to the journal', + ) + prepare_parser.add_argument( + '--fs-type', + help='file system type to use (e.g. "ext4")', + ) + prepare_parser.add_argument( + '--zap-disk', + action='store_true', default=None, + help='destroy the partition table (and content) of a disk', + ) + prepare_parser.add_argument( + '--data-dir', + action='store_true', default=None, + help='verify that DATA is a dir', + ) + prepare_parser.add_argument( + '--data-dev', + action='store_true', default=None, + help='verify that DATA is a block device', + ) + prepare_parser.add_argument( + '--journal-file', + action='store_true', default=None, + help='verify that JOURNAL is a file', + ) + prepare_parser.add_argument( + '--journal-dev', + action='store_true', default=None, + help='verify that JOURNAL is a block device', + ) + prepare_parser.add_argument( + '--dmcrypt', + action='store_true', default=None, + help='encrypt DATA and/or JOURNAL devices with dm-crypt', + ) + prepare_parser.add_argument( + '--dmcrypt-key-dir', + metavar='KEYDIR', + default='/etc/ceph/dmcrypt-keys', + help='directory where dm-crypt keys are stored', + ) + prepare_parser.add_argument( + 'data', + metavar='DATA', + help='path to OSD data (a disk block device or directory)', + ) + prepare_parser.add_argument( + 'journal', + metavar='JOURNAL', + nargs='?', + help=('path to OSD journal disk block device;' + + ' leave out to store journal in file'), + ) + prepare_parser.set_defaults( + func=main_prepare, + ) + + activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD') + activate_parser.add_argument( + '--mount', + action='store_true', default=None, + help='mount a block device [deprecated, ignored]', + ) + activate_parser.add_argument( + '--activate-key', + metavar='PATH', + help='bootstrap-osd keyring path template (%(default)s)', + dest='activate_key_template', + ) + activate_parser.add_argument( + '--mark-init', + metavar='INITSYSTEM', + help='init system to manage this dir', + default='auto', + choices=INIT_SYSTEMS, + ) + activate_parser.add_argument( + 'path', + metavar='PATH', + nargs='?', + help='path to block device or directory', + ) + activate_parser.set_defaults( + activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', + func=main_activate, + ) + + list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs') + list_parser.set_defaults( + func=main_list, + ) + + args = parser.parse_args() + return args + + +def main(): + args = parse_args() + + loglevel = logging.INFO + if args.verbose: + loglevel = logging.DEBUG + + logging.basicConfig( + level=loglevel, + ) + + try: + args.func(args) + + except Error as e: + print >> sys.stderr, '{prog}: {msg}'.format( + prog=args.prog, + msg=e, + ) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/src/ceph-disk-activate b/src/ceph-disk-activate index f78ae17ce88..72e89f9af30 100755 --- a/src/ceph-disk-activate +++ b/src/ceph-disk-activate @@ -1,584 +1,3 @@ -#!/usr/bin/python - -import argparse -import errno -import logging -import os -import os.path -import re -import subprocess -import sys -import tempfile - - -log_name = __name__ -if log_name == '__main__': - log_name = os.path.basename(sys.argv[0]) -log = logging.getLogger(log_name) - - -class ActivateError(Exception): - """ - OSD activation error - """ - - def __str__(self): - doc = self.__doc__.strip() - return ': '.join([doc] + [str(a) for a in self.args]) - - -class BadMagicError(ActivateError): - """ - Does not look like a Ceph OSD, or incompatible version - """ - - -class TruncatedLineError(ActivateError): - """ - Line is truncated - """ - - -class TooManyLinesError(ActivateError): - """ - Too many lines - """ - - -class FilesystemTypeError(ActivateError): - """ - Cannot discover filesystem type - """ - - -class MountError(ActivateError): - """ - Mounting filesystem failed - """ - - -class UnmountError(ActivateError): - """ - Unmounting filesystem failed - """ - - -def maybe_mkdir(*a, **kw): - try: - os.mkdir(*a, **kw) - except OSError, e: - if e.errno == errno.EEXIST: - pass - else: - raise - - -def must_be_one_line(line): - if line[-1:] != '\n': - raise TruncatedLineError(line) - line = line[:-1] - if '\n' in line: - raise TooManyLinesError(line) - return line - - -def read_one_line(parent, name): - """ - Read a file whose sole contents are a single line. - - Strips the newline. - - :return: Contents of the line, or None if file did not exist. - """ - path = os.path.join(parent, name) - try: - line = file(path, 'rb').read() - except IOError as e: - if e.errno == errno.ENOENT: - return None - else: - raise - - try: - line = must_be_one_line(line) - except (TruncatedLineError, TooManyLinesError) as e: - raise ActivateError('File is corrupt: {path}: {msg}'.format( - path=path, - msg=e, - )) - return line - - -def write_one_line(parent, name, text): - """ - Write a file whose sole contents are a single line. - - Adds a newline. - """ - path = os.path.join(parent, name) - tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) - with file(tmp, 'wb') as f: - f.write(text + '\n') - os.fsync(f.fileno()) - os.rename(tmp, path) - - -CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' - - -def check_osd_magic(path): - """ - Check that this path has the Ceph OSD magic. - - :raises: BadMagicError if this does not look like a Ceph OSD data - dir. - """ - magic = read_one_line(path, 'magic') - if magic is None: - # probably not mkfs'ed yet - raise BadMagicError(path) - if magic != CEPH_OSD_ONDISK_MAGIC: - raise BadMagicError(path) - - -def check_osd_id(osd_id): - """ - Ensures osd id is numeric. - """ - if not re.match(r'^[0-9]+$', osd_id): - raise ActivateError('osd id is not numeric') - - -def get_osd_id(path): - osd_id = read_one_line(path, 'whoami') - if osd_id is not None: - check_osd_id(osd_id) - return osd_id - - -# TODO depend on python2.7 -def _check_output(*args, **kwargs): - process = subprocess.Popen( - stdout=subprocess.PIPE, - *args, **kwargs) - out, _ = process.communicate() - ret = process.wait() - if ret: - cmd = kwargs.get("args") - if cmd is None: - cmd = args[0] - raise subprocess.CalledProcessError(ret, cmd, output=out) - return out - - -def allocate_osd_id( - cluster, - fsid, - keyring, - ): - log.debug('Allocating OSD id...') - try: - osd_id = _check_output( - args=[ - 'ceph', - '--cluster', cluster, - '--name', 'client.bootstrap-osd', - '--keyring', keyring, - 'osd', 'create', '--concise', - fsid, - ], - ) - except subprocess.CalledProcessError as e: - raise ActivateError('ceph osd create failed', e) - osd_id = must_be_one_line(osd_id) - check_osd_id(osd_id) - return osd_id - - -def mkfs( - path, - cluster, - osd_id, - fsid, - keyring, - ): - monmap = os.path.join(path, 'activate.monmap') - subprocess.check_call( - args=[ - 'ceph', - '--cluster', cluster, - '--name', 'client.bootstrap-osd', - '--keyring', keyring, - 'mon', 'getmap', '-o', monmap, - ], - ) - - subprocess.check_call( - args=[ - 'ceph-osd', - '--cluster', cluster, - '--mkfs', - '--mkkey', - '-i', osd_id, - '--monmap', monmap, - '--osd-data', path, - '--osd-journal', os.path.join(path, 'journal'), - '--osd-uuid', fsid, - '--keyring', os.path.join(path, 'keyring'), - ], - ) - # TODO ceph-osd --mkfs removes the monmap file? - # os.unlink(monmap) - - -def auth_key( - path, - cluster, - osd_id, - keyring, - ): - subprocess.check_call( - args=[ - 'ceph', - '--cluster', cluster, - '--name', 'client.bootstrap-osd', - '--keyring', keyring, - 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id), - '-i', os.path.join(path, 'keyring'), - 'osd', 'allow *', - 'mon', 'allow rwx', - ], - ) - - -def move_mount( - path, - cluster, - osd_id, - ): - log.debug('Moving mount to final location...') - parent = '/var/lib/ceph/osd' - osd_data = os.path.join( - parent, - '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id), - ) - maybe_mkdir(osd_data) - subprocess.check_call( - args=[ - 'mount', - '--move', - '--', - path, - osd_data, - ], - ) - - -def upstart_start( - cluster, - osd_id, - ): - log.debug('Starting service...') - subprocess.check_call( - args=[ - 'initctl', - # use emit, not start, because start would fail if the - # instance was already running - 'emit', - # since the daemon starting doesn't guarantee much about - # the service being operational anyway, don't bother - # waiting for it - '--no-wait', - '--', - 'ceph-osd', - 'cluster={cluster}'.format(cluster=cluster), - 'id={osd_id}'.format(osd_id=osd_id), - ], - ) - - -def detect_fstype( - dev, - ): - fstype = _check_output( - args=[ - 'blkid', - # we don't want stale cached results - '-p', - '-s', 'TYPE', - '-o' 'value', - '--', - dev, - ], - ) - fstype = must_be_one_line(fstype) - return fstype - - -def get_conf(cluster, variable): - try: - p = subprocess.Popen( - args=[ - 'ceph-conf', - '--cluster={cluster}'.format( - cluster=cluster, - ), - '--name=osd.', - '--lookup', - variable, - ], - stdout=subprocess.PIPE, - close_fds=True, - ) - except OSError as e: - raise ActivateError('error executing ceph-conf', e) - (out, _err) = p.communicate() - ret = p.wait() - if ret == 1: - # config entry not found - return None - elif ret != 0: - raise ActivateError('getting variable from configuration failed') - value = out.split('\n', 1)[0] - # don't differentiate between "var=" and no var set - if not value: - return None - return value - - -MOUNT_OPTIONS = dict( - btrfs='noatime,user_subvol_rm_allowed', - # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll - # delay a moment before removing it fully because we did have some - # issues with ext4 before the xatts-in-leveldb work, and it seemed - # that user_xattr helped - ext4='noatime,user_xattr', - xfs='noatime', - ) - - -def mount( - dev, - fstype, - options, - ): - # pick best-of-breed mount options based on fs type - if options is None: - options = MOUNT_OPTIONS.get(fstype, '') - - # mount - path = tempfile.mkdtemp( - prefix='mnt.', - dir='/var/lib/ceph/tmp', - ) - try: - subprocess.check_call( - args=[ - 'mount', - '-o', options, - '--', - dev, - path, - ], - ) - except subprocess.CalledProcessError as e: - try: - os.rmdir(path) - except (OSError, IOError): - pass - raise MountError(e) - - return path - - -def unmount( - path, - ): - try: - subprocess.check_call( - args=[ - 'umount', - '--', - path, - ], - ) - except subprocess.CalledProcessError as e: - raise UnmountError(e) - - -def activate( - path, - activate_key_template, - do_mount, - ): - - if do_mount: - try: - fstype = detect_fstype(dev=path) - except (subprocess.CalledProcessError, - TruncatedLineError, - TooManyLinesError) as e: - raise FilesystemTypeError( - 'device {dev}'.format(dev=path), - e, - ) - - mount_options = get_conf( - # TODO always using mount options from cluster=ceph for - # now; see http://tracker.newdream.net/issues/3253 - cluster='ceph', - variable='osd_fs_mount_options_{fstype}'.format( - fstype=fstype, - ), - ) - - path = mount(dev=path, fstype=fstype, options=mount_options) - - try: - check_osd_magic(path) - - ceph_fsid = read_one_line(path, 'ceph_fsid') - if ceph_fsid is None: - raise ActivateError('No cluster uuid assigned.') - log.debug('Cluster uuid is %s', ceph_fsid) - - # TODO use ceph_fsid to find the right cluster - cluster = 'ceph' - log.debug('Cluster name is %s', cluster) - - fsid = read_one_line(path, 'fsid') - if fsid is None: - raise ActivateError('No OSD uuid assigned.') - log.debug('OSD uuid is %s', fsid) - - keyring = activate_key_template.format(cluster=cluster) - - osd_id = get_osd_id(path) - if osd_id is None: - osd_id = allocate_osd_id( - cluster=cluster, - fsid=fsid, - keyring=keyring, - ) - write_one_line(path, 'whoami', osd_id) - log.debug('OSD id is %s', osd_id) - - if not os.path.exists(os.path.join(path, 'ready')): - log.debug('Initializing OSD...') - # re-running mkfs is safe, so just run until it completes - mkfs( - path=path, - cluster=cluster, - osd_id=osd_id, - fsid=fsid, - keyring=keyring, - ) - - # indicate this daemon is managed by upstart - if not os.path.exists(os.path.join(path, 'upstart')): - with file(os.path.join(path, 'upstart'), 'w'): - pass - - if not os.path.exists(os.path.join(path, 'active')): - log.debug('Authorizing OSD key...') - auth_key( - path=path, - cluster=cluster, - osd_id=osd_id, - keyring=keyring, - ) - write_one_line(path, 'active', 'ok') - - # check if the disk is already active - active = False - src_dev = os.stat(path).st_dev - try: - dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format( - cluster=cluster, - osd_id=osd_id)).st_dev - if src_dev == dst_dev: - active = True - except: - pass - if active: - log.debug('OSD already mounted') - unmount(path) - else: - move_mount( - path=path, - cluster=cluster, - osd_id=osd_id, - ) - except: - unmount(path) - finally: - if do_mount: - # if we created a temp dir to mount it, remove it - os.rmdir(path) - - upstart_start( - cluster=cluster, - osd_id=osd_id, - ) - - -def parse_args(): - parser = argparse.ArgumentParser( - description='Activate a Ceph OSD', - ) - parser.add_argument( - '-v', '--verbose', - action='store_true', default=None, - help='be more verbose', - ) - parser.add_argument( - '--mount', - action='store_true', default=None, - help='mount the device first', - ) - parser.add_argument( - '--activate-key', - metavar='PATH', - help='bootstrap-osd keyring path template (%(default)s)', - dest='activate_key_template', - ) - parser.add_argument( - 'path', - metavar='PATH', - help='path to OSD data directory, or block device if using --mount', - ) - parser.set_defaults( - activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', - # we want to hold on to this, for later - prog=parser.prog, - ) - args = parser.parse_args() - return args - - -def main(): - args = parse_args() - - loglevel = logging.INFO - if args.verbose: - loglevel = logging.DEBUG - - logging.basicConfig( - level=loglevel, - ) - - try: - activate( - path=args.path, - activate_key_template=args.activate_key_template, - do_mount=args.mount, - ) - except ActivateError as e: - print >>sys.stderr, '{prog}: {msg}'.format( - prog=args.prog, - msg=e, - ) - sys.exit(1) - -if __name__ == '__main__': - main() +#!/bin/sh +dir=`dirname $0` +$dir/ceph-disk activate $* diff --git a/src/ceph-disk-prepare b/src/ceph-disk-prepare index e5c4bdb9050..f9255eb8831 100755 --- a/src/ceph-disk-prepare +++ b/src/ceph-disk-prepare @@ -1,529 +1,3 @@ -#!/usr/bin/python - -import argparse -import logging -import os -import os.path -import subprocess -import sys -import tempfile -import uuid - - -log_name = __name__ -if log_name == '__main__': - log_name = os.path.basename(sys.argv[0]) -log = logging.getLogger(log_name) - - -class PrepareError(Exception): - """ - OSD preparation error - """ - - def __str__(self): - doc = self.__doc__.strip() - return ': '.join([doc] + [str(a) for a in self.args]) - - -class MountError(PrepareError): - """ - Mounting filesystem failed - """ - - -class UnmountError(PrepareError): - """ - Unmounting filesystem failed - """ - - -def write_one_line(parent, name, text): - """ - Write a file whose sole contents are a single line. - - Adds a newline. - """ - path = os.path.join(parent, name) - tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid()) - with file(tmp, 'wb') as f: - f.write(text + '\n') - os.fsync(f.fileno()) - os.rename(tmp, path) - - -CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026' - -JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106' - - -# TODO depend on python2.7 -def _check_output(*args, **kwargs): - process = subprocess.Popen( - stdout=subprocess.PIPE, - *args, **kwargs) - out, _ = process.communicate() - ret = process.wait() - if ret: - cmd = kwargs.get("args") - if cmd is None: - cmd = args[0] - raise subprocess.CalledProcessError(ret, cmd, output=out) - return out - - -def get_conf(cluster, variable): - try: - p = subprocess.Popen( - args=[ - 'ceph-conf', - '--cluster={cluster}'.format( - cluster=cluster, - ), - '--name=osd.', - '--lookup', - variable, - ], - stdout=subprocess.PIPE, - close_fds=True, - ) - except OSError as e: - raise PrepareError('error executing ceph-conf', e) - (out, _err) = p.communicate() - ret = p.wait() - if ret == 1: - # config entry not found - return None - elif ret != 0: - raise PrepareError('getting variable from configuration failed') - value = out.split('\n', 1)[0] - # don't differentiate between "var=" and no var set - if not value: - return None - return value - - -def get_conf_with_default(cluster, variable): - """ - Get a config value that is known to the C++ code. - - This will fail if called on variables that are not defined in - common config options. - """ - try: - out = _check_output( - args=[ - 'ceph-osd', - '--cluster={cluster}'.format( - cluster=cluster, - ), - '--show-config-value={variable}'.format( - variable=variable, - ), - ], - close_fds=True, - ) - except subprocess.CalledProcessError as e: - raise PrepareError( - 'getting variable from configuration failed', - e, - ) - - value = out.split('\n', 1)[0] - return value - - -def get_fsid(cluster): - fsid = get_conf(cluster=cluster, variable='fsid') - if fsid is None: - raise PrepareError('getting cluster uuid from configuration failed') - return fsid - - -DEFAULT_FS_TYPE = 'xfs' - -MOUNT_OPTIONS = dict( - btrfs='noatime,user_subvol_rm_allowed', - ext4='noatime,user_xattr', - xfs='noatime', - ) - -MKFS_ARGS = dict( - btrfs=[ - '-m', 'single', - '-l', '32768', - '-n', '32768', - ], - xfs=[ - # xfs insists on not overwriting previous fs; even if we wipe - # partition table, we often recreate it exactly the same way, - # so we'll see ghosts of filesystems past - '-f', - '-i', 'size=2048', - ], - ) - - -def mount( - dev, - fstype, - options, - ): - # pick best-of-breed mount options based on fs type - if options is None: - options = MOUNT_OPTIONS.get(fstype, '') - - # mount - path = tempfile.mkdtemp( - prefix='mnt.', - dir='/var/lib/ceph/tmp', - ) - try: - subprocess.check_call( - args=[ - 'mount', - '-o', options, - '--', - dev, - path, - ], - ) - except subprocess.CalledProcessError as e: - try: - os.rmdir(path) - except (OSError, IOError): - pass - raise MountError(e) - - return path - - -def unmount( - path, - ): - try: - subprocess.check_call( - args=[ - 'umount', - '--', - path, - ], - ) - except subprocess.CalledProcessError as e: - raise UnmountError(e) - - os.rmdir(path) - - -def get_free_partition_index(dev): - try: - lines = _check_output( - args=[ - 'parted', - '--machine', - '--', - dev, - 'print', - ], - ) - except subprocess.CalledProcessError as e: - print 'cannot read partition index; assume it isn\'t present\n' - return 1 - - if not lines: - raise PrepareError('parted failed to output anything') - lines = lines.splitlines(True) - - if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']: - raise PrepareError('weird parted units', lines[0]) - del lines[0] - - if not lines[0].startswith('/dev/'): - raise PrepareError('weird parted disk entry', lines[0]) - del lines[0] - - seen = set() - for line in lines: - idx, _ = line.split(':', 1) - idx = int(idx) - seen.add(idx) - - num = 1 - while num in seen: - num += 1 - return num - - -def prepare( - disk, - journal, - journal_size, - fstype, - mkfs_args, - mount_options, - cluster_uuid, - ): - """ - Prepare a disk to be used as an OSD data disk. - - The ``magic`` file is written last, so it's presence is a reliable - indicator of the whole sequence having completed. - - WARNING: This will unconditionally overwrite anything given to - it. - """ - - try: - # this kills the crab - subprocess.check_call( - args=[ - 'sgdisk', - '--zap-all', - '--clear', - '--mbrtogpt', - '--', - disk, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - osd_uuid = str(uuid.uuid4()) - - # store the partition uuid iff using external journal - journal_uuid = None - - if journal is not None: - journal_uuid = str(uuid.uuid4()) - - if journal == disk: - # we're sharing the disk between osd data and journal; - # make journal be partition number 2, so it's pretty; put - # journal at end of free space so partitioning tools don't - # reorder them suddenly - num = 2 - journal_part = '{num}:-{size}M:0'.format( - num=num, - size=journal_size, - ) - else: - # sgdisk has no way for me to say "whatever is the next - # free index number" when setting type guids etc, so we - # need to awkwardly look up the next free number, and then - # fix that in the call -- and hope nobody races with us; - # then again nothing guards the partition table from races - # anyway - num = get_free_partition_index(dev=journal) - journal_part = '{num}:0:+{size}M'.format( - num=num, - size=journal_size, - ) - - try: - subprocess.check_call( - args=[ - 'sgdisk', - '--new={part}'.format(part=journal_part), - '--change-name={num}:ceph journal'.format(num=num), - '--partition-guid={num}:{journal_uuid}'.format( - num=num, - journal_uuid=journal_uuid, - ), - '--typecode={num}:{uuid}'.format( - num=num, - uuid=JOURNAL_UUID, - ), - '--', - journal, - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - journal, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - try: - subprocess.check_call( - args=[ - 'sgdisk', - '--largest-new=1', - '--change-name=1:ceph data', - '--partition-guid=1:{osd_uuid}'.format( - osd_uuid=osd_uuid, - ), - '--typecode=1:89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be', - '--', - disk, - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - disk, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - dev = '{disk}1'.format(disk=disk) - args = [ - 'mkfs', - '--type={fstype}'.format(fstype=fstype), - ] - args.extend(MKFS_ARGS.get(fstype, [])) - if mkfs_args is not None: - args.extend(mkfs_args.split()) - args.extend - args.extend([ - '--', - dev, - ]) - try: - subprocess.check_call(args=args) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - path = mount(dev=dev, fstype=fstype, options=mount_options) - try: - if journal_uuid is not None: - # we're using an external journal; point to it here - os.symlink( - '/dev/disk/by-partuuid/{journal_uuid}'.format( - journal_uuid=journal_uuid, - ), - os.path.join(path, 'journal'), - ) - write_one_line(path, 'ceph_fsid', cluster_uuid) - write_one_line(path, 'fsid', osd_uuid) - write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC) - finally: - unmount(path) - - try: - subprocess.check_call( - args=[ - 'sgdisk', - '--typecode=1:4fbd7e29-9d25-41b8-afd0-062c0ceff05d', - '--', - disk, - ], - ) - except subprocess.CalledProcessError as e: - raise PrepareError(e) - - -def parse_args(): - parser = argparse.ArgumentParser( - description='Prepare a disk for a Ceph OSD', - ) - parser.add_argument( - '-v', '--verbose', - action='store_true', default=None, - help='be more verbose', - ) - parser.add_argument( - '--cluster', - metavar='NAME', - help='cluster name to assign this disk to', - ) - parser.add_argument( - '--cluster-uuid', - metavar='UUID', - help='cluster uuid to assign this disk to', - ) - parser.add_argument( - '--fs-type', - help='file system type to use (e.g. "ext4")', - ) - parser.add_argument( - 'disk', - metavar='DISK', - help='path to OSD data disk block device', - ) - parser.add_argument( - 'journal', - metavar='JOURNAL', - nargs='?', - help=('path to OSD journal disk block device;' - + ' leave out to store journal in file'), - ) - parser.set_defaults( - # we want to hold on to this, for later - prog=parser.prog, - cluster='ceph', - ) - args = parser.parse_args() - return args - - -def main(): - args = parse_args() - - loglevel = logging.INFO - if args.verbose: - loglevel = logging.DEBUG - - logging.basicConfig( - level=loglevel, - ) - - try: - if args.cluster_uuid is None: - args.cluster_uuid = get_fsid(cluster=args.cluster) - if args.cluster_uuid is None: - raise PrepareError( - 'must have fsid in config or pass --cluster--uuid=', - ) - - if args.fs_type is None: - args.fs_type = get_conf( - cluster=args.cluster, - variable='osd_fs_type', - ) - if args.fs_type is None: - args.fs_type = DEFAULT_FS_TYPE - - mkfs_args = get_conf( - cluster=args.cluster, - variable='osd_fs_mkfs_arguments_{fstype}'.format( - fstype=args.fs_type, - ), - ) - - mount_options = get_conf( - cluster=args.cluster, - variable='osd_fs_mount_options_{fstype}'.format( - fstype=args.fs_type, - ), - ) - - journal_size = get_conf_with_default( - cluster=args.cluster, - variable='osd_journal_size', - ) - journal_size = int(journal_size) - - prepare( - disk=args.disk, - journal=args.journal, - journal_size=journal_size, - fstype=args.fs_type, - mkfs_args=mkfs_args, - mount_options=mount_options, - cluster_uuid=args.cluster_uuid, - ) - except PrepareError as e: - print >>sys.stderr, '{prog}: {msg}'.format( - prog=args.prog, - msg=e, - ) - sys.exit(1) - -if __name__ == '__main__': - main() +#!/bin/sh +dir=`dirname $0` +$dir/ceph-disk prepare $* diff --git a/src/ceph_common.sh b/src/ceph_common.sh index f20628bfa22..5576018fa4f 100644 --- a/src/ceph_common.sh +++ b/src/ceph_common.sh @@ -45,6 +45,13 @@ check_host() { #echo host for $name is $host, i am $hostname + # sysvinit managed instance in standird location? + if [ -e "/var/lib/ceph/$type/ceph-$id/sysvinit" ]; then + host="$hostname" + echo "=== $type.$id === " + return 0 + fi + # ignore all sections without 'host' defined if [ -z "$host" ]; then return 1 @@ -121,14 +128,49 @@ do_root_cmd() { fi } +get_local_daemon_list() { + type=$1 + if [ -d "/var/lib/ceph/$type" ]; then + for i in `find /var/lib/ceph/$type -mindepth 1 -maxdepth 1 -type d -printf '%f\n'`; do + if [ -e "/var/lib/ceph/$type/$i/sysvinit" ]; then + id=`echo $i | sed 's/.*-//'` + local="$local $type.$id" + fi + done + fi +} + +get_local_name_list() { + orig=$1 + local="" + + if [ -z "$orig" ]; then + # enumerate local directories + get_local_daemon_list "mon" + get_local_daemon_list "osd" + get_local_daemon_list "mds" + return + fi + + for f in $orig; do + type=`echo $f | cut -c 1-3` # e.g. 'mon', if $item is 'mon1' + id=`echo $f | cut -c 4- | sed 's/\\.//'` + get_local_daemon_list $type + + # FIXME + done +} + get_name_list() { orig=$1 + # extract list of monitors, mdss, osds defined in startup.conf + allconf=`$CCONF -c $conf -l mon | egrep -v '^mon$' ; \ + $CCONF -c $conf -l mds | egrep -v '^mds$' ; \ + $CCONF -c $conf -l osd | egrep -v '^osd$'` + if [ -z "$orig" ]; then - # extract list of monitors, mdss, osds defined in startup.conf - what=`$CCONF -c $conf -l mon | egrep -v '^mon$' ; \ - $CCONF -c $conf -l mds | egrep -v '^mds$' ; \ - $CCONF -c $conf -l osd | egrep -v '^osd$'` + what="$allconf $local" return fi @@ -136,17 +178,16 @@ get_name_list() { for f in $orig; do type=`echo $f | cut -c 1-3` # e.g. 'mon', if $item is 'mon1' id=`echo $f | cut -c 4- | sed 's/\\.//'` - all=`$CCONF -c $conf -l $type | egrep -v "^$type$" || true` case $f in mon | osd | mds) - what="$what $all" + what=`echo $allconf $local | grep ^$type || true` ;; *) - if echo " " $all " " | egrep -v -q "( $type$id | $type.$id )"; then - echo "$0: $type.$id not found ($conf defines \"$all\")" + if echo " " "$allconf" "$local" " " | egrep -v -q "( $type$id | $type.$id )"; then + echo "$0: $type.$id not found ($conf defines \"$all\", /var/lib/ceph defines \"$local\")" exit 1 fi - what="$what $f" + what="$f" ;; esac done diff --git a/src/init-ceph.in b/src/init-ceph.in index ee03679d10d..6530db7742f 100644 --- a/src/init-ceph.in +++ b/src/init-ceph.in @@ -165,6 +165,7 @@ verify_conf command=$1 [ -n "$*" ] && shift +get_local_name_list "$@" get_name_list "$@" for name in $what; do diff --git a/src/upstart/ceph-hotplug.conf b/src/upstart/ceph-hotplug.conf deleted file mode 100644 index 702045293a2..00000000000 --- a/src/upstart/ceph-hotplug.conf +++ /dev/null @@ -1,11 +0,0 @@ -description "Ceph hotplug" - -start on block-device-added \ - DEVTYPE=partition \ - ID_PART_ENTRY_TYPE=4fbd7e29-9d25-41b8-afd0-062c0ceff05d -stop on runlevel [!2345] - -task -instance $DEVNAME - -exec /usr/sbin/ceph-disk-activate --mount -- "$DEVNAME" diff --git a/udev/95-ceph-osd.rules b/udev/95-ceph-osd.rules new file mode 100644 index 00000000000..77e6ef37c5d --- /dev/null +++ b/udev/95-ceph-osd.rules @@ -0,0 +1,21 @@ +# activate ceph-tagged partitions +ACTION=="add", SUBSYSTEM=="block", \ + ENV{DEVTYPE}=="partition", \ + ENV{ID_PART_ENTRY_TYPE}=="4fbd7e29-9d25-41b8-afd0-062c0ceff05d", \ + RUN+="/usr/sbin/ceph-disk-activate --mount /dev/$name" + +# Map journal if using dm-crypt +ACTION=="add" SUBSYSTEM=="block", \ + ENV{DEVTYPE}=="partition", \ + ENV{ID_PART_ENTRY_TYPE}=="45b0969e-9b03-4f30-b4c6-5ec00ceff106", \ + RUN+="/sbin/cryptsetup --key-file /etc/ceph/dmcrypt-keys/$env{ID_PART_ENTRY_UUID} --key-size 256 create $env{ID_PART_ENTRY_UUID} /dev/$name" + +# Map data device and +# activate ceph-tagged partitions +# for dm-crypted data devices +ACTION=="add" SUBSYSTEM=="block", \ + ENV{DEVTYPE}=="partition", \ + ENV{ID_PART_ENTRY_TYPE}=="4fbd7e29-9d25-41b8-afd0-5ec00ceff05d", \ + RUN+="/sbin/cryptsetup --key-file /etc/ceph/dmcrypt-keys/$env{ID_PART_ENTRY_UUID} --key-size 256 create $env{ID_PART_ENTRY_UUID} /dev/$name", \ + RUN+="/bin/bash -c 'while [ ! -e /dev/mapper/$env{ID_PART_ENTRY_UUID} ];do sleep 1; done'", \ + RUN+="/usr/sbin/ceph-disk-activate --mount /dev/mapper/$env{ID_PART_ENTRY_UUID}" |