summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-05-01 12:57:43 -0700
committerSage Weil <sage@inktank.com>2013-05-01 12:57:43 -0700
commit15e6544f0892e8472f436149d15fc30257496c0b (patch)
treef578cb0f1b3616fa8e25915c9b81218e774bfc4b
parentfdc05346177a60f064fe351ca81c6078cd065179 (diff)
parent418cff585bf6fc292d260a464369d82f7cdb3b79 (diff)
downloadceph-15e6544f0892e8472f436149d15fc30257496c0b.tar.gz
Merge remote-tracking branch 'gh/bobtail-deploy' into bobtail-next
-rw-r--r--Makefile.am3
-rw-r--r--ceph.spec.in17
-rw-r--r--debian/ceph.dirs1
-rw-r--r--debian/ceph.install8
-rw-r--r--debian/ceph.postinst1
-rw-r--r--debian/ceph.prerm5
-rw-r--r--debian/control2
-rwxr-xr-xdebian/rules3
-rw-r--r--src/Makefile.am12
-rwxr-xr-xsrc/ceph-create-keys12
-rwxr-xr-xsrc/ceph-disk1925
-rwxr-xr-xsrc/ceph-disk-activate587
-rwxr-xr-xsrc/ceph-disk-prepare532
-rw-r--r--src/ceph_common.sh59
-rw-r--r--src/init-ceph.in1
-rw-r--r--src/upstart/ceph-hotplug.conf11
-rw-r--r--udev/95-ceph-osd.rules21
17 files changed, 2056 insertions, 1144 deletions
diff --git a/Makefile.am b/Makefile.am
index 3f4231438ad..adeb4e57728 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -9,7 +9,8 @@ EXTRA_DIST += \
src/test/run-cli-tests-maybe-unset-ccache \
src/test/cli \
src/test/downloads \
- udev/50-rbd.rules
+ udev/50-rbd.rules \
+ udev/95-ceph-osd.rules
all-local:
diff --git a/ceph.spec.in b/ceph.spec.in
index 7aae6fefedb..0ec887eab92 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -24,6 +24,11 @@ Source0: http://ceph.com/download/%{name}-%{version}.tar.bz2
Requires: librbd1 = %{version}-%{release}
Requires: librados2 = %{version}-%{release}
Requires: libcephfs1 = %{version}-%{release}
+Requires: python
+Requires: cryptsetup
+Requires: gptfdisk
+Requires: parted
+Requires: util-linux
Requires(post): binutils
BuildRoot: %{_tmppath}/%{name}-%{version}-build
BuildRequires: gcc-c++
@@ -285,6 +290,10 @@ mkdir -p $RPM_BUILD_ROOT/usr/share/java
mv $RPM_BUILD_ROOT/usr/lib64/libcephfs.jar $RPM_BUILD_ROOT/usr/share/java/.
mv $RPM_BUILD_ROOT/usr/lib64/libcephfs-test.jar $RPM_BUILD_ROOT/usr/share/java/.
+# udev rules
+install -D -m 644 udev/50-rbd.rules $RPM_BUILD_ROOT/lib/udev/rules.d/50-rbd.rules
+install -D -m 644 udev/95-ceph-osd.rules $RPM_BUILD_ROOT/lib/udev/rules.d/95-ceph-osd.rules
+
%clean
rm -rf $RPM_BUILD_ROOT
@@ -382,9 +391,11 @@ fi
%{_libdir}/rados-classes/libcls_lock.so*
%{_libdir}/rados-classes/libcls_kvs.so*
%{_libdir}/rados-classes/libcls_refcount.so*
-/sbin/ceph-disk-activate
-/sbin/ceph-disk-prepare
-/sbin/ceph-create-keys
+%{_sbindir}/ceph-disk
+%{_sbindir}/ceph-disk-activate
+%{_sbindir}/ceph-disk-prepare
+%{_sbindir}/ceph-create-keys
+/lib/udev/rules.d/95-ceph-osd.rules
#################################################################################
%files fuse
diff --git a/debian/ceph.dirs b/debian/ceph.dirs
index b9b8a21816f..ca7a880636c 100644
--- a/debian/ceph.dirs
+++ b/debian/ceph.dirs
@@ -5,3 +5,4 @@ var/lib/ceph/mon
var/lib/ceph/osd
var/lib/ceph/mds
var/lib/ceph/bootstrap-osd
+var/lib/ceph/bootstrap-mds
diff --git a/debian/ceph.install b/debian/ceph.install
index da097b24c86..82df2bacc21 100644
--- a/debian/ceph.install
+++ b/debian/ceph.install
@@ -6,9 +6,10 @@ usr/bin/ceph-run
usr/bin/ceph-mon
usr/bin/ceph-osd
usr/bin/ceph-debugpack
-sbin/ceph-disk-prepare usr/sbin/
-sbin/ceph-disk-activate usr/sbin/
-sbin/ceph-create-keys usr/sbin/
+usr/sbin/ceph-disk
+usr/sbin/ceph-disk-prepare
+usr/sbin/ceph-disk-activate
+usr/sbin/ceph-create-keys
sbin/mkcephfs
usr/lib/ceph/ceph_common.sh
usr/lib/rados-classes/*
@@ -24,3 +25,4 @@ usr/share/man/man8/monmaptool.8
usr/share/man/man8/ceph-clsinfo.8
usr/share/man/man8/ceph-debugpack.8
etc/bash_completion.d/ceph
+lib/udev/rules.d/95-ceph-osd.rules
diff --git a/debian/ceph.postinst b/debian/ceph.postinst
index 1f9469d8f6c..4edbf10d93b 100644
--- a/debian/ceph.postinst
+++ b/debian/ceph.postinst
@@ -27,6 +27,7 @@ set -e
case "$1" in
configure)
rm -f /etc/init/ceph.conf
+ start ceph-all || :
;;
abort-upgrade|abort-remove|abort-deconfigure)
:
diff --git a/debian/ceph.prerm b/debian/ceph.prerm
new file mode 100644
index 00000000000..159a96e33c3
--- /dev/null
+++ b/debian/ceph.prerm
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+stop ceph-all || :
+
+exit 0 \ No newline at end of file
diff --git a/debian/control b/debian/control
index 2ad36d94acb..27e36f09733 100644
--- a/debian/control
+++ b/debian/control
@@ -12,7 +12,7 @@ Standards-Version: 3.9.3
Package: ceph
Architecture: linux-any
Depends: ${shlibs:Depends}, ${misc:Depends}, sdparm | hdparm, binutils, ceph-common, uuid-runtime, python, xfsprogs
-Recommends: ceph-mds, librados2, librbd1, btrfs-tools, gdisk, parted
+Recommends: ceph-mds, librados2, librbd1, btrfs-tools, gdisk, parted, cryptsetup-bin
Description: distributed storage and file system
Ceph is a distributed storage system designed to provide excellent
performance, reliability, and scalability.
diff --git a/debian/rules b/debian/rules
index 8f0b4ef2762..260f27fcd68 100755
--- a/debian/rules
+++ b/debian/rules
@@ -90,6 +90,7 @@ install: build
$(MAKE) DESTDIR=$(DESTDIR) install
sed -i "/dependency_libs/ s/'.*'/''/" `find . -name '*.la'`
install -D -m 644 udev/50-rbd.rules $(DESTDIR)/lib/udev/rules.d/50-rbd.rules
+ install -D -m 644 udev/95-ceph-osd.rules $(DESTDIR)/lib/udev/rules.d/95-ceph-osd.rules
# Add here commands to install the package into debian/testpack.
# Build architecture-independent files here.
@@ -127,6 +128,8 @@ binary-arch: build install
# per package, so do this ourselves
install -d -m0755 debian/ceph/etc/init
install -m0644 src/upstart/ceph*.conf debian/ceph/etc/init
+ install -d -m0755 debian/ceph-mds/etc/init
+ mv debian/ceph/etc/init/ceph-mds* debian/ceph-mds/etc/init
install -d -m0755 debian/radosgw/etc/init
install -m0644 src/upstart/radosgw*.conf debian/radosgw/etc/init
dh_installman -a
diff --git a/src/Makefile.am b/src/Makefile.am
index 50267c3b188..7eabd62b121 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -26,11 +26,17 @@ bin_PROGRAMS =
# like bin_PROGRAMS, but these targets are only built for debug builds
bin_DEBUGPROGRAMS =
sbin_PROGRAMS =
-sbin_SCRIPTS = \
+# like sbin_SCRIPTS but can be used to install to e.g. /usr/sbin
+ceph_sbindir = $(prefix)$(sbindir)
+ceph_sbin_SCRIPTS = \
+ ceph-disk \
ceph-disk-prepare \
ceph-disk-activate \
- ceph-create-keys \
+ ceph-create-keys
+
+sbin_SCRIPTS = \
mount.fuse.ceph
+
bin_SCRIPTS = ceph-run $(srcdir)/ceph-clsinfo ceph-debugpack ceph-rbdnamer
dist_bin_SCRIPTS =
# C/C++ tests to build will be appended to this
@@ -1093,13 +1099,13 @@ EXTRA_DIST += \
$(srcdir)/upstart/ceph-osd.conf \
$(srcdir)/upstart/ceph-osd-all.conf \
$(srcdir)/upstart/ceph-osd-all-starter.conf \
- $(srcdir)/upstart/ceph-hotplug.conf \
$(srcdir)/upstart/ceph-mds.conf \
$(srcdir)/upstart/ceph-mds-all.conf \
$(srcdir)/upstart/ceph-mds-all-starter.conf \
$(srcdir)/upstart/radosgw.conf \
$(srcdir)/upstart/radosgw-all.conf \
$(srcdir)/upstart/radosgw-all-starter.conf \
+ ceph-disk \
ceph-disk-prepare \
ceph-disk-activate \
ceph-create-keys \
diff --git a/src/ceph-create-keys b/src/ceph-create-keys
index 438e51d3076..272bb3ec6ef 100755
--- a/src/ceph-create-keys
+++ b/src/ceph-create-keys
@@ -190,6 +190,7 @@ def main():
wait_for_quorum(cluster=args.cluster, mon_id=args.id)
get_key(cluster=args.cluster, mon_id=args.id)
+
bootstrap_key(
cluster=args.cluster,
type_='osd',
@@ -203,6 +204,17 @@ def main():
),
)
+ bootstrap_key(
+ cluster=args.cluster,
+ type_='mds',
+ caps=dict(
+ mon=[
+ r'allow command auth get-or-create * osd allow\ * mds allow mon allow\ rwx',
+ 'allow command mon getmap',
+ ],
+ ),
+ )
+
if __name__ == '__main__':
main()
diff --git a/src/ceph-disk b/src/ceph-disk
new file mode 100755
index 00000000000..de76f3c8c52
--- /dev/null
+++ b/src/ceph-disk
@@ -0,0 +1,1925 @@
+#!/usr/bin/python
+
+import argparse
+import errno
+import logging
+import os
+import os.path
+import platform
+import re
+import subprocess
+import stat
+import sys
+import tempfile
+import uuid
+
+CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'
+
+JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106'
+DMCRYPT_JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-5ec00ceff106'
+OSD_UUID = '4fbd7e29-9d25-41b8-afd0-062c0ceff05d'
+DMCRYPT_OSD_UUID = '4fbd7e29-9d25-41b8-afd0-5ec00ceff05d'
+TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be'
+DMCRYPT_TOBE_UUID = '89c57f98-2fe5-4dc0-89c1-5ec00ceff2be'
+
+DEFAULT_FS_TYPE = 'xfs'
+
+MOUNT_OPTIONS = dict(
+ btrfs='noatime,user_subvol_rm_allowed',
+ # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll
+ # delay a moment before removing it fully because we did have some
+ # issues with ext4 before the xatts-in-leveldb work, and it seemed
+ # that user_xattr helped
+ ext4='noatime,user_xattr',
+ xfs='noatime',
+ )
+
+MKFS_ARGS = dict(
+ btrfs=[
+ '-m', 'single',
+ '-l', '32768',
+ '-n', '32768',
+ ],
+ xfs=[
+ # xfs insists on not overwriting previous fs; even if we wipe
+ # partition table, we often recreate it exactly the same way,
+ # so we'll see ghosts of filesystems past
+ '-f',
+ '-i', 'size=2048',
+ ],
+ )
+
+INIT_SYSTEMS = [
+ 'upstart',
+ 'sysvinit',
+ 'systemd',
+ 'auto',
+ ]
+
+
+LOG_NAME = __name__
+if LOG_NAME == '__main__':
+ LOG_NAME = os.path.basename(sys.argv[0])
+LOG = logging.getLogger(LOG_NAME)
+
+
+###### exceptions ########
+
+class Error(Exception):
+ """
+ Error
+ """
+
+ def __str__(self):
+ doc = self.__doc__.strip()
+ return ': '.join([doc] + [str(a) for a in self.args])
+
+class MountError(Error):
+ """
+ Mounting filesystem failed
+ """
+
+class UnmountError(Error):
+ """
+ Unmounting filesystem failed
+ """
+
+class BadMagicError(Error):
+ """
+ Does not look like a Ceph OSD, or incompatible version
+ """
+
+class TruncatedLineError(Error):
+ """
+ Line is truncated
+ """
+
+class TooManyLinesError(Error):
+ """
+ Too many lines
+ """
+
+class FilesystemTypeError(Error):
+ """
+ Cannot discover filesystem type
+ """
+
+
+####### utils
+
+
+def maybe_mkdir(*a, **kw):
+ """
+ Creates a new directory if it doesn't exist, removes
+ existing symlink before creating the directory.
+ """
+ # remove any symlink, if it is there..
+ if os.path.exists(*a) and stat.S_ISLNK(os.lstat(*a).st_mode):
+ LOG.debug('Removing old symlink at %s', *a)
+ os.unlink(*a)
+ try:
+ os.mkdir(*a, **kw)
+ except OSError, e:
+ if e.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+
+
+def list_all_partitions():
+ """
+ Return a list of devices and partitions
+ """
+ dev_part_list = {}
+ for name in os.listdir('/dev/disk/by-path'):
+ target = os.readlink(os.path.join('/dev/disk/by-path', name))
+ dev = target.split('/')[-1]
+ #print "name %s target %s dev %s" % (name, target, dev)
+ (baser) = re.search('(.*)-part\d+$', name)
+ if baser is not None:
+ basename = baser.group(1)
+ #print 'basename %s' % basename
+ base = os.readlink(os.path.join('/dev/disk/by-path', basename)).split('/')[-1]
+ if base not in dev_part_list:
+ dev_part_list[base] = []
+ dev_part_list[base].append(dev)
+ else:
+ if dev not in dev_part_list:
+ dev_part_list[dev] = []
+ return dev_part_list
+
+
+def list_partitions(disk):
+ """
+ Return a list of partitions on the given device
+ """
+ disk = os.path.realpath(disk)
+ assert not is_partition(disk)
+ assert disk.startswith('/dev/')
+ base = disk[5:]
+ partitions = []
+ with file('/proc/partitions', 'rb') as proc_partitions:
+ for line in proc_partitions.read().split('\n')[2:]:
+ fields = re.split('\s+', line)
+ if len(fields) < 5:
+ continue
+ name = fields [4]
+ if name != base and name.startswith(base):
+ partitions.append('/dev/' + name)
+ return partitions
+
+
+def is_partition(dev):
+ """
+ Check whether a given device is a partition or a full disk.
+ """
+ dev = os.path.realpath(dev)
+ if not stat.S_ISBLK(os.lstat(dev).st_mode):
+ raise Error('not a block device', dev)
+
+ # we can't tell just from the name of the device if it is a
+ # partition or not. look in the by-path dir and see if the
+ # referring symlink ends in -partNNN.
+ name = dev.split('/')[-1]
+ for name in os.listdir('/dev/disk/by-path'):
+ target = os.readlink(os.path.join('/dev/disk/by-path', name))
+ cdev = target.split('/')[-1]
+ if '/dev/' + cdev != dev:
+ continue
+ (baser) = re.search('(.*)-part\d+$', name)
+ if baser is not None:
+ return True
+ else:
+ return False
+
+ # hrm, don't know...
+ return False
+
+
+def is_mounted(dev):
+ """
+ Check if the given device is mounted.
+ """
+ dev = os.path.realpath(dev)
+ with file('/proc/mounts', 'rb') as proc_mounts:
+ for line in proc_mounts:
+ fields = line.split()
+ if len(fields) < 3:
+ continue
+ mounts_dev = fields[0]
+ path = fields[1]
+ if mounts_dev.startswith('/') and os.path.exists(mounts_dev):
+ mounts_dev = os.path.realpath(mounts_dev)
+ if mounts_dev == dev:
+ return path
+ return None
+
+
+def is_held(dev):
+ """
+ Check if a device is held by another device (e.g., a dm-crypt mapping)
+ """
+ assert os.path.exists(dev)
+ dev = os.path.realpath(dev)
+ base = dev[5:]
+ disk = base
+ while disk[-1].isdigit():
+ disk = disk[:-1]
+ directory = '/sys/block/{disk}/{base}/holders'.format(disk=disk, base=base)
+ if not os.path.exists(directory):
+ return []
+ return os.listdir(directory)
+
+
+def verify_not_in_use(dev):
+ """
+ Verify if a given device (path) is in use (e.g. mounted or
+ in use by device-mapper).
+
+ :raises: Error if device is in use.
+ """
+ assert os.path.exists(dev)
+ if is_partition(dev):
+ if is_mounted(dev):
+ raise Error('Device is mounted', dev)
+ holders = is_held(dev)
+ if holders:
+ raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders))
+ else:
+ for partition in list_partitions(dev):
+ if is_mounted(partition):
+ raise Error('Device is mounted', partition)
+ holders = is_held(partition)
+ if holders:
+ raise Error('Device %s is in use by a device-mapper mapping (dm-crypt?)' % partition, ','.join(holders))
+
+
+def must_be_one_line(line):
+ """
+ Checks if given line is really one single line.
+
+ :raises: TruncatedLineError or TooManyLinesError
+ :return: Content of the line, or None if line isn't valid.
+ """
+ if line[-1:] != '\n':
+ raise TruncatedLineError(line)
+ line = line[:-1]
+ if '\n' in line:
+ raise TooManyLinesError(line)
+ return line
+
+
+def read_one_line(parent, name):
+ """
+ Read a file whose sole contents are a single line.
+
+ Strips the newline.
+
+ :return: Contents of the line, or None if file did not exist.
+ """
+ path = os.path.join(parent, name)
+ try:
+ line = file(path, 'rb').read()
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ return None
+ else:
+ raise
+
+ try:
+ line = must_be_one_line(line)
+ except (TruncatedLineError, TooManyLinesError) as e:
+ raise Error('File is corrupt: {path}: {msg}'.format(
+ path=path,
+ msg=e,
+ ))
+ return line
+
+
+def write_one_line(parent, name, text):
+ """
+ Write a file whose sole contents are a single line.
+
+ Adds a newline.
+ """
+ path = os.path.join(parent, name)
+ tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
+ with file(tmp, 'wb') as tmp_file:
+ tmp_file.write(text + '\n')
+ os.fsync(tmp_file.fileno())
+ os.rename(tmp, path)
+
+
+def check_osd_magic(path):
+ """
+ Check that this path has the Ceph OSD magic.
+
+ :raises: BadMagicError if this does not look like a Ceph OSD data
+ dir.
+ """
+ magic = read_one_line(path, 'magic')
+ if magic is None:
+ # probably not mkfs'ed yet
+ raise BadMagicError(path)
+ if magic != CEPH_OSD_ONDISK_MAGIC:
+ raise BadMagicError(path)
+
+
+def check_osd_id(osd_id):
+ """
+ Ensures osd id is numeric.
+ """
+ if not re.match(r'^[0-9]+$', osd_id):
+ raise Error('osd id is not numeric')
+
+
+def allocate_osd_id(
+ cluster,
+ fsid,
+ keyring,
+ ):
+ """
+ Accocates an OSD id on the given cluster.
+
+ :raises: Error if the call to allocate the OSD id fails.
+ :return: The allocated OSD id.
+ """
+
+ LOG.debug('Allocating OSD id...')
+ try:
+ osd_id = _check_output(
+ args=[
+ '/usr/bin/ceph',
+ '--cluster', cluster,
+ '--name', 'client.bootstrap-osd',
+ '--keyring', keyring,
+ 'osd', 'create', '--concise',
+ fsid,
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ raise Error('ceph osd create failed', e)
+ osd_id = must_be_one_line(osd_id)
+ check_osd_id(osd_id)
+ return osd_id
+
+
+def get_osd_id(path):
+ """
+ Gets the OSD id of the OSD at the given path.
+ """
+ osd_id = read_one_line(path, 'whoami')
+ if osd_id is not None:
+ check_osd_id(osd_id)
+ return osd_id
+
+
+def _check_output(*args, **kwargs):
+ process = subprocess.Popen(
+ stdout=subprocess.PIPE,
+ *args, **kwargs)
+ out, _ = process.communicate()
+ ret = process.wait()
+ if ret:
+ cmd = kwargs.get("args")
+ if cmd is None:
+ cmd = args[0]
+ #raise subprocess.CalledProcessError(ret, cmd, output=out)
+ error = subprocess.CalledProcessError(ret, cmd)
+ error.output = out
+ raise error
+ return out
+
+
+def get_conf(cluster, variable):
+ """
+ Get the value of the given configuration variable from the
+ cluster.
+
+ :raises: Error if call to ceph-conf fails.
+ :return: The variable value or None.
+ """
+ try:
+ process = subprocess.Popen(
+ args=[
+ '/usr/bin/ceph-conf',
+ '--cluster={cluster}'.format(
+ cluster=cluster,
+ ),
+ '--name=osd.',
+ '--lookup',
+ variable,
+ ],
+ stdout=subprocess.PIPE,
+ close_fds=True,
+ )
+ except OSError as e:
+ raise Error('error executing ceph-conf', e)
+ (out, _err) = process.communicate()
+ ret = process.wait()
+ if ret == 1:
+ # config entry not found
+ return None
+ elif ret != 0:
+ raise Error('getting variable from configuration failed')
+ value = str(out).split('\n', 1)[0]
+ # don't differentiate between "var=" and no var set
+ if not value:
+ return None
+ return value
+
+
+def get_conf_with_default(cluster, variable):
+ """
+ Get a config value that is known to the C++ code.
+
+ This will fail if called on variables that are not defined in
+ common config options.
+ """
+ try:
+ out = _check_output(
+ args=[
+ 'ceph-osd',
+ '--cluster={cluster}'.format(
+ cluster=cluster,
+ ),
+ '--show-config-value={variable}'.format(
+ variable=variable,
+ ),
+ ],
+ close_fds=True,
+ )
+ except subprocess.CalledProcessError as e:
+ raise Error(
+ 'getting variable from configuration failed',
+ e,
+ )
+
+ value = str(out).split('\n', 1)[0]
+ return value
+
+
+def get_fsid(cluster):
+ """
+ Get the fsid of the cluster.
+
+ :return: The fsid or raises Error.
+ """
+ fsid = get_conf(cluster=cluster, variable='fsid')
+ if fsid is None:
+ raise Error('getting cluster uuid from configuration failed')
+ return fsid
+
+
+def get_or_create_dmcrypt_key(
+ _uuid,
+ key_dir,
+ ):
+ """
+ Get path to dmcrypt key or create a new key file.
+
+ :return: Path to the dmcrypt key file.
+ """
+ path = os.path.join(key_dir, _uuid)
+
+ # already have it?
+ if os.path.exists(path):
+ return path
+
+ # make a new key
+ try:
+ if not os.path.exists(key_dir):
+ os.makedirs(key_dir)
+ with file('/dev/urandom', 'rb') as i:
+ key = i.read(256)
+ with file(path, 'wb') as key_file:
+ key_file.write(key)
+ return path
+ except:
+ raise Error('unable to read or create dm-crypt key', path)
+
+
+def dmcrypt_map(
+ rawdev,
+ keypath,
+ _uuid,
+ ):
+ """
+ Maps a device to a dmcrypt device.
+
+ :return: Path to the dmcrypt device.
+ """
+ dev = '/dev/mapper/'+ _uuid
+ args = [
+ 'cryptsetup',
+ '--key-file',
+ keypath,
+ '--key-size', '256',
+ 'create',
+ _uuid,
+ rawdev,
+ ]
+ try:
+ subprocess.check_call(args)
+ return dev
+
+ except subprocess.CalledProcessError as e:
+ raise Error('unable to map device', rawdev, e)
+
+
+def dmcrypt_unmap(
+ _uuid
+ ):
+ """
+ Removes the dmcrypt device with the given UUID.
+ """
+ args = [
+ 'cryptsetup',
+ 'remove',
+ _uuid
+ ]
+
+ try:
+ subprocess.check_call(args)
+
+ except subprocess.CalledProcessError as e:
+ raise Error('unable to unmap device', _uuid, e)
+
+
+def mount(
+ dev,
+ fstype,
+ options,
+ ):
+ """
+ Mounts a device with given filessystem type and
+ mount options to a tempfile path under /var/lib/ceph/tmp.
+ """
+ # pick best-of-breed mount options based on fs type
+ if options is None:
+ options = MOUNT_OPTIONS.get(fstype, '')
+
+ # mount
+ path = tempfile.mkdtemp(
+ prefix='mnt.',
+ dir='/var/lib/ceph/tmp',
+ )
+ try:
+ LOG.debug('Mounting %s on %s with options %s', dev, path, options)
+ subprocess.check_call(
+ args=[
+ 'mount',
+ '-o', options,
+ '--',
+ dev,
+ path,
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ try:
+ os.rmdir(path)
+ except (OSError, IOError):
+ pass
+ raise MountError(e)
+
+ return path
+
+
+def unmount(
+ path,
+ ):
+ """
+ Unmount and removes the given mount point.
+ """
+ try:
+ LOG.debug('Unmounting %s', path)
+ subprocess.check_call(
+ args=[
+ '/bin/umount',
+ '--',
+ path,
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ raise UnmountError(e)
+
+ os.rmdir(path)
+
+
+###########################################
+
+
+def get_free_partition_index(dev):
+ """
+ Get the next free partition index on a given device.
+
+ :return: Index number (> 1 if there is already a partition on the device)
+ or 1 if there is no partition table.
+ """
+ try:
+ lines = _check_output(
+ args=[
+ 'parted',
+ '--machine',
+ '--',
+ dev,
+ 'print',
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ print 'cannot read partition index; assume it isn\'t present\n (Error: %s)' % e
+ return 1
+
+ if not lines:
+ raise Error('parted failed to output anything')
+ lines = str(lines).splitlines(True)
+
+ if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']:
+ raise Error('weird parted units', lines[0])
+ del lines[0]
+
+ if not lines[0].startswith('/dev/'):
+ raise Error('weird parted disk entry', lines[0])
+ del lines[0]
+
+ seen = set()
+ for line in lines:
+ idx, _ = line.split(':', 1)
+ idx = int(idx)
+ seen.add(idx)
+
+ num = 1
+ while num in seen:
+ num += 1
+ return num
+
+
+def zap(dev):
+ """
+ Destroy the partition table and content of a given disk.
+ """
+ try:
+ LOG.debug('Zapping partition table on %s', dev)
+
+ # try to wipe out any GPT partition table backups. sgdisk
+ # isn't too thorough.
+ lba_size = 4096
+ size = 33 * lba_size
+ with file(dev, 'wb') as dev_file:
+ dev_file.seek(-size, os.SEEK_END)
+ dev_file.write(size*'\0')
+
+ subprocess.check_call(
+ args=[
+ 'sgdisk',
+ '--zap-all',
+ '--clear',
+ '--mbrtogpt',
+ '--',
+ dev,
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ raise Error(e)
+
+
+def prepare_journal_dev(
+ data,
+ journal,
+ journal_size,
+ journal_uuid,
+ journal_dm_keypath,
+ ):
+
+ if is_partition(journal):
+ LOG.debug('Journal %s is a partition', journal)
+ LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
+ return (journal, None, None)
+
+ ptype = JOURNAL_UUID
+ if journal_dm_keypath:
+ ptype = DMCRYPT_JOURNAL_UUID
+
+ # it is a whole disk. create a partition!
+ num = None
+ if journal == data:
+ # we're sharing the disk between osd data and journal;
+ # make journal be partition number 2, so it's pretty
+ num = 2
+ journal_part = '{num}:0:{size}M'.format(
+ num=num,
+ size=journal_size,
+ )
+ else:
+ # sgdisk has no way for me to say "whatever is the next
+ # free index number" when setting type guids etc, so we
+ # need to awkwardly look up the next free number, and then
+ # fix that in the call -- and hope nobody races with us;
+ # then again nothing guards the partition table from races
+ # anyway
+ num = get_free_partition_index(dev=journal)
+ journal_part = '{num}:0:+{size}M'.format(
+ num=num,
+ size=journal_size,
+ )
+ LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
+
+ try:
+ LOG.debug('Creating journal partition num %d size %d on %s', num, journal_size, journal)
+ subprocess.check_call(
+ args=[
+ 'sgdisk',
+ '--new={part}'.format(part=journal_part),
+ '--change-name={num}:ceph journal'.format(num=num),
+ '--partition-guid={num}:{journal_uuid}'.format(
+ num=num,
+ journal_uuid=journal_uuid,
+ ),
+ '--typecode={num}:{uuid}'.format(
+ num=num,
+ uuid=ptype,
+ ),
+ '--',
+ journal,
+ ],
+ )
+ subprocess.call(
+ args=[
+ # wait for udev event queue to clear
+ 'udevadm',
+ 'settle',
+ '--timeout=10',
+ ],
+ )
+ subprocess.check_call(
+ args=[
+ # also make sure the kernel refreshes the new table
+ 'partprobe',
+ journal,
+ ],
+ )
+
+ journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format(
+ journal_uuid=journal_uuid,
+ )
+
+ journal_dmcrypt = None
+ if journal_dm_keypath:
+ journal_dmcrypt = journal_symlink
+ journal_symlink = '/dev/mapper/{uuid}'.format(uuid=journal_uuid)
+
+ LOG.debug('Journal is GPT partition %s', journal_symlink)
+ return (journal_symlink, journal_dmcrypt, journal_uuid)
+
+ except subprocess.CalledProcessError as e:
+ raise Error(e)
+
+
+def prepare_journal_file(
+ journal,
+ journal_size):
+
+ if not os.path.exists(journal):
+ LOG.debug('Creating journal file %s with size %dM', journal, journal_size)
+ with file(journal, 'wb') as journal_file:
+ journal_file.truncate(journal_size * 1048576)
+
+ # FIXME: should we resize an existing journal file?
+
+ LOG.debug('Journal is file %s', journal)
+ LOG.warning('OSD will not be hot-swappable if journal is not the same device as the osd data')
+ return (journal, None, None)
+
+
+def prepare_journal(
+ data,
+ journal,
+ journal_size,
+ journal_uuid,
+ force_file,
+ force_dev,
+ journal_dm_keypath,
+ ):
+
+ if journal is None:
+ if force_dev:
+ raise Error('Journal is unspecified; not a block device')
+ return (None, None, None)
+
+ if not os.path.exists(journal):
+ if force_dev:
+ raise Error('Journal does not exist; not a block device', journal)
+ return prepare_journal_file(journal, journal_size)
+
+ jmode = os.stat(journal).st_mode
+ if stat.S_ISREG(jmode):
+ if force_dev:
+ raise Error('Journal is not a block device', journal)
+ return prepare_journal_file(journal, journal_size)
+
+ if stat.S_ISBLK(jmode):
+ if force_file:
+ raise Error('Journal is not a regular file', journal)
+ return prepare_journal_dev(data, journal, journal_size, journal_uuid, journal_dm_keypath)
+
+ raise Error('Journal %s is neither a block device nor regular file', journal)
+
+
+def adjust_symlink(target, path):
+ create = True
+ if os.path.lexists(path):
+ try:
+ mode = os.lstat(path).st_mode
+ if stat.S_ISREG(mode):
+ LOG.debug('Removing old file %s', path)
+ os.unlink(path)
+ elif stat.S_ISLNK(mode):
+ old = os.readlink(path)
+ if old != target:
+ LOG.debug('Removing old symlink %s -> %s', path, old)
+ os.unlink(path)
+ else:
+ create = False
+ except:
+ raise Error('unable to remove (or adjust) old file (symlink)', path)
+ if create:
+ LOG.debug('Creating symlink %s -> %s', path, target)
+ try:
+ os.symlink(target, path)
+ except:
+ raise Error('unable to create symlink %s -> %s' % (path, target))
+
+def prepare_dir(
+ path,
+ journal,
+ cluster_uuid,
+ osd_uuid,
+ journal_uuid,
+ journal_dmcrypt = None,
+ ):
+ LOG.debug('Preparing osd data dir %s', path)
+
+ if osd_uuid is None:
+ osd_uuid = str(uuid.uuid4())
+
+ if journal is not None:
+ # we're using an external journal; point to it here
+ adjust_symlink(journal, os.path.join(path, 'journal'))
+
+ if journal_dmcrypt is not None:
+ adjust_symlink(journal_dmcrypt, os.path.join(path, 'journal_dmcrypt'))
+ else:
+ try:
+ os.unlink(os.path.join(path, 'journal_dmcrypt'))
+ except OSError:
+ pass
+
+ write_one_line(path, 'ceph_fsid', cluster_uuid)
+ write_one_line(path, 'fsid', osd_uuid)
+ write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC)
+
+ if journal_uuid is not None:
+ # i.e., journal is a tagged partition
+ write_one_line(path, 'journal_uuid', journal_uuid)
+
+def prepare_dev(
+ data,
+ journal,
+ fstype,
+ mkfs_args,
+ mount_options,
+ cluster_uuid,
+ osd_uuid,
+ journal_uuid,
+ journal_dmcrypt,
+ osd_dm_keypath,
+ ):
+ """
+ Prepare a data/journal combination to be used for an OSD.
+
+ The ``magic`` file is written last, so it's presence is a reliable
+ indicator of the whole sequence having completed.
+
+ WARNING: This will unconditionally overwrite anything given to
+ it.
+ """
+
+ ptype_tobe = TOBE_UUID
+ ptype_osd = OSD_UUID
+ if osd_dm_keypath:
+ ptype_tobe = DMCRYPT_TOBE_UUID
+ ptype_osd = DMCRYPT_OSD_UUID
+
+ rawdev = None
+ if is_partition(data):
+ LOG.debug('OSD data device %s is a partition', data)
+ rawdev = data
+ else:
+ LOG.debug('Creating osd partition on %s', data)
+ try:
+ subprocess.check_call(
+ args=[
+ 'sgdisk',
+ '--largest-new=1',
+ '--change-name=1:ceph data',
+ '--partition-guid=1:{osd_uuid}'.format(
+ osd_uuid=osd_uuid,
+ ),
+ '--typecode=1:%s' % ptype_tobe,
+ '--',
+ data,
+ ],
+ )
+ subprocess.call(
+ args=[
+ # wait for udev event queue to clear
+ 'udevadm',
+ 'settle',
+ '--timeout=10',
+ ],
+ )
+ subprocess.check_call(
+ args=[
+ # also make sure the kernel refreshes the new table
+ 'partprobe',
+ data,
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ raise Error(e)
+
+ rawdev = '{data}1'.format(data=data)
+
+ dev = None
+ if osd_dm_keypath:
+ dev = dmcrypt_map(rawdev, osd_dm_keypath, osd_uuid)
+ else:
+ dev = rawdev
+
+ try:
+ args = [
+ 'mkfs',
+ '-t',
+ fstype,
+ ]
+ if mkfs_args is not None:
+ args.extend(mkfs_args.split())
+ if fstype == 'xfs':
+ args.extend(['-f']) # always force
+ else:
+ args.extend(MKFS_ARGS.get(fstype, []))
+ args.extend([
+ '--',
+ dev,
+ ])
+ try:
+ LOG.debug('Creating %s fs on %s', fstype, dev)
+ subprocess.check_call(args=args)
+ except subprocess.CalledProcessError as e:
+ raise Error(e)
+
+ #remove whitespaces from mount_options
+ if mount_options is not None:
+ mount_options = "".join(mount_options.split())
+
+ path = mount(dev=dev, fstype=fstype, options=mount_options)
+
+ try:
+ prepare_dir(
+ path=path,
+ journal=journal,
+ cluster_uuid=cluster_uuid,
+ osd_uuid=osd_uuid,
+ journal_uuid=journal_uuid,
+ journal_dmcrypt=journal_dmcrypt,
+ )
+ finally:
+ unmount(path)
+ finally:
+ if rawdev != dev:
+ dmcrypt_unmap(osd_uuid)
+
+ if not is_partition(data):
+ try:
+ subprocess.check_call(
+ args=[
+ 'sgdisk',
+ '--typecode=1:%s' % ptype_osd,
+ '--',
+ data,
+ ],
+ )
+ subprocess.call(
+ args=[
+ # wait for udev event queue to clear
+ 'udevadm',
+ 'settle',
+ '--timeout=10',
+ ],
+ )
+ subprocess.check_call(
+ args=[
+ # also make sure the kernel refreshes the new table
+ 'partprobe',
+ data,
+ ],
+ )
+ except subprocess.CalledProcessError as e:
+ raise Error(e)
+
+
+def main_prepare(args):
+ journal_dm_keypath = None
+ osd_dm_keypath = None
+
+ try:
+ if not os.path.exists(args.data):
+ raise Error('data path does not exist', args.data)
+
+ # in use?
+ dmode = os.stat(args.data).st_mode
+ if stat.S_ISBLK(dmode):
+ verify_not_in_use(args.data)
+
+ if args.journal and os.path.exists(args.journal):
+ jmode = os.stat(args.journal).st_mode
+ if stat.S_ISBLK(jmode):
+ verify_not_in_use(args.journal)
+
+ if args.zap_disk is not None:
+ if stat.S_ISBLK(dmode) and not is_partition(args.data):
+ zap(args.data)
+ else:
+ raise Error('not full block device; cannot zap', args.data)
+
+ if args.cluster_uuid is None:
+ args.cluster_uuid = get_fsid(cluster=args.cluster)
+ if args.cluster_uuid is None:
+ raise Error(
+ 'must have fsid in config or pass --cluster--uuid=',
+ )
+
+ if args.fs_type is None:
+ args.fs_type = get_conf(
+ cluster=args.cluster,
+ variable='osd_mkfs_type',
+ )
+ if args.fs_type is None:
+ args.fs_type = get_conf(
+ cluster=args.cluster,
+ variable='osd_fs_type',
+ )
+ if args.fs_type is None:
+ args.fs_type = DEFAULT_FS_TYPE
+
+ mkfs_args = get_conf(
+ cluster=args.cluster,
+ variable='osd_mkfs_options_{fstype}'.format(
+ fstype=args.fs_type,
+ ),
+ )
+ if mkfs_args is None:
+ mkfs_args = get_conf(
+ cluster=args.cluster,
+ variable='osd_fs_mkfs_options_{fstype}'.format(
+ fstype=args.fs_type,
+ ),
+ )
+
+ mount_options = get_conf(
+ cluster=args.cluster,
+ variable='osd_mount_options_{fstype}'.format(
+ fstype=args.fs_type,
+ ),
+ )
+ if mount_options is None:
+ mount_options = get_conf(
+ cluster=args.cluster,
+ variable='osd_fs_mount_options_{fstype}'.format(
+ fstype=args.fs_type,
+ ),
+ )
+
+ journal_size = get_conf_with_default(
+ cluster=args.cluster,
+ variable='osd_journal_size',
+ )
+ journal_size = int(journal_size)
+
+ # colocate journal with data?
+ if stat.S_ISBLK(dmode) and not is_partition(args.data) and args.journal is None and args.journal_file is None:
+ LOG.info('Will colocate journal with data on %s', args.data)
+ args.journal = args.data
+
+ if args.journal_uuid is None:
+ args.journal_uuid = str(uuid.uuid4())
+ if args.osd_uuid is None:
+ args.osd_uuid = str(uuid.uuid4())
+
+ # dm-crypt keys?
+ if args.dmcrypt:
+ journal_dm_keypath = get_or_create_dmcrypt_key(args.journal_uuid, args.dmcrypt_key_dir)
+ osd_dm_keypath = get_or_create_dmcrypt_key(args.osd_uuid, args.dmcrypt_key_dir)
+
+ # prepare journal
+ (journal_symlink, journal_dmcrypt, journal_uuid) = prepare_journal(
+ data=args.data,
+ journal=args.journal,
+ journal_size=journal_size,
+ journal_uuid=args.journal_uuid,
+ force_file=args.journal_file,
+ force_dev=args.journal_dev,
+ journal_dm_keypath=journal_dm_keypath,
+ )
+
+ # prepare data
+ if stat.S_ISDIR(dmode):
+ if args.data_dev:
+ raise Error('data path is not a block device', args.data)
+ prepare_dir(
+ path=args.data,
+ journal=journal_symlink,
+ cluster_uuid=args.cluster_uuid,
+ osd_uuid=args.osd_uuid,
+ journal_uuid=journal_uuid,
+ journal_dmcrypt=journal_dmcrypt,
+ )
+ elif stat.S_ISBLK(dmode):
+ if args.data_dir:
+ raise Error('data path is not a directory', args.data)
+ prepare_dev(
+ data=args.data,
+ journal=journal_symlink,
+ fstype=args.fs_type,
+ mkfs_args=mkfs_args,
+ mount_options=mount_options,
+ cluster_uuid=args.cluster_uuid,
+ osd_uuid=args.osd_uuid,
+ journal_uuid=journal_uuid,
+ journal_dmcrypt=journal_dmcrypt,
+ osd_dm_keypath=osd_dm_keypath,
+ )
+ else:
+ raise Error('not a dir or block device', args.data)
+
+ except Error as e:
+ if journal_dm_keypath:
+ os.unlink(journal_dm_keypath)
+ if osd_dm_keypath:
+ os.unlink(osd_dm_keypath)
+ raise e
+
+
+###########################
+
+
+def mkfs(
+ path,
+ cluster,
+ osd_id,
+ fsid,
+ keyring,
+ ):
+ monmap = os.path.join(path, 'activate.monmap')
+ subprocess.check_call(
+ args=[
+ '/usr/bin/ceph',
+ '--cluster', cluster,
+ '--name', 'client.bootstrap-osd',
+ '--keyring', keyring,
+ 'mon', 'getmap', '-o', monmap,
+ ],
+ )
+
+ subprocess.check_call(
+ args=[
+ '/usr/bin/ceph-osd',
+ '--cluster', cluster,
+ '--mkfs',
+ '--mkkey',
+ '-i', osd_id,
+ '--monmap', monmap,
+ '--osd-data', path,
+ '--osd-journal', os.path.join(path, 'journal'),
+ '--osd-uuid', fsid,
+ '--keyring', os.path.join(path, 'keyring'),
+ ],
+ )
+ # TODO ceph-osd --mkfs removes the monmap file?
+ # os.unlink(monmap)
+
+
+def auth_key(
+ path,
+ cluster,
+ osd_id,
+ keyring,
+ ):
+ subprocess.check_call(
+ args=[
+ '/usr/bin/ceph',
+ '--cluster', cluster,
+ '--name', 'client.bootstrap-osd',
+ '--keyring', keyring,
+ 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
+ '-i', os.path.join(path, 'keyring'),
+ 'osd', 'allow *',
+ 'mon', 'allow rwx',
+ ],
+ )
+
+
+def move_mount(
+ path,
+ cluster,
+ osd_id,
+ ):
+ LOG.debug('Moving mount to final location...')
+ parent = '/var/lib/ceph/osd'
+ osd_data = os.path.join(
+ parent,
+ '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id),
+ )
+ maybe_mkdir(osd_data)
+ subprocess.check_call(
+ args=[
+ '/bin/mount',
+ '--move',
+ '--',
+ path,
+ osd_data,
+ ],
+ )
+
+
+def start_daemon(
+ cluster,
+ osd_id,
+ ):
+ LOG.debug('Starting %s osd.%s...', cluster, osd_id)
+
+ path = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
+ cluster=cluster, osd_id=osd_id)
+
+ # upstart?
+ try:
+ if os.path.exists(os.path.join(path,'upstart')):
+ subprocess.check_call(
+ args=[
+ '/sbin/initctl',
+ # use emit, not start, because start would fail if the
+ # instance was already running
+ 'emit',
+ # since the daemon starting doesn't guarantee much about
+ # the service being operational anyway, don't bother
+ # waiting for it
+ '--no-wait',
+ '--',
+ 'ceph-osd',
+ 'cluster={cluster}'.format(cluster=cluster),
+ 'id={osd_id}'.format(osd_id=osd_id),
+ ],
+ )
+ elif os.path.exists(os.path.join(path, 'sysvinit')):
+ subprocess.check_call(
+ args=[
+ '/usr/sbin/service',
+ 'ceph',
+ 'start',
+ 'osd.{osd_id}'.format(osd_id=osd_id),
+ ],
+ )
+ else:
+ raise Error('{cluster} osd.{osd_id} is not tagged with an init system'.format(
+ cluster=cluster,
+ osd_id=osd_id,
+ ))
+ except subprocess.CalledProcessError as e:
+ raise Error('ceph osd start failed', e)
+
+def detect_fstype(
+ dev,
+ ):
+ fstype = _check_output(
+ args=[
+ '/sbin/blkid',
+ # we don't want stale cached results
+ '-p',
+ '-s', 'TYPE',
+ '-o' 'value',
+ '--',
+ dev,
+ ],
+ )
+ fstype = must_be_one_line(fstype)
+ return fstype
+
+
+def mount_activate(
+ dev,
+ activate_key_template,
+ init,
+ ):
+
+ try:
+ fstype = detect_fstype(dev=dev)
+ except (subprocess.CalledProcessError,
+ TruncatedLineError,
+ TooManyLinesError) as e:
+ raise FilesystemTypeError(
+ 'device {dev}'.format(dev=dev),
+ e,
+ )
+
+ # TODO always using mount options from cluster=ceph for
+ # now; see http://tracker.newdream.net/issues/3253
+ mount_options = get_conf(
+ cluster='ceph',
+ variable='osd_mount_options_{fstype}'.format(
+ fstype=fstype,
+ ),
+ )
+
+ if mount_options is None:
+ mount_options = get_conf(
+ cluster='ceph',
+ variable='osd_fs_mount_options_{fstype}'.format(
+ fstype=fstype,
+ ),
+ )
+
+ #remove whitespaces from mount_options
+ if mount_options is not None:
+ mount_options = "".join(mount_options.split())
+
+ path = mount(dev=dev, fstype=fstype, options=mount_options)
+
+ osd_id = None
+ cluster = None
+ try:
+ (osd_id, cluster) = activate(path, activate_key_template, init)
+
+ # check if the disk is already active, or if something else is already
+ # mounted there
+ active = False
+ other = False
+ src_dev = os.stat(path).st_dev
+ try:
+ dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
+ cluster=cluster,
+ osd_id=osd_id)).st_dev
+ if src_dev == dst_dev:
+ active = True
+ else:
+ parent_dev = os.stat('/var/lib/ceph/osd').st_dev
+ if dst_dev != parent_dev:
+ other = True
+ except OSError:
+ pass
+ if active:
+ LOG.info('%s osd.%s already mounted in position; unmounting ours.' % (cluster, osd_id))
+ unmount(path)
+ elif other:
+ raise Error('another %s osd.%s already mounted in position (old/different cluster instance?); unmounting ours.' % (cluster, osd_id))
+ else:
+ move_mount(
+ path=path,
+ cluster=cluster,
+ osd_id=osd_id,
+ )
+ return (cluster, osd_id)
+
+ except:
+ LOG.error('Failed to activate')
+ unmount(path)
+ raise
+ finally:
+ # remove our temp dir
+ if os.path.exists(path):
+ os.rmdir(path)
+
+
+def activate_dir(
+ path,
+ activate_key_template,
+ init,
+ ):
+
+ if not os.path.exists(path):
+ raise Error(
+ 'directory %s does not exist' % path
+ )
+
+ (osd_id, cluster) = activate(path, activate_key_template, init)
+ canonical = '/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
+ cluster=cluster,
+ osd_id=osd_id)
+ if path != canonical:
+ # symlink it from the proper location
+ create = True
+ if os.path.lexists(canonical):
+ old = os.readlink(canonical)
+ if old != path:
+ LOG.debug('Removing old symlink %s -> %s', canonical, old)
+ try:
+ os.unlink(canonical)
+ except:
+ raise Error('unable to remove old symlink %s', canonical)
+ else:
+ create = False
+ if create:
+ LOG.debug('Creating symlink %s -> %s', canonical, path)
+ try:
+ os.symlink(path, canonical)
+ except:
+ raise Error('unable to create symlink %s -> %s', canonical, path)
+
+ return (cluster, osd_id)
+
+
+def find_cluster_by_uuid(_uuid):
+ """
+ Find a cluster name by searching /etc/ceph/*.conf for a conf file
+ with the right uuid.
+ """
+ no_fsid = []
+ if not os.path.exists('/etc/ceph'):
+ return None
+ for conf_file in os.listdir('/etc/ceph'):
+ if not conf_file.endswith('.conf'):
+ continue
+ cluster = conf_file[:-5]
+ fsid = get_conf(cluster, 'fsid')
+ if fsid is None:
+ no_fsid.append(cluster)
+ elif fsid == _uuid:
+ return cluster
+ # be tolerant of /etc/ceph/ceph.conf without an fsid defined.
+ if len(no_fsid) == 1 and no_fsid[0] == 'ceph':
+ LOG.warning('No fsid defined in /etc/ceph/ceph.conf; using anyway')
+ return 'ceph'
+ return None
+
+def activate(
+ path,
+ activate_key_template,
+ init,
+ ):
+
+ try:
+ check_osd_magic(path)
+
+ ceph_fsid = read_one_line(path, 'ceph_fsid')
+ if ceph_fsid is None:
+ raise Error('No cluster uuid assigned.')
+ LOG.debug('Cluster uuid is %s', ceph_fsid)
+
+ cluster = find_cluster_by_uuid(ceph_fsid)
+ if cluster is None:
+ raise Error('No cluster conf found in /etc/ceph with fsid %s' % ceph_fsid)
+ LOG.debug('Cluster name is %s', cluster)
+
+ fsid = read_one_line(path, 'fsid')
+ if fsid is None:
+ raise Error('No OSD uuid assigned.')
+ LOG.debug('OSD uuid is %s', fsid)
+
+ keyring = activate_key_template.format(cluster=cluster)
+
+ osd_id = get_osd_id(path)
+ if osd_id is None:
+ osd_id = allocate_osd_id(
+ cluster=cluster,
+ fsid=fsid,
+ keyring=keyring,
+ )
+ write_one_line(path, 'whoami', osd_id)
+ LOG.debug('OSD id is %s', osd_id)
+
+ if not os.path.exists(os.path.join(path, 'ready')):
+ LOG.debug('Initializing OSD...')
+ # re-running mkfs is safe, so just run until it completes
+ mkfs(
+ path=path,
+ cluster=cluster,
+ osd_id=osd_id,
+ fsid=fsid,
+ keyring=keyring,
+ )
+
+ if init is not None:
+ if init == 'auto':
+ conf_val = get_conf(
+ cluster=cluster,
+ variable='init'
+ )
+ if conf_val is not None:
+ init = conf_val
+ else:
+ (distro, release, codename) = platform.dist()
+ if distro == 'Ubuntu':
+ init = 'upstart'
+ else:
+ init = 'sysvinit'
+
+ LOG.debug('Marking with init system %s', init)
+ with file(os.path.join(path, init), 'w'):
+ pass
+
+ # remove markers for others, just in case.
+ for other in INIT_SYSTEMS:
+ if other != init:
+ try:
+ os.unlink(os.path.join(path, other))
+ except OSError:
+ pass
+
+ if not os.path.exists(os.path.join(path, 'active')):
+ LOG.debug('Authorizing OSD key...')
+ auth_key(
+ path=path,
+ cluster=cluster,
+ osd_id=osd_id,
+ keyring=keyring,
+ )
+ write_one_line(path, 'active', 'ok')
+ LOG.debug('%s osd.%s data dir is ready at %s', cluster, osd_id, path)
+ return (osd_id, cluster)
+ except:
+ raise
+
+
+
+def main_activate(args):
+ cluster = None
+ osd_id = None
+
+ if not os.path.exists(args.path):
+ raise Error('%s does not exist', args.path)
+
+ mode = os.stat(args.path).st_mode
+ if stat.S_ISBLK(mode):
+ (cluster, osd_id) = mount_activate(
+ dev=args.path,
+ activate_key_template=args.activate_key_template,
+ init=args.mark_init,
+ )
+ elif stat.S_ISDIR(mode):
+ (cluster, osd_id) = activate_dir(
+ path=args.path,
+ activate_key_template=args.activate_key_template,
+ init=args.mark_init,
+ )
+ else:
+ raise Error('%s is not a directory or block device', args.path)
+
+ start_daemon(
+ cluster=cluster,
+ osd_id=osd_id,
+ )
+
+
+
+###########################
+
+def is_swap(dev):
+ dev = os.path.realpath(dev)
+ with file('/proc/swaps', 'rb') as proc_swaps:
+ for line in proc_swaps.readlines()[1:]:
+ fields = line.split()
+ if len(fields) < 3:
+ continue
+ swaps_dev = fields[0]
+ if swaps_dev.startswith('/') and os.path.exists(swaps_dev):
+ swaps_dev = os.path.realpath(swaps_dev)
+ if swaps_dev == dev:
+ return True
+ return False
+
+def get_oneliner(base, name):
+ path = os.path.join(base, name)
+ if os.path.isfile(path):
+ with open(path, 'r') as _file:
+ return _file.readline().rstrip()
+ return None
+
+def get_dev_fs(dev):
+ fscheck = subprocess.Popen(
+ [
+ 'blkid',
+ '-s',
+ 'TYPE',
+ dev
+ ],
+ stdout = subprocess.PIPE,
+ stderr=subprocess.PIPE).stdout.read()
+ if 'TYPE' in fscheck:
+ fstype = fscheck.split()[1].split('"')[1]
+ return fstype
+ else:
+ return None
+
+def get_partition_type(part):
+ (base, partnum) = re.match('(\D+)(\d+)', part).group(1, 2)
+ sgdisk = subprocess.Popen(
+ [
+ 'sgdisk',
+ '-p',
+ base,
+ ],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE).stdout.read()
+ for line in sgdisk.splitlines():
+ m = re.search('\s+(\d+)\s+\d+\s+\d+\s+\S+ \S+B\s+\S+\s+(.*)', line)
+ if m is not None:
+ num = m.group(1)
+ if num != partnum:
+ continue
+ return m.group(2)
+ return None
+
+def get_partition_uuid(dev):
+ (base, partnum) = re.match('(\D+)(\d+)', dev).group(1, 2)
+ out = subprocess.Popen(
+ [ 'sgdisk', '-i', partnum, base ],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE).stdout.read()
+ for line in out.splitlines():
+ m = re.match('Partition unique GUID: (\S+)', line)
+ if m:
+ return m.group(1).lower()
+ return None
+
+def more_osd_info(path, uuid_map):
+ desc = []
+ ceph_fsid = get_oneliner(path, 'ceph_fsid')
+ if ceph_fsid:
+ cluster = find_cluster_by_uuid(ceph_fsid)
+ if cluster:
+ desc.append('cluster ' + cluster)
+ else:
+ desc.append('unknown cluster ' + ceph_fsid)
+
+ who = get_oneliner(path, 'whoami')
+ if who:
+ desc.append('osd.%s' % who)
+
+ journal_uuid = get_oneliner(path, 'journal_uuid')
+ if journal_uuid:
+ journal_uuid = journal_uuid.lower()
+ if journal_uuid in uuid_map:
+ desc.append('journal %s' % uuid_map[journal_uuid])
+
+ return desc
+
+
+def list_dev(dev, uuid_map, journal_map):
+ ptype = 'unknown'
+ prefix = ''
+ if is_partition(dev):
+ ptype = get_partition_type(dev)
+ prefix = ' '
+ fs_type = get_dev_fs(dev)
+ path = is_mounted(dev)
+
+ desc = []
+ if ptype == 'ceph data':
+ if path:
+ desc.append('active')
+ desc.extend(more_osd_info(path, uuid_map))
+ elif fs_type:
+ try:
+ tpath = mount(dev=dev, fstype=fs_type, options='')
+ if tpath:
+ try:
+ magic = get_oneliner(tpath, 'magic')
+ if magic is not None:
+ desc.append('prepared')
+ desc.extend(more_osd_info(tpath, uuid_map))
+ finally:
+ unmount(tpath)
+ except MountError:
+ pass
+ if desc:
+ desc = ['ceph data'] + desc
+ else:
+ desc = ['ceph data', 'unprepared']
+ elif ptype == 'ceph journal':
+ desc.append('ceph journal')
+ part_uuid = get_partition_uuid(dev)
+ if part_uuid and part_uuid in journal_map:
+ desc.append('for %s' % journal_map[part_uuid])
+ else:
+ if is_swap(dev):
+ desc.append('swap')
+ else:
+ desc.append('other')
+ if fs_type:
+ desc.append(fs_type)
+ elif ptype:
+ desc.append(ptype)
+ if path:
+ desc.append('mounted on %s' % path)
+
+ print '%s%s %s' % (prefix, dev, ', '.join(desc))
+
+
+
+def main_list(args):
+ partmap = list_all_partitions()
+
+ uuid_map = {}
+ journal_map = {}
+ for base, parts in sorted(partmap.iteritems()):
+ for p in parts:
+ dev = '/dev/' + p
+ part_uuid = get_partition_uuid(dev)
+ if part_uuid:
+ uuid_map[part_uuid] = dev
+ ptype = get_partition_type(dev)
+ if ptype == 'ceph data':
+ fs_type = get_dev_fs(dev)
+ try:
+ tpath = mount(dev=dev, fstype=fs_type, options='')
+ try:
+ journal_uuid = get_oneliner(tpath, 'journal_uuid')
+ if journal_uuid:
+ journal_map[journal_uuid.lower()] = dev
+ finally:
+ unmount(tpath)
+ except MountError:
+ pass
+
+ for base, parts in sorted(partmap.iteritems()):
+ if parts:
+ print '/dev/%s :' % base
+ for p in sorted(parts):
+ list_dev('/dev/' + p, uuid_map, journal_map)
+ else:
+ list_dev('/dev/' + base, uuid_map, journal_map)
+
+
+###########################
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(
+ 'ceph-disk',
+ )
+ parser.add_argument(
+ '-v', '--verbose',
+ action='store_true', default=None,
+ help='be more verbose',
+ )
+ parser.set_defaults(
+ # we want to hold on to this, for later
+ prog=parser.prog,
+ cluster='ceph',
+ )
+
+ subparsers = parser.add_subparsers(
+ title='subcommands',
+ description='valid subcommands',
+ help='sub-command help',
+ )
+
+ prepare_parser = subparsers.add_parser('prepare', help='Prepare a directory or disk for a Ceph OSD')
+ prepare_parser.add_argument(
+ '--cluster',
+ metavar='NAME',
+ help='cluster name to assign this disk to',
+ )
+ prepare_parser.add_argument(
+ '--cluster-uuid',
+ metavar='UUID',
+ help='cluster uuid to assign this disk to',
+ )
+ prepare_parser.add_argument(
+ '--osd-uuid',
+ metavar='UUID',
+ help='unique OSD uuid to assign this disk to',
+ )
+ prepare_parser.add_argument(
+ '--journal-uuid',
+ metavar='UUID',
+ help='unique uuid to assign to the journal',
+ )
+ prepare_parser.add_argument(
+ '--fs-type',
+ help='file system type to use (e.g. "ext4")',
+ )
+ prepare_parser.add_argument(
+ '--zap-disk',
+ action='store_true', default=None,
+ help='destroy the partition table (and content) of a disk',
+ )
+ prepare_parser.add_argument(
+ '--data-dir',
+ action='store_true', default=None,
+ help='verify that DATA is a dir',
+ )
+ prepare_parser.add_argument(
+ '--data-dev',
+ action='store_true', default=None,
+ help='verify that DATA is a block device',
+ )
+ prepare_parser.add_argument(
+ '--journal-file',
+ action='store_true', default=None,
+ help='verify that JOURNAL is a file',
+ )
+ prepare_parser.add_argument(
+ '--journal-dev',
+ action='store_true', default=None,
+ help='verify that JOURNAL is a block device',
+ )
+ prepare_parser.add_argument(
+ '--dmcrypt',
+ action='store_true', default=None,
+ help='encrypt DATA and/or JOURNAL devices with dm-crypt',
+ )
+ prepare_parser.add_argument(
+ '--dmcrypt-key-dir',
+ metavar='KEYDIR',
+ default='/etc/ceph/dmcrypt-keys',
+ help='directory where dm-crypt keys are stored',
+ )
+ prepare_parser.add_argument(
+ 'data',
+ metavar='DATA',
+ help='path to OSD data (a disk block device or directory)',
+ )
+ prepare_parser.add_argument(
+ 'journal',
+ metavar='JOURNAL',
+ nargs='?',
+ help=('path to OSD journal disk block device;'
+ + ' leave out to store journal in file'),
+ )
+ prepare_parser.set_defaults(
+ func=main_prepare,
+ )
+
+ activate_parser = subparsers.add_parser('activate', help='Activate a Ceph OSD')
+ activate_parser.add_argument(
+ '--mount',
+ action='store_true', default=None,
+ help='mount a block device [deprecated, ignored]',
+ )
+ activate_parser.add_argument(
+ '--activate-key',
+ metavar='PATH',
+ help='bootstrap-osd keyring path template (%(default)s)',
+ dest='activate_key_template',
+ )
+ activate_parser.add_argument(
+ '--mark-init',
+ metavar='INITSYSTEM',
+ help='init system to manage this dir',
+ default='auto',
+ choices=INIT_SYSTEMS,
+ )
+ activate_parser.add_argument(
+ 'path',
+ metavar='PATH',
+ nargs='?',
+ help='path to block device or directory',
+ )
+ activate_parser.set_defaults(
+ activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring',
+ func=main_activate,
+ )
+
+ list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs')
+ list_parser.set_defaults(
+ func=main_list,
+ )
+
+ args = parser.parse_args()
+ return args
+
+
+def main():
+ args = parse_args()
+
+ loglevel = logging.INFO
+ if args.verbose:
+ loglevel = logging.DEBUG
+
+ logging.basicConfig(
+ level=loglevel,
+ )
+
+ try:
+ args.func(args)
+
+ except Error as e:
+ print >> sys.stderr, '{prog}: {msg}'.format(
+ prog=args.prog,
+ msg=e,
+ )
+ sys.exit(1)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/src/ceph-disk-activate b/src/ceph-disk-activate
index f78ae17ce88..72e89f9af30 100755
--- a/src/ceph-disk-activate
+++ b/src/ceph-disk-activate
@@ -1,584 +1,3 @@
-#!/usr/bin/python
-
-import argparse
-import errno
-import logging
-import os
-import os.path
-import re
-import subprocess
-import sys
-import tempfile
-
-
-log_name = __name__
-if log_name == '__main__':
- log_name = os.path.basename(sys.argv[0])
-log = logging.getLogger(log_name)
-
-
-class ActivateError(Exception):
- """
- OSD activation error
- """
-
- def __str__(self):
- doc = self.__doc__.strip()
- return ': '.join([doc] + [str(a) for a in self.args])
-
-
-class BadMagicError(ActivateError):
- """
- Does not look like a Ceph OSD, or incompatible version
- """
-
-
-class TruncatedLineError(ActivateError):
- """
- Line is truncated
- """
-
-
-class TooManyLinesError(ActivateError):
- """
- Too many lines
- """
-
-
-class FilesystemTypeError(ActivateError):
- """
- Cannot discover filesystem type
- """
-
-
-class MountError(ActivateError):
- """
- Mounting filesystem failed
- """
-
-
-class UnmountError(ActivateError):
- """
- Unmounting filesystem failed
- """
-
-
-def maybe_mkdir(*a, **kw):
- try:
- os.mkdir(*a, **kw)
- except OSError, e:
- if e.errno == errno.EEXIST:
- pass
- else:
- raise
-
-
-def must_be_one_line(line):
- if line[-1:] != '\n':
- raise TruncatedLineError(line)
- line = line[:-1]
- if '\n' in line:
- raise TooManyLinesError(line)
- return line
-
-
-def read_one_line(parent, name):
- """
- Read a file whose sole contents are a single line.
-
- Strips the newline.
-
- :return: Contents of the line, or None if file did not exist.
- """
- path = os.path.join(parent, name)
- try:
- line = file(path, 'rb').read()
- except IOError as e:
- if e.errno == errno.ENOENT:
- return None
- else:
- raise
-
- try:
- line = must_be_one_line(line)
- except (TruncatedLineError, TooManyLinesError) as e:
- raise ActivateError('File is corrupt: {path}: {msg}'.format(
- path=path,
- msg=e,
- ))
- return line
-
-
-def write_one_line(parent, name, text):
- """
- Write a file whose sole contents are a single line.
-
- Adds a newline.
- """
- path = os.path.join(parent, name)
- tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
- with file(tmp, 'wb') as f:
- f.write(text + '\n')
- os.fsync(f.fileno())
- os.rename(tmp, path)
-
-
-CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'
-
-
-def check_osd_magic(path):
- """
- Check that this path has the Ceph OSD magic.
-
- :raises: BadMagicError if this does not look like a Ceph OSD data
- dir.
- """
- magic = read_one_line(path, 'magic')
- if magic is None:
- # probably not mkfs'ed yet
- raise BadMagicError(path)
- if magic != CEPH_OSD_ONDISK_MAGIC:
- raise BadMagicError(path)
-
-
-def check_osd_id(osd_id):
- """
- Ensures osd id is numeric.
- """
- if not re.match(r'^[0-9]+$', osd_id):
- raise ActivateError('osd id is not numeric')
-
-
-def get_osd_id(path):
- osd_id = read_one_line(path, 'whoami')
- if osd_id is not None:
- check_osd_id(osd_id)
- return osd_id
-
-
-# TODO depend on python2.7
-def _check_output(*args, **kwargs):
- process = subprocess.Popen(
- stdout=subprocess.PIPE,
- *args, **kwargs)
- out, _ = process.communicate()
- ret = process.wait()
- if ret:
- cmd = kwargs.get("args")
- if cmd is None:
- cmd = args[0]
- raise subprocess.CalledProcessError(ret, cmd, output=out)
- return out
-
-
-def allocate_osd_id(
- cluster,
- fsid,
- keyring,
- ):
- log.debug('Allocating OSD id...')
- try:
- osd_id = _check_output(
- args=[
- 'ceph',
- '--cluster', cluster,
- '--name', 'client.bootstrap-osd',
- '--keyring', keyring,
- 'osd', 'create', '--concise',
- fsid,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise ActivateError('ceph osd create failed', e)
- osd_id = must_be_one_line(osd_id)
- check_osd_id(osd_id)
- return osd_id
-
-
-def mkfs(
- path,
- cluster,
- osd_id,
- fsid,
- keyring,
- ):
- monmap = os.path.join(path, 'activate.monmap')
- subprocess.check_call(
- args=[
- 'ceph',
- '--cluster', cluster,
- '--name', 'client.bootstrap-osd',
- '--keyring', keyring,
- 'mon', 'getmap', '-o', monmap,
- ],
- )
-
- subprocess.check_call(
- args=[
- 'ceph-osd',
- '--cluster', cluster,
- '--mkfs',
- '--mkkey',
- '-i', osd_id,
- '--monmap', monmap,
- '--osd-data', path,
- '--osd-journal', os.path.join(path, 'journal'),
- '--osd-uuid', fsid,
- '--keyring', os.path.join(path, 'keyring'),
- ],
- )
- # TODO ceph-osd --mkfs removes the monmap file?
- # os.unlink(monmap)
-
-
-def auth_key(
- path,
- cluster,
- osd_id,
- keyring,
- ):
- subprocess.check_call(
- args=[
- 'ceph',
- '--cluster', cluster,
- '--name', 'client.bootstrap-osd',
- '--keyring', keyring,
- 'auth', 'add', 'osd.{osd_id}'.format(osd_id=osd_id),
- '-i', os.path.join(path, 'keyring'),
- 'osd', 'allow *',
- 'mon', 'allow rwx',
- ],
- )
-
-
-def move_mount(
- path,
- cluster,
- osd_id,
- ):
- log.debug('Moving mount to final location...')
- parent = '/var/lib/ceph/osd'
- osd_data = os.path.join(
- parent,
- '{cluster}-{osd_id}'.format(cluster=cluster, osd_id=osd_id),
- )
- maybe_mkdir(osd_data)
- subprocess.check_call(
- args=[
- 'mount',
- '--move',
- '--',
- path,
- osd_data,
- ],
- )
-
-
-def upstart_start(
- cluster,
- osd_id,
- ):
- log.debug('Starting service...')
- subprocess.check_call(
- args=[
- 'initctl',
- # use emit, not start, because start would fail if the
- # instance was already running
- 'emit',
- # since the daemon starting doesn't guarantee much about
- # the service being operational anyway, don't bother
- # waiting for it
- '--no-wait',
- '--',
- 'ceph-osd',
- 'cluster={cluster}'.format(cluster=cluster),
- 'id={osd_id}'.format(osd_id=osd_id),
- ],
- )
-
-
-def detect_fstype(
- dev,
- ):
- fstype = _check_output(
- args=[
- 'blkid',
- # we don't want stale cached results
- '-p',
- '-s', 'TYPE',
- '-o' 'value',
- '--',
- dev,
- ],
- )
- fstype = must_be_one_line(fstype)
- return fstype
-
-
-def get_conf(cluster, variable):
- try:
- p = subprocess.Popen(
- args=[
- 'ceph-conf',
- '--cluster={cluster}'.format(
- cluster=cluster,
- ),
- '--name=osd.',
- '--lookup',
- variable,
- ],
- stdout=subprocess.PIPE,
- close_fds=True,
- )
- except OSError as e:
- raise ActivateError('error executing ceph-conf', e)
- (out, _err) = p.communicate()
- ret = p.wait()
- if ret == 1:
- # config entry not found
- return None
- elif ret != 0:
- raise ActivateError('getting variable from configuration failed')
- value = out.split('\n', 1)[0]
- # don't differentiate between "var=" and no var set
- if not value:
- return None
- return value
-
-
-MOUNT_OPTIONS = dict(
- btrfs='noatime,user_subvol_rm_allowed',
- # user_xattr is default ever since linux 2.6.39 / 3.0, but we'll
- # delay a moment before removing it fully because we did have some
- # issues with ext4 before the xatts-in-leveldb work, and it seemed
- # that user_xattr helped
- ext4='noatime,user_xattr',
- xfs='noatime',
- )
-
-
-def mount(
- dev,
- fstype,
- options,
- ):
- # pick best-of-breed mount options based on fs type
- if options is None:
- options = MOUNT_OPTIONS.get(fstype, '')
-
- # mount
- path = tempfile.mkdtemp(
- prefix='mnt.',
- dir='/var/lib/ceph/tmp',
- )
- try:
- subprocess.check_call(
- args=[
- 'mount',
- '-o', options,
- '--',
- dev,
- path,
- ],
- )
- except subprocess.CalledProcessError as e:
- try:
- os.rmdir(path)
- except (OSError, IOError):
- pass
- raise MountError(e)
-
- return path
-
-
-def unmount(
- path,
- ):
- try:
- subprocess.check_call(
- args=[
- 'umount',
- '--',
- path,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise UnmountError(e)
-
-
-def activate(
- path,
- activate_key_template,
- do_mount,
- ):
-
- if do_mount:
- try:
- fstype = detect_fstype(dev=path)
- except (subprocess.CalledProcessError,
- TruncatedLineError,
- TooManyLinesError) as e:
- raise FilesystemTypeError(
- 'device {dev}'.format(dev=path),
- e,
- )
-
- mount_options = get_conf(
- # TODO always using mount options from cluster=ceph for
- # now; see http://tracker.newdream.net/issues/3253
- cluster='ceph',
- variable='osd_fs_mount_options_{fstype}'.format(
- fstype=fstype,
- ),
- )
-
- path = mount(dev=path, fstype=fstype, options=mount_options)
-
- try:
- check_osd_magic(path)
-
- ceph_fsid = read_one_line(path, 'ceph_fsid')
- if ceph_fsid is None:
- raise ActivateError('No cluster uuid assigned.')
- log.debug('Cluster uuid is %s', ceph_fsid)
-
- # TODO use ceph_fsid to find the right cluster
- cluster = 'ceph'
- log.debug('Cluster name is %s', cluster)
-
- fsid = read_one_line(path, 'fsid')
- if fsid is None:
- raise ActivateError('No OSD uuid assigned.')
- log.debug('OSD uuid is %s', fsid)
-
- keyring = activate_key_template.format(cluster=cluster)
-
- osd_id = get_osd_id(path)
- if osd_id is None:
- osd_id = allocate_osd_id(
- cluster=cluster,
- fsid=fsid,
- keyring=keyring,
- )
- write_one_line(path, 'whoami', osd_id)
- log.debug('OSD id is %s', osd_id)
-
- if not os.path.exists(os.path.join(path, 'ready')):
- log.debug('Initializing OSD...')
- # re-running mkfs is safe, so just run until it completes
- mkfs(
- path=path,
- cluster=cluster,
- osd_id=osd_id,
- fsid=fsid,
- keyring=keyring,
- )
-
- # indicate this daemon is managed by upstart
- if not os.path.exists(os.path.join(path, 'upstart')):
- with file(os.path.join(path, 'upstart'), 'w'):
- pass
-
- if not os.path.exists(os.path.join(path, 'active')):
- log.debug('Authorizing OSD key...')
- auth_key(
- path=path,
- cluster=cluster,
- osd_id=osd_id,
- keyring=keyring,
- )
- write_one_line(path, 'active', 'ok')
-
- # check if the disk is already active
- active = False
- src_dev = os.stat(path).st_dev
- try:
- dst_dev = os.stat('/var/lib/ceph/osd/{cluster}-{osd_id}'.format(
- cluster=cluster,
- osd_id=osd_id)).st_dev
- if src_dev == dst_dev:
- active = True
- except:
- pass
- if active:
- log.debug('OSD already mounted')
- unmount(path)
- else:
- move_mount(
- path=path,
- cluster=cluster,
- osd_id=osd_id,
- )
- except:
- unmount(path)
- finally:
- if do_mount:
- # if we created a temp dir to mount it, remove it
- os.rmdir(path)
-
- upstart_start(
- cluster=cluster,
- osd_id=osd_id,
- )
-
-
-def parse_args():
- parser = argparse.ArgumentParser(
- description='Activate a Ceph OSD',
- )
- parser.add_argument(
- '-v', '--verbose',
- action='store_true', default=None,
- help='be more verbose',
- )
- parser.add_argument(
- '--mount',
- action='store_true', default=None,
- help='mount the device first',
- )
- parser.add_argument(
- '--activate-key',
- metavar='PATH',
- help='bootstrap-osd keyring path template (%(default)s)',
- dest='activate_key_template',
- )
- parser.add_argument(
- 'path',
- metavar='PATH',
- help='path to OSD data directory, or block device if using --mount',
- )
- parser.set_defaults(
- activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring',
- # we want to hold on to this, for later
- prog=parser.prog,
- )
- args = parser.parse_args()
- return args
-
-
-def main():
- args = parse_args()
-
- loglevel = logging.INFO
- if args.verbose:
- loglevel = logging.DEBUG
-
- logging.basicConfig(
- level=loglevel,
- )
-
- try:
- activate(
- path=args.path,
- activate_key_template=args.activate_key_template,
- do_mount=args.mount,
- )
- except ActivateError as e:
- print >>sys.stderr, '{prog}: {msg}'.format(
- prog=args.prog,
- msg=e,
- )
- sys.exit(1)
-
-if __name__ == '__main__':
- main()
+#!/bin/sh
+dir=`dirname $0`
+$dir/ceph-disk activate $*
diff --git a/src/ceph-disk-prepare b/src/ceph-disk-prepare
index e5c4bdb9050..f9255eb8831 100755
--- a/src/ceph-disk-prepare
+++ b/src/ceph-disk-prepare
@@ -1,529 +1,3 @@
-#!/usr/bin/python
-
-import argparse
-import logging
-import os
-import os.path
-import subprocess
-import sys
-import tempfile
-import uuid
-
-
-log_name = __name__
-if log_name == '__main__':
- log_name = os.path.basename(sys.argv[0])
-log = logging.getLogger(log_name)
-
-
-class PrepareError(Exception):
- """
- OSD preparation error
- """
-
- def __str__(self):
- doc = self.__doc__.strip()
- return ': '.join([doc] + [str(a) for a in self.args])
-
-
-class MountError(PrepareError):
- """
- Mounting filesystem failed
- """
-
-
-class UnmountError(PrepareError):
- """
- Unmounting filesystem failed
- """
-
-
-def write_one_line(parent, name, text):
- """
- Write a file whose sole contents are a single line.
-
- Adds a newline.
- """
- path = os.path.join(parent, name)
- tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
- with file(tmp, 'wb') as f:
- f.write(text + '\n')
- os.fsync(f.fileno())
- os.rename(tmp, path)
-
-
-CEPH_OSD_ONDISK_MAGIC = 'ceph osd volume v026'
-
-JOURNAL_UUID = '45b0969e-9b03-4f30-b4c6-b4b80ceff106'
-
-
-# TODO depend on python2.7
-def _check_output(*args, **kwargs):
- process = subprocess.Popen(
- stdout=subprocess.PIPE,
- *args, **kwargs)
- out, _ = process.communicate()
- ret = process.wait()
- if ret:
- cmd = kwargs.get("args")
- if cmd is None:
- cmd = args[0]
- raise subprocess.CalledProcessError(ret, cmd, output=out)
- return out
-
-
-def get_conf(cluster, variable):
- try:
- p = subprocess.Popen(
- args=[
- 'ceph-conf',
- '--cluster={cluster}'.format(
- cluster=cluster,
- ),
- '--name=osd.',
- '--lookup',
- variable,
- ],
- stdout=subprocess.PIPE,
- close_fds=True,
- )
- except OSError as e:
- raise PrepareError('error executing ceph-conf', e)
- (out, _err) = p.communicate()
- ret = p.wait()
- if ret == 1:
- # config entry not found
- return None
- elif ret != 0:
- raise PrepareError('getting variable from configuration failed')
- value = out.split('\n', 1)[0]
- # don't differentiate between "var=" and no var set
- if not value:
- return None
- return value
-
-
-def get_conf_with_default(cluster, variable):
- """
- Get a config value that is known to the C++ code.
-
- This will fail if called on variables that are not defined in
- common config options.
- """
- try:
- out = _check_output(
- args=[
- 'ceph-osd',
- '--cluster={cluster}'.format(
- cluster=cluster,
- ),
- '--show-config-value={variable}'.format(
- variable=variable,
- ),
- ],
- close_fds=True,
- )
- except subprocess.CalledProcessError as e:
- raise PrepareError(
- 'getting variable from configuration failed',
- e,
- )
-
- value = out.split('\n', 1)[0]
- return value
-
-
-def get_fsid(cluster):
- fsid = get_conf(cluster=cluster, variable='fsid')
- if fsid is None:
- raise PrepareError('getting cluster uuid from configuration failed')
- return fsid
-
-
-DEFAULT_FS_TYPE = 'xfs'
-
-MOUNT_OPTIONS = dict(
- btrfs='noatime,user_subvol_rm_allowed',
- ext4='noatime,user_xattr',
- xfs='noatime',
- )
-
-MKFS_ARGS = dict(
- btrfs=[
- '-m', 'single',
- '-l', '32768',
- '-n', '32768',
- ],
- xfs=[
- # xfs insists on not overwriting previous fs; even if we wipe
- # partition table, we often recreate it exactly the same way,
- # so we'll see ghosts of filesystems past
- '-f',
- '-i', 'size=2048',
- ],
- )
-
-
-def mount(
- dev,
- fstype,
- options,
- ):
- # pick best-of-breed mount options based on fs type
- if options is None:
- options = MOUNT_OPTIONS.get(fstype, '')
-
- # mount
- path = tempfile.mkdtemp(
- prefix='mnt.',
- dir='/var/lib/ceph/tmp',
- )
- try:
- subprocess.check_call(
- args=[
- 'mount',
- '-o', options,
- '--',
- dev,
- path,
- ],
- )
- except subprocess.CalledProcessError as e:
- try:
- os.rmdir(path)
- except (OSError, IOError):
- pass
- raise MountError(e)
-
- return path
-
-
-def unmount(
- path,
- ):
- try:
- subprocess.check_call(
- args=[
- 'umount',
- '--',
- path,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise UnmountError(e)
-
- os.rmdir(path)
-
-
-def get_free_partition_index(dev):
- try:
- lines = _check_output(
- args=[
- 'parted',
- '--machine',
- '--',
- dev,
- 'print',
- ],
- )
- except subprocess.CalledProcessError as e:
- print 'cannot read partition index; assume it isn\'t present\n'
- return 1
-
- if not lines:
- raise PrepareError('parted failed to output anything')
- lines = lines.splitlines(True)
-
- if lines[0] not in ['CHS;\n', 'CYL;\n', 'BYT;\n']:
- raise PrepareError('weird parted units', lines[0])
- del lines[0]
-
- if not lines[0].startswith('/dev/'):
- raise PrepareError('weird parted disk entry', lines[0])
- del lines[0]
-
- seen = set()
- for line in lines:
- idx, _ = line.split(':', 1)
- idx = int(idx)
- seen.add(idx)
-
- num = 1
- while num in seen:
- num += 1
- return num
-
-
-def prepare(
- disk,
- journal,
- journal_size,
- fstype,
- mkfs_args,
- mount_options,
- cluster_uuid,
- ):
- """
- Prepare a disk to be used as an OSD data disk.
-
- The ``magic`` file is written last, so it's presence is a reliable
- indicator of the whole sequence having completed.
-
- WARNING: This will unconditionally overwrite anything given to
- it.
- """
-
- try:
- # this kills the crab
- subprocess.check_call(
- args=[
- 'sgdisk',
- '--zap-all',
- '--clear',
- '--mbrtogpt',
- '--',
- disk,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise PrepareError(e)
-
- osd_uuid = str(uuid.uuid4())
-
- # store the partition uuid iff using external journal
- journal_uuid = None
-
- if journal is not None:
- journal_uuid = str(uuid.uuid4())
-
- if journal == disk:
- # we're sharing the disk between osd data and journal;
- # make journal be partition number 2, so it's pretty; put
- # journal at end of free space so partitioning tools don't
- # reorder them suddenly
- num = 2
- journal_part = '{num}:-{size}M:0'.format(
- num=num,
- size=journal_size,
- )
- else:
- # sgdisk has no way for me to say "whatever is the next
- # free index number" when setting type guids etc, so we
- # need to awkwardly look up the next free number, and then
- # fix that in the call -- and hope nobody races with us;
- # then again nothing guards the partition table from races
- # anyway
- num = get_free_partition_index(dev=journal)
- journal_part = '{num}:0:+{size}M'.format(
- num=num,
- size=journal_size,
- )
-
- try:
- subprocess.check_call(
- args=[
- 'sgdisk',
- '--new={part}'.format(part=journal_part),
- '--change-name={num}:ceph journal'.format(num=num),
- '--partition-guid={num}:{journal_uuid}'.format(
- num=num,
- journal_uuid=journal_uuid,
- ),
- '--typecode={num}:{uuid}'.format(
- num=num,
- uuid=JOURNAL_UUID,
- ),
- '--',
- journal,
- ],
- )
- subprocess.check_call(
- args=[
- # also make sure the kernel refreshes the new table
- 'partprobe',
- journal,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise PrepareError(e)
-
- try:
- subprocess.check_call(
- args=[
- 'sgdisk',
- '--largest-new=1',
- '--change-name=1:ceph data',
- '--partition-guid=1:{osd_uuid}'.format(
- osd_uuid=osd_uuid,
- ),
- '--typecode=1:89c57f98-2fe5-4dc0-89c1-f3ad0ceff2be',
- '--',
- disk,
- ],
- )
- subprocess.check_call(
- args=[
- # also make sure the kernel refreshes the new table
- 'partprobe',
- disk,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise PrepareError(e)
-
- dev = '{disk}1'.format(disk=disk)
- args = [
- 'mkfs',
- '--type={fstype}'.format(fstype=fstype),
- ]
- args.extend(MKFS_ARGS.get(fstype, []))
- if mkfs_args is not None:
- args.extend(mkfs_args.split())
- args.extend
- args.extend([
- '--',
- dev,
- ])
- try:
- subprocess.check_call(args=args)
- except subprocess.CalledProcessError as e:
- raise PrepareError(e)
-
- path = mount(dev=dev, fstype=fstype, options=mount_options)
- try:
- if journal_uuid is not None:
- # we're using an external journal; point to it here
- os.symlink(
- '/dev/disk/by-partuuid/{journal_uuid}'.format(
- journal_uuid=journal_uuid,
- ),
- os.path.join(path, 'journal'),
- )
- write_one_line(path, 'ceph_fsid', cluster_uuid)
- write_one_line(path, 'fsid', osd_uuid)
- write_one_line(path, 'magic', CEPH_OSD_ONDISK_MAGIC)
- finally:
- unmount(path)
-
- try:
- subprocess.check_call(
- args=[
- 'sgdisk',
- '--typecode=1:4fbd7e29-9d25-41b8-afd0-062c0ceff05d',
- '--',
- disk,
- ],
- )
- except subprocess.CalledProcessError as e:
- raise PrepareError(e)
-
-
-def parse_args():
- parser = argparse.ArgumentParser(
- description='Prepare a disk for a Ceph OSD',
- )
- parser.add_argument(
- '-v', '--verbose',
- action='store_true', default=None,
- help='be more verbose',
- )
- parser.add_argument(
- '--cluster',
- metavar='NAME',
- help='cluster name to assign this disk to',
- )
- parser.add_argument(
- '--cluster-uuid',
- metavar='UUID',
- help='cluster uuid to assign this disk to',
- )
- parser.add_argument(
- '--fs-type',
- help='file system type to use (e.g. "ext4")',
- )
- parser.add_argument(
- 'disk',
- metavar='DISK',
- help='path to OSD data disk block device',
- )
- parser.add_argument(
- 'journal',
- metavar='JOURNAL',
- nargs='?',
- help=('path to OSD journal disk block device;'
- + ' leave out to store journal in file'),
- )
- parser.set_defaults(
- # we want to hold on to this, for later
- prog=parser.prog,
- cluster='ceph',
- )
- args = parser.parse_args()
- return args
-
-
-def main():
- args = parse_args()
-
- loglevel = logging.INFO
- if args.verbose:
- loglevel = logging.DEBUG
-
- logging.basicConfig(
- level=loglevel,
- )
-
- try:
- if args.cluster_uuid is None:
- args.cluster_uuid = get_fsid(cluster=args.cluster)
- if args.cluster_uuid is None:
- raise PrepareError(
- 'must have fsid in config or pass --cluster--uuid=',
- )
-
- if args.fs_type is None:
- args.fs_type = get_conf(
- cluster=args.cluster,
- variable='osd_fs_type',
- )
- if args.fs_type is None:
- args.fs_type = DEFAULT_FS_TYPE
-
- mkfs_args = get_conf(
- cluster=args.cluster,
- variable='osd_fs_mkfs_arguments_{fstype}'.format(
- fstype=args.fs_type,
- ),
- )
-
- mount_options = get_conf(
- cluster=args.cluster,
- variable='osd_fs_mount_options_{fstype}'.format(
- fstype=args.fs_type,
- ),
- )
-
- journal_size = get_conf_with_default(
- cluster=args.cluster,
- variable='osd_journal_size',
- )
- journal_size = int(journal_size)
-
- prepare(
- disk=args.disk,
- journal=args.journal,
- journal_size=journal_size,
- fstype=args.fs_type,
- mkfs_args=mkfs_args,
- mount_options=mount_options,
- cluster_uuid=args.cluster_uuid,
- )
- except PrepareError as e:
- print >>sys.stderr, '{prog}: {msg}'.format(
- prog=args.prog,
- msg=e,
- )
- sys.exit(1)
-
-if __name__ == '__main__':
- main()
+#!/bin/sh
+dir=`dirname $0`
+$dir/ceph-disk prepare $*
diff --git a/src/ceph_common.sh b/src/ceph_common.sh
index f20628bfa22..5576018fa4f 100644
--- a/src/ceph_common.sh
+++ b/src/ceph_common.sh
@@ -45,6 +45,13 @@ check_host() {
#echo host for $name is $host, i am $hostname
+ # sysvinit managed instance in standird location?
+ if [ -e "/var/lib/ceph/$type/ceph-$id/sysvinit" ]; then
+ host="$hostname"
+ echo "=== $type.$id === "
+ return 0
+ fi
+
# ignore all sections without 'host' defined
if [ -z "$host" ]; then
return 1
@@ -121,14 +128,49 @@ do_root_cmd() {
fi
}
+get_local_daemon_list() {
+ type=$1
+ if [ -d "/var/lib/ceph/$type" ]; then
+ for i in `find /var/lib/ceph/$type -mindepth 1 -maxdepth 1 -type d -printf '%f\n'`; do
+ if [ -e "/var/lib/ceph/$type/$i/sysvinit" ]; then
+ id=`echo $i | sed 's/.*-//'`
+ local="$local $type.$id"
+ fi
+ done
+ fi
+}
+
+get_local_name_list() {
+ orig=$1
+ local=""
+
+ if [ -z "$orig" ]; then
+ # enumerate local directories
+ get_local_daemon_list "mon"
+ get_local_daemon_list "osd"
+ get_local_daemon_list "mds"
+ return
+ fi
+
+ for f in $orig; do
+ type=`echo $f | cut -c 1-3` # e.g. 'mon', if $item is 'mon1'
+ id=`echo $f | cut -c 4- | sed 's/\\.//'`
+ get_local_daemon_list $type
+
+ # FIXME
+ done
+}
+
get_name_list() {
orig=$1
+ # extract list of monitors, mdss, osds defined in startup.conf
+ allconf=`$CCONF -c $conf -l mon | egrep -v '^mon$' ; \
+ $CCONF -c $conf -l mds | egrep -v '^mds$' ; \
+ $CCONF -c $conf -l osd | egrep -v '^osd$'`
+
if [ -z "$orig" ]; then
- # extract list of monitors, mdss, osds defined in startup.conf
- what=`$CCONF -c $conf -l mon | egrep -v '^mon$' ; \
- $CCONF -c $conf -l mds | egrep -v '^mds$' ; \
- $CCONF -c $conf -l osd | egrep -v '^osd$'`
+ what="$allconf $local"
return
fi
@@ -136,17 +178,16 @@ get_name_list() {
for f in $orig; do
type=`echo $f | cut -c 1-3` # e.g. 'mon', if $item is 'mon1'
id=`echo $f | cut -c 4- | sed 's/\\.//'`
- all=`$CCONF -c $conf -l $type | egrep -v "^$type$" || true`
case $f in
mon | osd | mds)
- what="$what $all"
+ what=`echo $allconf $local | grep ^$type || true`
;;
*)
- if echo " " $all " " | egrep -v -q "( $type$id | $type.$id )"; then
- echo "$0: $type.$id not found ($conf defines \"$all\")"
+ if echo " " "$allconf" "$local" " " | egrep -v -q "( $type$id | $type.$id )"; then
+ echo "$0: $type.$id not found ($conf defines \"$all\", /var/lib/ceph defines \"$local\")"
exit 1
fi
- what="$what $f"
+ what="$f"
;;
esac
done
diff --git a/src/init-ceph.in b/src/init-ceph.in
index ee03679d10d..6530db7742f 100644
--- a/src/init-ceph.in
+++ b/src/init-ceph.in
@@ -165,6 +165,7 @@ verify_conf
command=$1
[ -n "$*" ] && shift
+get_local_name_list "$@"
get_name_list "$@"
for name in $what; do
diff --git a/src/upstart/ceph-hotplug.conf b/src/upstart/ceph-hotplug.conf
deleted file mode 100644
index 702045293a2..00000000000
--- a/src/upstart/ceph-hotplug.conf
+++ /dev/null
@@ -1,11 +0,0 @@
-description "Ceph hotplug"
-
-start on block-device-added \
- DEVTYPE=partition \
- ID_PART_ENTRY_TYPE=4fbd7e29-9d25-41b8-afd0-062c0ceff05d
-stop on runlevel [!2345]
-
-task
-instance $DEVNAME
-
-exec /usr/sbin/ceph-disk-activate --mount -- "$DEVNAME"
diff --git a/udev/95-ceph-osd.rules b/udev/95-ceph-osd.rules
new file mode 100644
index 00000000000..77e6ef37c5d
--- /dev/null
+++ b/udev/95-ceph-osd.rules
@@ -0,0 +1,21 @@
+# activate ceph-tagged partitions
+ACTION=="add", SUBSYSTEM=="block", \
+ ENV{DEVTYPE}=="partition", \
+ ENV{ID_PART_ENTRY_TYPE}=="4fbd7e29-9d25-41b8-afd0-062c0ceff05d", \
+ RUN+="/usr/sbin/ceph-disk-activate --mount /dev/$name"
+
+# Map journal if using dm-crypt
+ACTION=="add" SUBSYSTEM=="block", \
+ ENV{DEVTYPE}=="partition", \
+ ENV{ID_PART_ENTRY_TYPE}=="45b0969e-9b03-4f30-b4c6-5ec00ceff106", \
+ RUN+="/sbin/cryptsetup --key-file /etc/ceph/dmcrypt-keys/$env{ID_PART_ENTRY_UUID} --key-size 256 create $env{ID_PART_ENTRY_UUID} /dev/$name"
+
+# Map data device and
+# activate ceph-tagged partitions
+# for dm-crypted data devices
+ACTION=="add" SUBSYSTEM=="block", \
+ ENV{DEVTYPE}=="partition", \
+ ENV{ID_PART_ENTRY_TYPE}=="4fbd7e29-9d25-41b8-afd0-5ec00ceff05d", \
+ RUN+="/sbin/cryptsetup --key-file /etc/ceph/dmcrypt-keys/$env{ID_PART_ENTRY_UUID} --key-size 256 create $env{ID_PART_ENTRY_UUID} /dev/$name", \
+ RUN+="/bin/bash -c 'while [ ! -e /dev/mapper/$env{ID_PART_ENTRY_UUID} ];do sleep 1; done'", \
+ RUN+="/usr/sbin/ceph-disk-activate --mount /dev/mapper/$env{ID_PART_ENTRY_UUID}"