summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-06-10 14:12:16 -0700
committerSage Weil <sage@inktank.com>2012-06-10 14:12:16 -0700
commit720aa4663f8933b7eefaa84b221df5d25093f92a (patch)
tree5efeaafdbe058851d44b6c49031fb78463eaa1c1
parentb25f27705b6dc63a1b8b565afc3aeb9fe0b7f1a4 (diff)
parent3d22546101a5e7e63aa534fd755cb7d0bfa515cf (diff)
downloadceph-720aa4663f8933b7eefaa84b221df5d25093f92a.tar.gz
Merge remote-tracking branch 'gh/wip-rbd-format'
-rw-r--r--doc/man/8/rados.rst21
-rwxr-xr-xqa/workunits/rbd/copy.sh2
-rwxr-xr-xqa/workunits/rbd/import_export.sh4
-rw-r--r--src/Makefile.am11
-rw-r--r--src/cls_rbd.cc736
-rw-r--r--src/cls_rgw.cc63
-rw-r--r--src/include/rbd/librbd.h4
-rw-r--r--src/include/rbd/librbd.hpp2
-rw-r--r--src/include/rbd_types.h21
-rw-r--r--src/librbd.cc646
-rw-r--r--src/librbd/cls_rbd_client.cc318
-rw-r--r--src/librbd/cls_rbd_client.h65
-rw-r--r--src/objclass/class_api.cc60
-rw-r--r--src/objclass/class_debug.cc7
-rw-r--r--src/objclass/objclass.h36
-rw-r--r--src/os/DBObjectMap.cc2
-rw-r--r--src/pybind/rbd.py20
-rw-r--r--src/rados.cc159
-rw-r--r--src/rbd.cc45
-rw-r--r--src/test/pybind/test_rbd.py8
-rw-r--r--src/test/rbd/test_cls_rbd.cc337
-rw-r--r--src/test/test_librbd.cc83
-rwxr-xr-xsrc/vstart.sh3
23 files changed, 2173 insertions, 480 deletions
diff --git a/doc/man/8/rados.rst b/doc/man/8/rados.rst
index b805f63236f..0122f350d31 100644
--- a/doc/man/8/rados.rst
+++ b/doc/man/8/rados.rst
@@ -98,6 +98,27 @@ Pool specific commands
object size is 4 KB, and the default number of simulated threads
(parallel writes) is 16.
+:command:`listomapkeys` *name*
+ List all the keys stored in the object map of object name.
+
+:command:`listomapvals` *name*
+ List all key/value pairs stored in the object map of object name.
+ The values are dumped in hexadecimal.
+
+:command:`getomapval` *name* *key*
+ Dump the hexadecimal value of key in the object map of object name.
+
+:command:`setomapval` *name* *key* *value*
+ Set the value of key in the object map of object name.
+
+:command:`rmomapkey` *name* *key*
+ Remove key from the object map of object name.
+
+:command:`getomapheader` *name*
+ Dump the hexadecimal value of the object map header of object name.
+
+:command:`setomapheader` *name* *value*
+ Set the value of the object map header of object name.
Examples
========
diff --git a/qa/workunits/rbd/copy.sh b/qa/workunits/rbd/copy.sh
index 5d922920ad9..933b735b786 100755
--- a/qa/workunits/rbd/copy.sh
+++ b/qa/workunits/rbd/copy.sh
@@ -17,7 +17,7 @@ dd if=/bin/ls of=/tmp/img1 bs=1k seek=10000
dd if=/bin/ln of=/tmp/img1 bs=1k seek=100000
# import, snapshot
-rbd import /tmp/img1 testimg1
+rbd import $RBD_CREATE_ARGS /tmp/img1 testimg1
rbd resize testimg1 --size=256
rbd export testimg1 /tmp/img2
rbd snap create testimg1 --snap=snap1
diff --git a/qa/workunits/rbd/import_export.sh b/qa/workunits/rbd/import_export.sh
index a2af194b43a..249ee448e3d 100755
--- a/qa/workunits/rbd/import_export.sh
+++ b/qa/workunits/rbd/import_export.sh
@@ -10,11 +10,11 @@ dd if=/bin/grep of=/tmp/img bs=1k seek=1000000
rbd rm testimg || true
-rbd import /tmp/img testimg
+rbd import $RBD_CREATE_ARGS /tmp/img testimg
rbd export testimg /tmp/img2
cmp /tmp/img /tmp/img2
rm /tmp/img /tmp/img2
-echo OK \ No newline at end of file
+echo OK
diff --git a/src/Makefile.am b/src/Makefile.am
index 784098ebd26..d913e63287e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -353,6 +353,7 @@ endif
# librbd
librbd_la_SOURCES = \
librbd.cc \
+ librbd/cls_rbd_client.cc \
librbd/LibrbdWriteback.cc \
osdc/ObjectCacher.cc
librbd_la_CFLAGS = ${AM_CFLAGS}
@@ -691,7 +692,7 @@ unittest_ipaddr_LDADD = ${UNITTEST_LDADD} $(LIBGLOBAL_LDA)
unittest_ipaddr_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
check_PROGRAMS += unittest_ipaddr
-test_librbd_SOURCES = test/test_librbd.cc
+test_librbd_SOURCES = test/test_librbd.cc test/rados-api/test.cc
test_librbd_LDADD = librbd.la librados.la ${UNITTEST_STATIC_LDADD}
test_librbd_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
bin_DEBUGPROGRAMS += test_librbd
@@ -701,6 +702,13 @@ test_librbd_fsx_LDADD = librbd.la librados.la
test_librbd_fsx_CFLAGS = ${AM_CFLAGS} -Wno-format
bin_DEBUGPROGRAMS += test_librbd_fsx
+test_cls_rbd_SOURCES = test/rbd/test_cls_rbd.cc \
+ test/rados-api/test.cc \
+ librbd/cls_rbd_client.cc
+test_cls_rbd_LDADD = librados.la ${UNITTEST_STATIC_LDADD}
+test_cls_rbd_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
+bin_DEBUGPROGRAMS += test_cls_rbd
+
test_rados_api_io_SOURCES = test/rados-api/io.cc test/rados-api/test.cc
test_rados_api_io_LDFLAGS = ${AM_LDFLAGS}
test_rados_api_io_LDADD = librados.la ${UNITTEST_STATIC_LDADD}
@@ -1365,6 +1373,7 @@ noinst_HEADERS = \
librados/IoCtxImpl.h\
librados/PoolAsyncCompletionImpl.h\
librados/RadosClient.h\
+ librbd/cls_rbd_client.h\
librbd/LibrbdWriteback.h\
logrotate.conf\
json_spirit/json_spirit.h\
diff --git a/src/cls_rbd.cc b/src/cls_rbd.cc
index c1f2d046b01..46c78fcb6be 100644
--- a/src/cls_rbd.cc
+++ b/src/cls_rbd.cc
@@ -1,26 +1,72 @@
-
-
-
-#include <iostream>
-#include <string.h>
-#include <stdlib.h>
-#include <errno.h>
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/** \file
+ *
+ * This is an OSD class that implements methods for
+ * use with rbd.
+ *
+ * Most of these deal with the rbd header object. Methods prefixed
+ * with old_ deal with the original rbd design, in which clients read
+ * and interpreted the header object directly.
+ *
+ * The new format is meant to be opaque to clients - all their
+ * interactions with non-data objects should go through this
+ * class. The OSD class interface leaves the class to implement its
+ * own argument and payload serialization/deserialization, so for ease
+ * of implementation we use the existing ceph encoding/decoding
+ * methods. Something like json might be preferable, but the rbd
+ * kernel module has to be able to understand the format as well. The
+ * datatypes exposed to the clients are strings, unsigned integers,
+ * and vectors of those types. The on-wire format can be found in
+ * src/include/encoding.h.
+ *
+ * The methods for interacting with the new format document their
+ * parameters as the client sees them - it would be silly to mention
+ * in each one that they take an input and an output bufferlist.
+ */
#include "include/types.h"
#include "objclass/objclass.h"
#include "include/rbd_types.h"
-CLS_VER(1,3)
+#include <algorithm>
+#include <cstring>
+#include <cstdlib>
+#include <errno.h>
+#include <iostream>
+#include <map>
+#include <sstream>
+#include <vector>
+
+CLS_VER(2,0)
CLS_NAME(rbd)
cls_handle_t h_class;
-cls_method_handle_t h_snapshots_list;
+cls_method_handle_t h_create;
+cls_method_handle_t h_get_features;
+cls_method_handle_t h_get_size;
+cls_method_handle_t h_set_size;
+cls_method_handle_t h_get_snapcontext;
+cls_method_handle_t h_get_object_prefix;
+cls_method_handle_t h_get_snapshot_name;
cls_method_handle_t h_snapshot_add;
cls_method_handle_t h_snapshot_remove;
-cls_method_handle_t h_snapshot_revert;
+cls_method_handle_t h_old_snapshots_list;
+cls_method_handle_t h_old_snapshot_add;
+cls_method_handle_t h_old_snapshot_remove;
cls_method_handle_t h_assign_bid;
-cls_method_handle_t h_test_exec;
+
+#define RBD_MAX_KEYS_READ 64
+#define RBD_SNAP_KEY_PREFIX "snapshot_"
+
+/**
+ * In-memory form of the metadata kept for one snapshot.
+ * decode_snapshot_metadata() fills in every field.
+ */
+struct cls_rbd_snap {
+  snapid_t id;          // id of the snapshot
+  string name;          // name of the snapshot
+  uint64_t image_size;  // image size recorded for the snapshot
+  uint64_t features;    // feature bits recorded for the snapshot
+};
static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
{
@@ -29,7 +75,7 @@ static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
int rc;
struct rbd_obj_header_ondisk *header;
- cls_log("snapshots_list");
+ CLS_LOG(20, "snapshots_list");
while (1) {
int len = sizeof(*header) +
@@ -55,7 +101,524 @@ static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
return 0;
}
-int snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+/**
+ * Build the omap key under which a snapshot's metadata is stored:
+ * RBD_SNAP_KEY_PREFIX followed by the snapshot id, streamed with a
+ * field width of 16, '0' as the fill character and hex formatting.
+ *
+ * @param snap_id snapshot id to turn into a key
+ * @param out receives the resulting key
+ */
+static void key_from_snap_id(snapid_t snap_id, string *out)
+{
+  ostringstream key;
+  key << RBD_SNAP_KEY_PREFIX;
+  key << std::setw(16) << std::setfill('0') << std::hex << snap_id;
+  out->assign(key.str());
+}
+
+/**
+ * Recover the snapshot id from an omap key such as the ones produced
+ * by key_from_snap_id().
+ *
+ * The first strlen(RBD_SNAP_KEY_PREFIX) characters are skipped without
+ * being checked; the remainder is parsed as a hexadecimal number.
+ *
+ * @param key omap key to parse
+ * @returns the parsed id, or 0 if no number could be extracted
+ */
+static snapid_t snap_id_from_key(const string &key)
+{
+  istringstream iss(key);
+  // start from a known value: before C++11 a failed extraction leaves
+  // the target untouched, so an uninitialized id would be returned as-is
+  uint64_t id = 0;
+  iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
+  return id;
+}
+
+/**
+ * Decode the stored metadata of one snapshot.
+ *
+ * The buffer is decoded as, in order, the snapshot name, the image
+ * size and the features - the layout snapshot_add() writes.
+ *
+ * @param snap_id id of the snapshot the buffer belongs to
+ * @param in encoded snapshot metadata
+ * @param snap receives the decoded fields; its id is set to snap_id
+ * @returns 0 on success, -EIO if the buffer cannot be decoded
+ */
+static int decode_snapshot_metadata(uint64_t snap_id, bufferlist *in,
+                                    cls_rbd_snap *snap)
+{
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(snap->name, iter);
+    ::decode(snap->image_size, iter);
+    ::decode(snap->features, iter);
+  } catch (const buffer::error &err) {
+    // uint64_t is not necessarily unsigned long long: cast to match %llu
+    CLS_ERR("error decoding snapshot metadata for snap_id: %llu",
+            (unsigned long long)snap_id);
+    return -EIO;
+  }
+
+  snap->id = snap_id;
+
+  return 0;
+}
+
+/**
+ * Fetch and decode the metadata stored for a single snapshot.
+ *
+ * @param hctx context of the running class method
+ * @param snap_id id of the snapshot to look up
+ * @param snap_meta filled in by decode_snapshot_metadata() on success
+ * @returns the negative error from cls_cxx_map_get_val() if the key
+ *          cannot be read, otherwise the result of
+ *          decode_snapshot_metadata()
+ */
+static int read_snapshot_metadata(cls_method_context_t hctx, uint64_t snap_id,
+                                  cls_rbd_snap *snap_meta)
+{
+  string key;
+  key_from_snap_id(snap_id, &key);
+
+  bufferlist encoded;
+  int ret = cls_cxx_map_get_val(hctx, key, &encoded);
+  if (ret >= 0)
+    return decode_snapshot_metadata(snap_id, &encoded, snap_meta);
+
+  CLS_ERR("error reading snapshot metadata: %d", ret);
+  return ret;
+}
+
+/**
+ * Read one omap value of the object and decode it into *out.
+ *
+ * @param hctx context of the running class method
+ * @param key omap key to read
+ * @param out receives the decoded value
+ * @returns 0 on success, the negative error from cls_cxx_map_get_val()
+ *          if the key cannot be read, -EIO if the value cannot be decoded
+ */
+template<typename T>
+static int read_key(cls_method_context_t hctx, const string &key, T *out)
+{
+  bufferlist raw;
+  int ret = cls_cxx_map_get_val(hctx, key, &raw);
+  if (ret < 0) {
+    CLS_ERR("error reading omap key %s: %d", key.c_str(), ret);
+    return ret;
+  }
+
+  try {
+    bufferlist::iterator pos = raw.begin();
+    ::decode(*out, pos);
+  } catch (const buffer::error &err) {
+    // the key exists but does not hold a value of type T
+    CLS_ERR("error decoding %s", key.c_str());
+    return -EIO;
+  }
+
+  return 0;
+}
+
+/**
+ * Initialize the header with basic metadata.
+ * Extra features may initialize more fields in the future.
+ * Everything is stored as key/value pairs as omaps in the header object.
+ *
+ * Input:
+ * @param size number of bytes in the image (uint64_t)
+ * @param order bits to shift to determine the size of data objects (uint8_t)
+ * @param features what optional things this image will use (uint64_t)
+ * @param object_prefix a prefix for all the data objects
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ * @return -EINVAL if the input cannot be decoded or object_prefix is empty
+ * @return -ENOSYS if features the OSD does not understand are requested
+ * @return -EEXIST if looking up an existing object_prefix returns
+ *         anything other than -ENOENT
+ */
+int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string object_prefix;
+  uint64_t features, size;
+  uint8_t order;
+
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(size, iter);
+    ::decode(order, iter);
+    ::decode(features, iter);
+    ::decode(object_prefix, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // cast so the arguments match the format string exactly: uint64_t is
+  // not necessarily unsigned long long, and a uint8_t is promoted to int
+  CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
+          object_prefix.c_str(), (unsigned long long)size,
+          (unsigned)order, (unsigned long long)features);
+
+  if (features & ~RBD_FEATURES_ALL) {
+    return -ENOSYS;
+  }
+
+  if (!object_prefix.size()) {
+    return -EINVAL;
+  }
+
+  // a readable object_prefix means this header was already created
+  bufferlist stored_prefixbl;
+  int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
+  if (r != -ENOENT) {
+    CLS_ERR("reading object_prefix returned %d", r);
+    return -EEXIST;
+  }
+
+  // hand all initial keys to the omap in a single call, the same way
+  // snapshot_add() writes its keys
+  uint64_t snap_seq = 0;
+  map<string, bufferlist> vals;
+  ::encode(size, vals["size"]);
+  ::encode(order, vals["order"]);
+  ::encode(features, vals["features"]);
+  ::encode(object_prefix, vals["object_prefix"]);
+  ::encode(snap_seq, vals["snap_seq"]);
+
+  r = cls_cxx_map_set_vals(hctx, &vals);
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
+ *
+ * Output:
+ * @param features list of enabled features for the given snapshot (uint64_t)
+ * @param incompatible the enabled features that are also set in
+ *        RBD_FEATURES_INCOMPATIBLE (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t features, snap_id;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // uint64_t is not necessarily unsigned long long: cast to match %llu
+  CLS_LOG(20, "get_features snap_id=%llu", (unsigned long long)snap_id);
+
+  if (snap_id == CEPH_NOSNAP) {
+    // the image itself: features live in their own key
+    int r = read_key(hctx, "features", &features);
+    if (r < 0)
+      return r;
+  } else {
+    // a snapshot: features are part of the snapshot's metadata
+    cls_rbd_snap snap;
+    int r = read_snapshot_metadata(hctx, snap_id, &snap);
+    if (r < 0)
+      return r;
+
+    features = snap.features;
+  }
+
+  uint64_t incompatible = features & RBD_FEATURES_INCOMPATIBLE;
+  ::encode(features, *out);
+  ::encode(incompatible, *out);
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
+ *
+ * Output:
+ * @param order bits to shift to get the size of data objects (uint8_t)
+ * @param size size of the image in bytes for the given snapshot (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t snap_id, size;
+  uint8_t order;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // uint64_t is not necessarily unsigned long long: cast to match %llu
+  CLS_LOG(20, "get_size snap_id=%llu", (unsigned long long)snap_id);
+
+  // order is read from the header for the image and for snapshots alike
+  int r = read_key(hctx, "order", &order);
+  if (r < 0)
+    return r;
+
+  if (snap_id == CEPH_NOSNAP) {
+    r = read_key(hctx, "size", &size);
+    if (r < 0)
+      return r;
+  } else {
+    cls_rbd_snap snap;
+    // reuse r rather than declaring a second one that shadows it
+    r = read_snapshot_metadata(hctx, snap_id, &snap);
+    if (r < 0)
+      return r;
+
+    size = snap.image_size;
+  }
+
+  ::encode(order, *out);
+  ::encode(size, *out);
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param size new capacity of the image in bytes (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ * @returns -EINVAL if the input cannot be decoded
+ */
+int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t size;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(size, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // uint64_t is not necessarily unsigned long long: cast to match %llu
+  CLS_LOG(20, "set_size size=%llu", (unsigned long long)size);
+
+  // check that size exists to make sure this is a header object
+  // that was created correctly
+  uint64_t orig_size;
+  int r = read_key(hctx, "size", &orig_size);
+  if (r < 0)
+    return r;
+
+  bufferlist sizebl;
+  ::encode(size, sizebl);
+
+  r = cls_cxx_map_set_val(hctx, "size", &sizebl);
+  if (r < 0) {
+    // name the key that actually failed; no snapshot is involved here
+    CLS_ERR("error writing size: %d", r);
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Get the information needed to create a rados snap context for doing
+ * I/O to the data objects. This must include all snapshots.
+ *
+ * Output:
+ * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
+ * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "get_snapcontext");
+
+  int r;
+  uint64_t max_read = RBD_MAX_KEYS_READ;
+  vector<snapid_t> snap_ids;
+  const string snap_prefix = RBD_SNAP_KEY_PREFIX;
+  string last_read = snap_prefix;
+  bool more_snaps = true;
+
+  // page through the keys, RBD_MAX_KEYS_READ at a time
+  do {
+    set<string> keys;
+    r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys);
+    if (r < 0)
+      return r;
+
+    for (set<string>::const_iterator it = keys.begin();
+         it != keys.end(); ++it) {
+      // the listing is not filtered by prefix, so stop at the first key
+      // that is not a snapshot key instead of parsing it as a snap id.
+      // NOTE(review): this assumes keys are listed in ascending order,
+      // which the reverse below already relies on.
+      if (it->compare(0, snap_prefix.size(), snap_prefix) != 0) {
+        more_snaps = false;
+        break;
+      }
+      snap_ids.push_back(snap_id_from_key(*it));
+    }
+    if (keys.size() > 0)
+      last_read = *(keys.rbegin());
+  } while (more_snaps && r == RBD_MAX_KEYS_READ);
+
+  uint64_t snap_seq;
+  r = read_key(hctx, "snap_seq", &snap_seq);
+  if (r < 0)
+    return r;
+
+  // snap_ids must be descending in a snap context
+  std::reverse(snap_ids.begin(), snap_ids.end());
+
+  ::encode(snap_seq, *out);
+  ::encode(snap_ids, *out);
+
+  return 0;
+}
+
+/**
+ * Look up the prefix shared by the names of the image's data objects.
+ *
+ * Output:
+ * @param object_prefix prefix for data object names (string)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "get_object_prefix");
+
+  string prefix;
+  int ret = read_key(hctx, "object_prefix", &prefix);
+  if (ret >= 0) {
+    ::encode(prefix, *out);
+    return 0;
+  }
+
+  return ret;
+}
+
+/**
+ * Input:
+ * @param snap_id which snapshot to query (uint64_t)
+ *
+ * Output:
+ * @param name the name stored for the given snapshot (string)
+ * @returns 0 on success, negative error code on failure
+ * @returns -EINVAL if the input cannot be decoded or snap_id is CEPH_NOSNAP
+ */
+int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t snap_id;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // uint64_t is not necessarily unsigned long long: cast to match %llu
+  CLS_LOG(20, "get_snapshot_name snap_id=%llu", (unsigned long long)snap_id);
+
+  // only snapshots have a name
+  if (snap_id == CEPH_NOSNAP)
+    return -EINVAL;
+
+  cls_rbd_snap snap;
+  int r = read_snapshot_metadata(hctx, snap_id, &snap);
+  if (r < 0)
+    return r;
+
+  ::encode(snap.name, *out);
+
+  return 0;
+}
+
+/**
+ * Adds a snapshot to an rbd header. Ensures the id and name are unique.
+ *
+ * Input:
+ * @param snap_name name of the snapshot (string)
+ * @param snap_id id of the snapshot (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure.
+ * @returns -EINVAL if the input cannot be decoded or snap_id is greater
+ *          than CEPH_MAXSNAP
+ * @returns -ESTALE if the input snap_id is less than the image's snap_seq
+ * @returns -EEXIST if the id or name are already used by another snapshot
+ */
+int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  snapid_t snap_id;
+  string snap_name;
+
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(snap_name, iter);
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // cast so the argument matches %llu whatever integer type val is
+  CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_name.c_str(),
+          (unsigned long long)snap_id.val);
+
+  if (snap_id > CEPH_MAXSNAP)
+    return -EINVAL;
+
+  uint64_t cur_snap_seq;
+  int r = read_key(hctx, "snap_seq", &cur_snap_seq);
+  if (r < 0)
+    return r;
+
+  // client lost a race with another snapshot creation.
+  // snap_seq must be monotonically increasing.
+  if (snap_id < cur_snap_seq)
+    return -ESTALE;
+
+  // the snapshot records the image's current size and features
+  uint64_t size;
+  r = read_key(hctx, "size", &size);
+  if (r < 0)
+    return r;
+  uint64_t features;
+  r = read_key(hctx, "features", &features);
+  if (r < 0)
+    return r;
+
+  // walk the existing snapshots, RBD_MAX_KEYS_READ at a time, to make
+  // sure neither the name nor the id is taken
+  int max_read = RBD_MAX_KEYS_READ;
+  string last_read = RBD_SNAP_KEY_PREFIX;
+  do {
+    map<string, bufferlist> vals;
+    r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
+                             max_read, &vals);
+    if (r < 0)
+      return r;
+
+    for (map<string, bufferlist>::iterator it = vals.begin();
+         it != vals.end(); ++it) {
+      snapid_t cur_snap_id = snap_id_from_key(it->first);
+      cls_rbd_snap snap_meta;
+      r = decode_snapshot_metadata(cur_snap_id, &it->second, &snap_meta);
+      if (r < 0)
+        return r;
+
+      if (snap_meta.name == snap_name || snap_meta.id == snap_id) {
+        CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
+                snap_name.c_str(), (unsigned long long)snap_id.val,
+                snap_meta.name.c_str(), (unsigned long long)snap_meta.id.val);
+        return -EEXIST;
+      }
+    }
+
+    if (vals.size() > 0)
+      last_read = vals.rbegin()->first;
+  } while (r == RBD_MAX_KEYS_READ);
+
+  bufferlist snap_metabl, snap_seqbl;
+  ::encode(snap_name, snap_metabl);
+  ::encode(size, snap_metabl);
+  ::encode(features, snap_metabl);
+
+  ::encode(snap_id, snap_seqbl);
+
+  // write the new snapshot and the bumped snap_seq with a single call
+  string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  map<string, bufferlist> vals;
+  vals["snap_seq"] = snap_seqbl;
+  vals[snapshot_key] = snap_metabl;
+  r = cls_cxx_map_set_vals(hctx, &vals);
+  if (r < 0) {
+    CLS_ERR("error writing snapshot metadata: %d", r);
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Removes a snapshot from an rbd header.
+ *
+ * Input:
+ * @param snap_id the id of the snapshot to remove (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ * @returns -EINVAL if the input cannot be decoded
+ * @returns -ENOENT if there is no snapshot with the given id
+ */
+int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  snapid_t snap_id;
+
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // cast so the argument matches %llu whatever integer type val is
+  CLS_LOG(20, "snapshot_remove id=%llu", (unsigned long long)snap_id.val);
+
+  // check if the key exists. we can't rely on remove_key doing this for
+  // us, since OMAPRMKEYS returns success if the key is not there.
+  // bug or feature? sounds like a bug, since tmap did not have this
+  // behavior, but cls_rgw may rely on it...
+  string snapshot_key;
+  bufferlist snapbl;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = cls_cxx_map_get_val(hctx, snapshot_key, &snapbl);
+  if (r < 0) {
+    // -ENOENT means there is no such snapshot; any other error means the
+    // lookup itself failed and must not be mistaken for "key exists"
+    if (r != -ENOENT)
+      CLS_ERR("error reading snapshot metadata: %d", r);
+    return r;
+  }
+
+  r = cls_cxx_map_remove_key(hctx, snapshot_key);
+  if (r < 0) {
+    CLS_ERR("error writing snapshot metadata: %d", r);
+    return r;
+  }
+
+  return 0;
+}
+
+/****************************** Old format *******************************/
+
+int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
bufferlist bl;
struct rbd_obj_header_ondisk *header;
@@ -89,7 +652,7 @@ int snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return 0;
}
-int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
bufferlist bl;
struct rbd_obj_header_ondisk *header;
@@ -121,7 +684,6 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
}
snap_name = s.c_str();
-
const char *cur_snap_name;
for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
@@ -166,81 +728,7 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return 0;
}
-int snapshot_revert(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- bufferlist bl;
- struct rbd_obj_header_ondisk *header;
- bufferlist newbl;
- bufferptr header_bp(sizeof(*header));
- struct rbd_obj_snap_ondisk *new_snaps;
-
- int rc = snap_read_header(hctx, bl);
- if (rc < 0)
- return rc;
-
- header = (struct rbd_obj_header_ondisk *)bl.c_str();
-
- int snaps_id_ofs = sizeof(*header);
- int names_ofs = snaps_id_ofs + sizeof(*new_snaps) * header->snap_count;
- const char *snap_name;
- const char *snap_names = ((char *)header) + names_ofs;
- const char *end = snap_names + header->snap_names_len;
- bufferlist::iterator iter = in->begin();
- string s;
- int i;
- bool found = false;
- struct rbd_obj_snap_ondisk snap;
-
- try {
- ::decode(s, iter);
- } catch (const buffer::error &err) {
- return -EINVAL;
- }
- snap_name = s.c_str();
-
- for (i = 0; snap_names < end; i++) {
- if (strcmp(snap_names, snap_name) == 0) {
- snap = header->snaps[i];
- found = true;
- break;
- }
- snap_names += strlen(snap_names) + 1;
- }
- if (!found) {
- CLS_LOG("couldn't find snap %s\n",snap_name);
- return -ENOENT;
- }
-
- header->image_size = snap.image_size;
- header->snap_seq = header->snap_seq + 1;
-
- snap_names += strlen(snap_names) + 1;
- i++;
-
- header->snap_count = header->snap_count - i;
- bufferptr new_names_bp(end - snap_names);
- bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
-
- memcpy(header_bp.c_str(), header, sizeof(*header));
- newbl.push_back(header_bp);
-
- if (header->snap_count) {
- memcpy(new_snaps_bp.c_str(), header->snaps + i, sizeof(header->snaps[0]) * header->snap_count);
- memcpy(new_names_bp.c_str(), snap_names, end - snap_names);
- newbl.push_back(new_snaps_bp);
- newbl.push_back(new_names_bp);
- }
-
- rc = cls_cxx_write_full(hctx, &newbl);
- if (rc < 0)
- return rc;
-
- ::encode(snap.id, *out);
-
- return out->length();
-}
-
-int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
bufferlist bl;
struct rbd_obj_header_ondisk *header;
@@ -282,7 +770,7 @@ int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
snap_names += strlen(snap_names) + 1;
}
if (!found) {
- CLS_LOG("couldn't find snap %s\n",snap_name);
+ CLS_ERR("couldn't find snap %s\n", snap_name);
return -ENOENT;
}
@@ -298,7 +786,7 @@ int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
if (header->snap_count) {
int snaps_len = 0;
int names_len = 0;
- CLS_LOG("i=%d\n", i);
+ CLS_LOG(20, "i=%d\n", i);
if (i > 0) {
snaps_len = sizeof(header->snaps[0]) * i;
names_len = snap_names - orig_names;
@@ -338,7 +826,7 @@ int rbd_assign_bid(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return rc;
if (rc && rc < (int)sizeof(info)) {
- CLS_LOG("bad rbd_info object, read %d bytes, expected %d", rc, sizeof(info));
+ CLS_ERR("bad rbd_info object, read %d bytes, expected %d", rc, sizeof(info));
return -EIO;
}
@@ -358,7 +846,7 @@ int rbd_assign_bid(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
newbl.push_back(bp);
rc = cls_cxx_write_full(hctx, &newbl);
if (rc < 0) {
- CLS_LOG("error writing rbd_info, got rc=%d", rc);
+ CLS_ERR("error writing rbd_info, got rc=%d", rc);
return rc;
}
@@ -367,30 +855,54 @@ int rbd_assign_bid(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return out->length();
}
-/* Used for testing rados_exec */
-static int test_exec(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
- bufferlist bl;
- std::string testing123("testing123");
- ::encode(testing123, *out);
- return out->length();
-}
-
void __cls_init()
{
- CLS_LOG("Loaded rbd class!");
+ CLS_LOG(20, "Loaded rbd class!");
cls_register("rbd", &h_class);
- cls_register_cxx_method(h_class, "snap_list", CLS_METHOD_RD | CLS_METHOD_PUBLIC, snapshots_list, &h_snapshots_list);
- cls_register_cxx_method(h_class, "snap_add", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, snapshot_add, &h_snapshot_add);
- cls_register_cxx_method(h_class, "snap_remove", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, snapshot_remove, &h_snapshot_remove);
- cls_register_cxx_method(h_class, "snap_revert", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, snapshot_revert, &h_snapshot_revert);
+ cls_register_cxx_method(h_class, "create",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ create, &h_create);
+ cls_register_cxx_method(h_class, "get_features",
+ CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+ get_features, &h_get_features);
+ cls_register_cxx_method(h_class, "get_size",
+ CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+ get_size, &h_get_size);
+ cls_register_cxx_method(h_class, "set_size",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ set_size, &h_set_size);
+ cls_register_cxx_method(h_class, "get_snapcontext",
+ CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+ get_snapcontext, &h_get_snapcontext);
+ cls_register_cxx_method(h_class, "get_object_prefix",
+ CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+ get_object_prefix, &h_get_object_prefix);
+ cls_register_cxx_method(h_class, "get_snapshot_name",
+ CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+ get_snapshot_name, &h_get_snapshot_name);
+ cls_register_cxx_method(h_class, "snapshot_add",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ snapshot_add, &h_snapshot_add);
+ cls_register_cxx_method(h_class, "snapshot_remove",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ snapshot_remove, &h_snapshot_remove);
+
+ /* methods for the old format */
+ cls_register_cxx_method(h_class, "snap_list",
+ CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+ old_snapshots_list, &h_old_snapshots_list);
+ cls_register_cxx_method(h_class, "snap_add",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ old_snapshot_add, &h_old_snapshot_add);
+ cls_register_cxx_method(h_class, "snap_remove",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ old_snapshot_remove, &h_old_snapshot_remove);
/* assign a unique block id for rbd blocks */
- cls_register_cxx_method(h_class, "assign_bid", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rbd_assign_bid, &h_assign_bid);
-
- cls_register_cxx_method(h_class, "test_exec", CLS_METHOD_RD | CLS_METHOD_PUBLIC, test_exec, &h_test_exec);
+ cls_register_cxx_method(h_class, "assign_bid",
+ CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+ rbd_assign_bid, &h_assign_bid);
return;
}
-
diff --git a/src/cls_rgw.cc b/src/cls_rgw.cc
index d50e95068fe..baf8604de59 100644
--- a/src/cls_rgw.cc
+++ b/src/cls_rgw.cc
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include <iostream>
#include <string.h>
@@ -38,7 +41,7 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
try {
::decode(op, iter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_list(): failed to decode request\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_list(): failed to decode request\n");
return -EINVAL;
}
@@ -52,14 +55,14 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
try {
::decode(new_dir.header, header_iter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_list(): failed to decode header\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_list(): failed to decode header\n");
return -EINVAL;
}
bufferlist bl;
map<string, bufferlist> keys;
- rc = cls_cxx_map_read_keys(hctx, op.start_obj, op.filter_prefix, op.num_entries + 1, &keys);
+ rc = cls_cxx_map_get_vals(hctx, op.start_obj, op.filter_prefix, op.num_entries + 1, &keys);
if (rc < 0)
return rc;
@@ -74,7 +77,7 @@ int rgw_bucket_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
try {
::decode(entry, eiter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_list(): failed to decode entry, key=%s\n", kiter->first.c_str());
+ CLS_LOG(1, "ERROR: rgw_bucket_list(): failed to decode entry, key=%s\n", kiter->first.c_str());
return -EINVAL;
}
@@ -105,7 +108,7 @@ int rgw_bucket_init_index(cls_method_context_t hctx, bufferlist *in, bufferlist
}
if (header_bl.length() != 0) {
- CLS_LOG("ERROR: index already initialized\n");
+ CLS_LOG(1, "ERROR: index already initialized\n");
return -EINVAL;
}
@@ -123,20 +126,20 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
try {
::decode(op, iter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_prepare_op(): failed to decode request\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_prepare_op(): failed to decode request\n");
return -EINVAL;
}
if (op.tag.empty()) {
- CLS_LOG("ERROR: tag is empty\n");
+ CLS_LOG(1, "ERROR: tag is empty\n");
return -EINVAL;
}
- CLS_LOG("rgw_bucket_prepare_op(): request: op=%d name=%s tag=%s\n", op.op, op.name.c_str(), op.tag.c_str());
+ CLS_LOG(1, "rgw_bucket_prepare_op(): request: op=%d name=%s tag=%s\n", op.op, op.name.c_str(), op.tag.c_str());
// get on-disk state
bufferlist cur_value;
- int rc = cls_cxx_map_read_key(hctx, op.name, &cur_value);
+ int rc = cls_cxx_map_get_val(hctx, op.name, &cur_value);
if (rc < 0 && rc != -ENOENT)
return rc;
@@ -151,7 +154,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
bufferlist::iterator biter = cur_value.begin();
::decode(entry, biter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_prepare_op(): failed to decode entry\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_prepare_op(): failed to decode entry\n");
/* ignoring error */
noent = true;
@@ -174,7 +177,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
// write out new key to disk
bufferlist info_bl;
::encode(entry, info_bl);
- cls_cxx_map_write_key(hctx, op.name, &info_bl);
+ cls_cxx_map_set_val(hctx, op.name, &info_bl);
return rc;
}
@@ -186,10 +189,10 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
try {
::decode(op, iter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_complete_op(): failed to decode request\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode request\n");
return -EINVAL;
}
- CLS_LOG("rgw_bucket_complete_op(): request: op=%d name=%s epoch=%lld tag=%s\n", op.op, op.name.c_str(), op.epoch, op.tag.c_str());
+ CLS_LOG(1, "rgw_bucket_complete_op(): request: op=%d name=%s epoch=%lld tag=%s\n", op.op, op.name.c_str(), op.epoch, op.tag.c_str());
bufferlist header_bl;
struct rgw_bucket_dir_header header;
@@ -200,14 +203,14 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
try {
::decode(header, header_iter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_complete_op(): failed to decode header\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode header\n");
return -EINVAL;
}
bufferlist current_entry;
struct rgw_bucket_dir_entry entry;
bool ondisk = true;
- rc = cls_cxx_map_read_key(hctx, op.name, &current_entry);
+ rc = cls_cxx_map_get_val(hctx, op.name, &current_entry);
if (rc < 0) {
if (rc != -ENOENT) {
return rc;
@@ -223,16 +226,16 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
bufferlist::iterator cur_iter = current_entry.begin();
try {
::decode(entry, cur_iter);
- CLS_LOG("rgw_bucket_complete_op(): existing entry: epoch=%lld name=%s locator=%s\n", entry.epoch, entry.name.c_str(), entry.locator.c_str());
+ CLS_LOG(1, "rgw_bucket_complete_op(): existing entry: epoch=%lld name=%s locator=%s\n", entry.epoch, entry.name.c_str(), entry.locator.c_str());
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_bucket_complete_op(): failed to decode entry\n");
+ CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode entry\n");
}
}
if (op.tag.size()) {
map<string, struct rgw_bucket_pending_info>::iterator pinter = entry.pending_map.find(op.tag);
if (pinter == entry.pending_map.end()) {
- CLS_LOG("ERROR: couldn't find tag for pending operation\n");
+ CLS_LOG(1, "ERROR: couldn't find tag for pending operation\n");
return -EINVAL;
}
entry.pending_map.erase(pinter);
@@ -242,10 +245,10 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
bufferlist update_bl;
if (op.tag.size() && op.op == CLS_RGW_OP_CANCEL) {
- CLS_LOG("rgw_bucket_complete_op(): cancel requested\n");
+ CLS_LOG(1, "rgw_bucket_complete_op(): cancel requested\n");
cancel = true;
} else if (op.epoch <= entry.epoch) {
- CLS_LOG("rgw_bucket_complete_op(): skipping request, old epoch\n");
+ CLS_LOG(1, "rgw_bucket_complete_op(): skipping request, old epoch\n");
cancel = true;
}
@@ -254,7 +257,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
if (op.tag.size()) {
bufferlist new_key_bl;
::encode(entry, new_key_bl);
- return cls_cxx_map_write_key(hctx, op.name, &new_key_bl);
+ return cls_cxx_map_set_val(hctx, op.name, &new_key_bl);
} else {
return 0;
}
@@ -278,7 +281,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
entry.exists = false;
bufferlist new_key_bl;
::encode(entry, new_key_bl);
- int ret = cls_cxx_map_write_key(hctx, op.name, &new_key_bl);
+ int ret = cls_cxx_map_set_val(hctx, op.name, &new_key_bl);
if (ret < 0)
return ret;
}
@@ -299,7 +302,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
stats.total_size_rounded += get_rounded_size(meta.size);
bufferlist new_key_bl;
::encode(entry, new_key_bl);
- int ret = cls_cxx_map_write_key(hctx, op.name, &new_key_bl);
+ int ret = cls_cxx_map_set_val(hctx, op.name, &new_key_bl);
if (ret < 0)
return ret;
}
@@ -313,7 +316,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
- CLS_LOG("rgw_dir_suggest_changes()");
+ CLS_LOG(1, "rgw_dir_suggest_changes()");
bufferlist header_bl;
struct rgw_bucket_dir_header header;
@@ -326,7 +329,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
bufferlist::iterator header_iter = header_bl.begin();
::decode(header, header_iter);
} catch (buffer::error& error) {
- CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode header\n");
+ CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode header\n");
return -EINVAL;
}
@@ -341,12 +344,12 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
::decode(op, in_iter);
::decode(cur_change, in_iter);
} catch (buffer::error& err) {
- CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode request\n");
+ CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode request\n");
return -EINVAL;
}
bufferlist cur_disk_bl;
- int ret = cls_cxx_map_read_key(hctx, cur_change.name, &cur_disk_bl);
+ int ret = cls_cxx_map_get_val(hctx, cur_change.name, &cur_disk_bl);
if (ret < 0 && ret != -ENOENT)
return -EINVAL;
@@ -355,7 +358,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
try {
::decode(cur_disk, cur_disk_iter);
} catch (buffer::error& error) {
- CLS_LOG("ERROR: rgw_dir_suggest_changes(): failed to decode cur_disk\n");
+ CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode cur_disk\n");
return -EINVAL;
}
@@ -391,7 +394,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
stats.total_size_rounded += get_rounded_size(cur_change.meta.size);
bufferlist cur_state_bl;
::encode(cur_change, cur_state_bl);
- ret = cls_cxx_map_write_key(hctx, cur_change.name, &cur_state_bl);
+ ret = cls_cxx_map_set_val(hctx, cur_change.name, &cur_state_bl);
if (ret < 0)
return ret;
break;
@@ -410,7 +413,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
void __cls_init()
{
- CLS_LOG("Loaded rgw class!");
+ CLS_LOG(1, "Loaded rgw class!");
cls_register("rgw", &h_class);
cls_register_cxx_method(h_class, "bucket_init_index", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_init_index, &h_rgw_bucket_init_index);
diff --git a/src/include/rbd/librbd.h b/src/include/rbd/librbd.h
index c2ed7bfb0b2..8aafcad9a75 100644
--- a/src/include/rbd/librbd.h
+++ b/src/include/rbd/librbd.h
@@ -30,7 +30,7 @@ extern "C" {
#define LIBRBD_VER_MAJOR 0
#define LIBRBD_VER_MINOR 1
-#define LIBRBD_VER_EXTRA 2
+#define LIBRBD_VER_EXTRA 3
#define LIBRBD_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
@@ -67,6 +67,8 @@ void rbd_version(int *major, int *minor, int *extra);
/* images */
int rbd_list(rados_ioctx_t io, char *names, size_t *size);
int rbd_create(rados_ioctx_t io, const char *name, uint64_t size, int *order);
+int rbd_create2(rados_ioctx_t io, const char *name, uint64_t size,
+ uint64_t features, int *order);
int rbd_remove(rados_ioctx_t io, const char *name);
int rbd_remove_with_progress(rados_ioctx_t io, const char *name,
librbd_progress_fn_t cb, void *cbdata);
diff --git a/src/include/rbd/librbd.hpp b/src/include/rbd/librbd.hpp
index 915833a9088..6ada8d41393 100644
--- a/src/include/rbd/librbd.hpp
+++ b/src/include/rbd/librbd.hpp
@@ -68,6 +68,8 @@ public:
int open(IoCtx& io_ctx, Image& image, const char *name, const char *snapname);
int list(IoCtx& io_ctx, std::vector<std::string>& names);
int create(IoCtx& io_ctx, const char *name, uint64_t size, int *order);
+ int create2(IoCtx& io_ctx, const char *name, uint64_t size,
+ uint64_t features, int *order);
int remove(IoCtx& io_ctx, const char *name);
int remove_with_progress(IoCtx& io_ctx, const char *name, ProgressContext& pctx);
int rename(IoCtx& src_io_ctx, const char *srcname, const char *destname);
diff --git a/src/include/rbd_types.h b/src/include/rbd_types.h
index f165c40e361..a91ca84a307 100644
--- a/src/include/rbd_types.h
+++ b/src/include/rbd_types.h
@@ -19,11 +19,26 @@
#include <sys/types.h>
#endif
+
+
+/* New-style rbd image 'foo' consists of objects
+ * rbd_header.foo - image metadata
+ * rbd_data.<id>.00000000
+ * rbd_data.<id>.00000001
+ * ... - data
+ */
+
+#define RBD_HEADER_PREFIX "rbd_header."
+#define RBD_DATA_PREFIX "rbd_data."
+
+#define RBD_FEATURES_INCOMPATIBLE 0
+#define RBD_FEATURES_ALL 0
+
/*
- * rbd image 'foo' consists of objects
+ * old-style rbd image 'foo' consists of objects
* foo.rbd - image metadata
- * foo.00000000
- * foo.00000001
+ * rb.<idhi>.<idlo>.00000000
+ * rb.<idhi>.<idlo>.00000001
* ... - data
*/
diff --git a/src/librbd.cc b/src/librbd.cc
index a064e013e62..73fd6cbd639 100644
--- a/src/librbd.cc
+++ b/src/librbd.cc
@@ -24,8 +24,14 @@
#include "include/rbd/librbd.hpp"
#include "osdc/ObjectCacher.h"
+#include "librbd/cls_rbd_client.h"
#include "librbd/LibrbdWriteback.h"
+#include <algorithm>
+#include <map>
+#include <string>
+#include <vector>
+
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd: "
@@ -86,9 +92,39 @@ namespace librbd {
struct SnapInfo {
snap_t id;
uint64_t size;
- SnapInfo(snap_t _id, uint64_t _size) : id(_id), size(_size) {};
+ uint64_t features;
+ SnapInfo(snap_t _id, uint64_t _size, uint64_t _features) :
+ id(_id), size(_size), features(_features) {};
};
+ const string md_oid(const string &name, bool old_format)
+ {
+ if (old_format)
+ return name + RBD_SUFFIX;
+ else
+ return RBD_HEADER_PREFIX + name;
+ }
+
+ int detect_format(IoCtx &io_ctx, const string &name, bool *old_format,
+ uint64_t *size)
+ {
+ CephContext *cct = (CephContext *)io_ctx.cct();
+ if (old_format)
+ *old_format = true;
+ int r = io_ctx.stat(md_oid(name, true), size, NULL);
+ if (r < 0) {
+ if (old_format)
+ *old_format = false;
+ r = io_ctx.stat(md_oid(name, false), size, NULL);
+ if (r < 0)
+ return r;
+ }
+
+ ldout(cct, 20) << "detect format of " << name << " : "
+ << (*old_format ? "old" : "new") << dendl;
+ return 0;
+ }
+
struct AioCompletion;
struct AioBlockCompletion : Context {
@@ -114,7 +150,8 @@ namespace librbd {
PerfCounters *perfcounter;
struct rbd_obj_header_ondisk header;
::SnapContext snapc;
- vector<snap_t> snaps;
+ vector<snap_t> snaps; // this mirrors snapc.snaps, but is in a
+ // format librados can understand
std::map<std::string, struct SnapInfo> snaps_by_name;
uint64_t snapid;
bool snap_exists; // false if our snapid was deleted
@@ -127,6 +164,13 @@ namespace librbd {
Mutex lock; // protects access to snapshot and header information
Mutex cache_lock; // used as client_lock for the ObjectCacher
+ bool old_format;
+ uint8_t order;
+ uint64_t size;
+ uint64_t features;
+ string object_prefix;
+ string header_oid;
+
ObjectCacher *object_cacher;
LibrbdWriteback *writeback_handler;
ObjectCacher::ObjectSet *object_set;
@@ -141,6 +185,8 @@ namespace librbd {
refresh_lock("librbd::ImageCtx::refresh_lock"),
lock("librbd::ImageCtx::lock"),
cache_lock("librbd::ImageCtx::cache_lock"),
+ old_format(true),
+ order(0), size(0), features(0),
object_cacher(NULL), writeback_handler(NULL), object_set(NULL)
{
md_ctx.dup(p);
@@ -185,6 +231,26 @@ namespace librbd {
}
}
+ int init() {
+ int r = detect_format(md_ctx, name, &old_format, NULL);
+ if (r < 0) {
+ lderr(cct) << "error finding header: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ if (!old_format) {
+ r = cls_client::get_immutable_metadata(&md_ctx, md_oid(name, false),
+ &object_prefix, &order);
+ if (r < 0) {
+ lderr(cct) << "error reading immutable metadata: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+ }
+ header_oid = md_oid(name, old_format);
+ return 0;
+ }
+
void perf_start(string name) {
PerfCountersBuilder plb(cct, name, l_librbd_first, l_librbd_last);
@@ -258,23 +324,17 @@ namespace librbd {
return -ENOENT;
}
- void add_snap(std::string snap_name, snap_t id, uint64_t size)
+ void add_snap(std::string snap_name, snap_t id, uint64_t size, uint64_t features)
{
- snapc.snaps.push_back(id);
snaps.push_back(id);
- struct SnapInfo info(id, size);
+ struct SnapInfo info(id, size, features);
snaps_by_name.insert(std::pair<std::string, struct SnapInfo>(snap_name, info));
}
- const string md_oid() const
- {
- return name + RBD_SUFFIX;
- }
-
uint64_t get_image_size() const
{
if (snapname.length() == 0) {
- return header.image_size;
+ return size;
} else {
map<std::string,SnapInfo>::const_iterator p = snaps_by_name.find(snapname);
if (p == snaps_by_name.end())
@@ -503,7 +563,8 @@ namespace librbd {
int snap_set(ImageCtx *ictx, const char *snap_name);
int list(IoCtx& io_ctx, std::vector<string>& names);
- int create(IoCtx& io_ctx, const char *imgname, uint64_t size, int *order);
+ int create(IoCtx& io_ctx, const char *imgname, uint64_t size, int *order,
+ bool old_format);
int rename(IoCtx& io_ctx, const char *srcname, const char *dstname);
int info(ImageCtx *ictx, image_info_t& info, size_t image_size);
int remove(IoCtx& io_ctx, const char *imgname, ProgressContext& prog_ctx);
@@ -522,8 +583,7 @@ namespace librbd {
int open_image(ImageCtx *ictx);
void close_image(ImageCtx *ictx);
- void trim_image(IoCtx& io_ctx, const rbd_obj_header_ondisk &header, uint64_t newsize,
- ProgressContext& prog_ctx);
+ void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx);
int read_rbd_info(IoCtx& io_ctx, const string& info_oid, struct rbd_info *info);
int touch_rbd_info(IoCtx& io_ctx, const string& info_oid);
@@ -536,12 +596,11 @@ namespace librbd {
int tmap_rm(IoCtx& io_ctx, const string& imgname);
int rollback_image(ImageCtx *ictx, uint64_t snapid, ProgressContext& prog_ctx);
void image_info(const ImageCtx& ictx, image_info_t& info, size_t info_size);
- string get_block_oid(const rbd_obj_header_ondisk &header, uint64_t num);
- uint64_t get_max_block(uint64_t size, int obj_order);
- uint64_t get_max_block(const rbd_obj_header_ondisk &header);
- uint64_t get_block_size(const rbd_obj_header_ondisk &header);
- uint64_t get_block_num(const rbd_obj_header_ondisk &header, uint64_t ofs);
- uint64_t get_block_ofs(const rbd_obj_header_ondisk &header, uint64_t ofs);
+ string get_block_oid(const string &object_prefix, uint64_t num, bool old_format);
+ uint64_t get_max_block(uint64_t size, uint8_t obj_order);
+ uint64_t get_block_size(uint8_t order);
+ uint64_t get_block_num(uint8_t order, uint64_t ofs);
+ uint64_t get_block_ofs(uint8_t order, uint64_t ofs);
int check_io(ImageCtx *ictx, uint64_t off, uint64_t len);
int init_rbd_info(struct rbd_info *info);
void init_rbd_header(struct rbd_obj_header_ondisk& ondisk,
@@ -610,9 +669,6 @@ void init_rbd_header(struct rbd_obj_header_ondisk& ondisk,
snprintf(ondisk.block_name, sizeof(ondisk.block_name), "rb.%x.%x", hi, lo);
- if (!*order)
- *order = RBD_DEFAULT_OBJ_ORDER;
-
ondisk.image_size = size;
ondisk.options.order = *order;
ondisk.options.crypt_type = RBD_CRYPT_NONE;
@@ -625,52 +681,47 @@ void init_rbd_header(struct rbd_obj_header_ondisk& ondisk,
void image_info(const ImageCtx& ictx, image_info_t& info, size_t infosize)
{
- int obj_order = ictx.header.options.order;
+ int obj_order = ictx.order;
info.size = ictx.get_image_size();
info.obj_size = 1 << obj_order;
info.num_objs = ictx.get_image_size() >> obj_order;
info.order = obj_order;
- memcpy(&info.block_name_prefix, &ictx.header.block_name, RBD_MAX_BLOCK_NAME_SIZE);
+ memcpy(&info.block_name_prefix, ictx.object_prefix.c_str(),
+ RBD_MAX_BLOCK_NAME_SIZE);
info.parent_pool = -1;
bzero(&info.parent_name, RBD_MAX_IMAGE_NAME_SIZE);
}
-string get_block_oid(const rbd_obj_header_ondisk &header, uint64_t num)
+string get_block_oid(const string &object_prefix, uint64_t num, bool old_format)
{
- char o[RBD_MAX_BLOCK_NAME_SIZE];
- snprintf(o, RBD_MAX_BLOCK_NAME_SIZE,
- "%s.%012" PRIx64, header.block_name, num);
- return o;
+ ostringstream oss;
+ int width = old_format ? 12 : 16;
+ oss << object_prefix << "."
+ << std::hex << std::setw(width) << std::setfill('0') << num;
+ return oss.str();
}
-uint64_t get_max_block(uint64_t size, int obj_order)
+uint64_t get_max_block(uint64_t size, uint8_t obj_order)
{
uint64_t block_size = 1 << obj_order;
uint64_t numseg = (size + block_size - 1) >> obj_order;
return numseg;
}
-uint64_t get_max_block(const rbd_obj_header_ondisk &header)
-{
- return get_max_block(header.image_size, header.options.order);
-}
-
-uint64_t get_block_ofs(const rbd_obj_header_ondisk &header, uint64_t ofs)
+uint64_t get_block_ofs(uint8_t order, uint64_t ofs)
{
- int obj_order = header.options.order;
- uint64_t block_size = 1 << obj_order;
+ uint64_t block_size = 1 << order;
return ofs & (block_size - 1);
}
-uint64_t get_block_size(const rbd_obj_header_ondisk &header)
+uint64_t get_block_size(uint8_t order)
{
- return 1 << header.options.order;
+ return 1 << order;
}
-uint64_t get_block_num(const rbd_obj_header_ondisk &header, uint64_t ofs)
+uint64_t get_block_num(uint8_t order, uint64_t ofs)
{
- int obj_order = header.options.order;
- uint64_t num = ofs >> obj_order;
+ uint64_t num = ofs >> order;
return num;
}
@@ -681,28 +732,29 @@ int init_rbd_info(struct rbd_info *info)
return 0;
}
-void trim_image(IoCtx& io_ctx, const rbd_obj_header_ondisk &header, uint64_t newsize,
- ProgressContext& prog_ctx)
+void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx)
{
- CephContext *cct = (CephContext *)io_ctx.cct();
- uint64_t bsize = get_block_size(header);
- uint64_t numseg = get_max_block(header);
- uint64_t start = get_block_num(header, newsize);
+ CephContext *cct = (CephContext *)ictx->data_ctx.cct();
+ uint64_t bsize = get_block_size(ictx->order);
+ uint64_t numseg = get_max_block(ictx->size, ictx->order);
+ uint64_t start = get_block_num(ictx->order, newsize);
- uint64_t block_ofs = get_block_ofs(header, newsize);
+ uint64_t block_ofs = get_block_ofs(ictx->order, newsize);
if (block_ofs) {
- ldout(cct, 2) << "trim_image object " << numseg << " truncate to " << block_ofs << dendl;
- string oid = get_block_oid(header, start);
+ ldout(cct, 2) << "trim_image object " << numseg << " truncate to "
+ << block_ofs << dendl;
+ string oid = get_block_oid(ictx->object_prefix, start, ictx->old_format);
librados::ObjectWriteOperation write_op;
write_op.truncate(block_ofs);
- io_ctx.operate(oid, &write_op);
+ ictx->data_ctx.operate(oid, &write_op);
start++;
}
if (start < numseg) {
- ldout(cct, 2) << "trim_image objects " << start << " to " << (numseg-1) << dendl;
- for (uint64_t i=start; i<numseg; i++) {
- string oid = get_block_oid(header, i);
- io_ctx.remove(oid);
+ ldout(cct, 2) << "trim_image objects " << start << " to "
+ << (numseg - 1) << dendl;
+ for (uint64_t i = start; i < numseg; ++i) {
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ ictx->data_ctx.remove(oid);
prog_ctx.update_progress(i * bsize, (numseg - start) * bsize);
}
}
@@ -737,32 +789,26 @@ int touch_rbd_info(IoCtx& io_ctx, const string& info_oid)
int rbd_assign_bid(IoCtx& io_ctx, const string& info_oid, uint64_t *id)
{
- bufferlist bl, out;
- *id = 0;
-
int r = touch_rbd_info(io_ctx, info_oid);
if (r < 0)
return r;
- r = io_ctx.exec(info_oid, "rbd", "assign_bid", bl, out);
+ r = cls_client::assign_bid(&io_ctx, info_oid, id);
if (r < 0)
return r;
- bufferlist::iterator iter = out.begin();
- ::decode(*id, iter);
-
return 0;
}
-
-int read_header_bl(IoCtx& io_ctx, const string& md_oid, bufferlist& header, uint64_t *ver)
+int read_header_bl(IoCtx& io_ctx, const string& header_oid,
+ bufferlist& header, uint64_t *ver)
{
int r;
uint64_t off = 0;
#define READ_SIZE 4096
do {
bufferlist bl;
- r = io_ctx.read(md_oid, bl, READ_SIZE, off);
+ r = io_ctx.read(header_oid, bl, READ_SIZE, off);
if (r < 0)
return r;
header.claim_append(bl);
@@ -801,10 +847,11 @@ int notify_change(IoCtx& io_ctx, const string& oid, uint64_t *pver, ImageCtx *ic
return 0;
}
-int read_header(IoCtx& io_ctx, const string& md_oid, struct rbd_obj_header_ondisk *header, uint64_t *ver)
+int read_header(IoCtx& io_ctx, const string& header_oid,
+ struct rbd_obj_header_ondisk *header, uint64_t *ver)
{
bufferlist header_bl;
- int r = read_header_bl(io_ctx, md_oid, header_bl, ver);
+ int r = read_header_bl(io_ctx, header_oid, header_bl, ver);
if (r < 0)
return r;
if (header_bl.length() < (int)sizeof(*header))
@@ -814,12 +861,12 @@ int read_header(IoCtx& io_ctx, const string& md_oid, struct rbd_obj_header_ondis
return 0;
}
-int write_header(IoCtx& io_ctx, const string& md_oid, bufferlist& header)
+int write_header(IoCtx& io_ctx, const string& header_oid, bufferlist& header)
{
bufferlist bl;
- int r = io_ctx.write(md_oid, header, header.length(), 0);
+ int r = io_ctx.write(header_oid, header, header.length(), 0);
- notify_change(io_ctx, md_oid, NULL, NULL);
+ notify_change(io_ctx, header_oid, NULL, NULL);
return r;
}
@@ -846,12 +893,12 @@ int tmap_rm(IoCtx& io_ctx, const string& imgname)
int rollback_image(ImageCtx *ictx, uint64_t snapid, ProgressContext& prog_ctx)
{
assert(ictx->lock.is_locked());
- uint64_t numseg = get_max_block(ictx->header);
- uint64_t bsize = get_block_size(ictx->header);
+ uint64_t numseg = get_max_block(ictx->size, ictx->order);
+ uint64_t bsize = get_block_size(ictx->order);
for (uint64_t i = 0; i < numseg; i++) {
int r;
- string oid = get_block_oid(ictx->header, i);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
r = ictx->data_ctx.selfmanaged_snap_rollback(oid, snapid);
ldout(ictx->cct, 10) << "selfmanaged_snap_rollback on " << oid << " to " << snapid << " returned " << r << dendl;
prog_ctx.update_progress(i * bsize, numseg * bsize);
@@ -895,7 +942,7 @@ int snap_create(ImageCtx *ictx, const char *snap_name)
if (r < 0)
return r;
- notify_change(ictx->md_ctx, ictx->md_oid(), NULL, ictx);
+ notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
ictx->perfcounter->inc(l_librbd_snap_create);
return 0;
@@ -923,27 +970,42 @@ int snap_remove(ImageCtx *ictx, const char *snap_name)
if (r < 0)
return r;
- notify_change(ictx->md_ctx, ictx->md_oid(), NULL, ictx);
+ notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
ictx->perfcounter->inc(l_librbd_snap_remove);
return 0;
}
-int create(IoCtx& io_ctx, const char *imgname, uint64_t size, int *order)
+int create(IoCtx& io_ctx, const char *imgname, uint64_t size,
+ bool old_format, uint64_t features, int *order)
{
CephContext *cct = (CephContext *)io_ctx.cct();
- ldout(cct, 20) << "create " << &io_ctx << " name = " << imgname << " size = " << size << dendl;
+ ldout(cct, 20) << "create " << &io_ctx << " name = " << imgname
+ << " size = " << size << " old_format = " << old_format
+ << " features = " << features << " order = " << *order
+ << dendl;
- string md_oid = imgname;
- md_oid += RBD_SUFFIX;
- // make sure it doesn't already exist
- int r = io_ctx.stat(md_oid, NULL, NULL);
- if (r == 0) {
- lderr(cct) << "rbd image header " << md_oid << " already exists" << dendl;
+ if (features & ~RBD_FEATURES_ALL) {
+ lderr(cct) << "librbd does not support requested features." << dendl;
+ return -ENOSYS;
+ }
+
+ // make sure it doesn't already exist, in either format
+ int r = detect_format(io_ctx, imgname, NULL, NULL);
+ if (r != -ENOENT) {
+ lderr(cct) << "rbd image " << imgname << " already exists" << dendl;
return -EEXIST;
}
+ if (!order)
+ return -EINVAL;
+
+ if (*order && (*order > 255 || *order < 12)) {
+ lderr(cct) << "order must be in the range [12, 255]" << dendl;
+ return -EDOM;
+ }
+
uint64_t bid;
string dir_info = RBD_INFO;
r = rbd_assign_bid(io_ctx, dir_info, &bid);
@@ -952,23 +1014,33 @@ int create(IoCtx& io_ctx, const char *imgname, uint64_t size, int *order)
return r;
}
- struct rbd_obj_header_ondisk header;
- init_rbd_header(header, size, order, bid);
-
- bufferlist bl;
- bl.append((const char *)&header, sizeof(header));
-
ldout(cct, 2) << "adding rbd image to directory..." << dendl;
r = tmap_set(io_ctx, imgname);
if (r < 0) {
- lderr(cct) << "error adding img to directory: " << cpp_strerror(-r)<< dendl;
+ lderr(cct) << "error adding img to directory: " << cpp_strerror(r)<< dendl;
return r;
}
+ if (!*order)
+ *order = RBD_DEFAULT_OBJ_ORDER;
+
ldout(cct, 2) << "creating rbd image..." << dendl;
- r = io_ctx.write(md_oid, bl, bl.length(), 0);
+ string header_oid = md_oid(imgname, old_format);
+ if (old_format) {
+ struct rbd_obj_header_ondisk header;
+ init_rbd_header(header, size, order, bid);
+
+ bufferlist bl;
+ bl.append((const char *)&header, sizeof(header));
+ r = io_ctx.write(header_oid, bl, bl.length(), 0);
+ } else {
+ ostringstream oss;
+ oss << RBD_DATA_PREFIX << std::hex << bid;
+ r = cls_client::create_image(&io_ctx, header_oid, size, *order, features,
+ oss.str());
+ }
if (r < 0) {
- lderr(cct) << "error writing header: " << cpp_strerror(-r) << dendl;
+ lderr(cct) << "error writing header: " << cpp_strerror(r) << dendl;
return r;
}
@@ -979,45 +1051,77 @@ int create(IoCtx& io_ctx, const char *imgname, uint64_t size, int *order)
int rename(IoCtx& io_ctx, const char *srcname, const char *dstname)
{
CephContext *cct = (CephContext *)io_ctx.cct();
- ldout(cct, 20) << "rename " << &io_ctx << " " << srcname << " -> " << dstname << dendl;
-
- string md_oid = srcname;
- md_oid += RBD_SUFFIX;
- string dst_md_oid = dstname;
- dst_md_oid += RBD_SUFFIX;
- string dstname_str = dstname;
- string imgname_str = srcname;
- uint64_t ver;
- bufferlist header;
- int r = read_header_bl(io_ctx, md_oid, header, &ver);
+ ldout(cct, 20) << "rename " << &io_ctx << " " << srcname << " -> "
+ << dstname << dendl;
+
+ bool old_format;
+ uint64_t header_size;
+ int r = detect_format(io_ctx, srcname, &old_format, &header_size);
+ if (r < 0) {
+ lderr(cct) << "error finding header: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ string header_oid = md_oid(srcname, old_format);
+ string dst_header_oid = md_oid(dstname, old_format);
+ bufferlist headerbl;
+ map<string, bufferlist> omap_values;
+ r = io_ctx.read(header_oid, headerbl, header_size, 0);
if (r < 0) {
- lderr(cct) << "error reading header: " << md_oid << ": " << cpp_strerror(-r) << dendl;
+ lderr(cct) << "error reading header: " << header_oid << ": "
+ << cpp_strerror(-r) << dendl;
return r;
}
- r = io_ctx.stat(dst_md_oid, NULL, NULL);
+
+ int MAX_READ = 1024;
+ string last_read = "";
+ do {
+ map<string, bufferlist> outbl;
+ r = io_ctx.omap_get_vals(header_oid, last_read, MAX_READ, &outbl);
+ if (r < 0) {
+ lderr(cct) << "error reading header omap values: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+ omap_values.insert(outbl.begin(), outbl.end());
+ if (outbl.size() > 0)
+ last_read = outbl.rbegin()->first;
+ } while (r == MAX_READ);
+
+ r = io_ctx.stat(dst_header_oid, NULL, NULL);
if (r == 0) {
- lderr(cct) << "rbd image header " << dst_md_oid << " already exists" << dendl;
+ lderr(cct) << "rbd image header " << dst_header_oid
+ << " already exists" << dendl;
return -EEXIST;
}
- r = write_header(io_ctx, dst_md_oid, header);
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ op.write_full(headerbl);
+ op.omap_set(omap_values);
+ r = io_ctx.operate(dst_header_oid, &op);
if (r < 0) {
- lderr(cct) << "error writing header: " << dst_md_oid << ": " << cpp_strerror(-r) << dendl;
+ lderr(cct) << "error writing header: " << dst_header_oid << ": "
+ << cpp_strerror(r) << dendl;
return r;
}
- r = tmap_set(io_ctx, dstname_str);
+
+ r = tmap_set(io_ctx, dstname);
if (r < 0) {
- io_ctx.remove(dst_md_oid);
- lderr(cct) << "can't add " << dst_md_oid << " to directory" << dendl;
+ io_ctx.remove(dst_header_oid);
+ lderr(cct) << "couldn't add " << dstname << " to directory: "
+ << cpp_strerror(r) << dendl;
return r;
}
- r = tmap_rm(io_ctx, imgname_str);
+ r = tmap_rm(io_ctx, srcname);
if (r < 0)
- lderr(cct) << "warning: couldn't remove old entry from directory (" << imgname_str << ")" << dendl;
+ lderr(cct) << "warning: couldn't remove old entry from directory ("
+ << srcname << ")" << dendl;
- r = io_ctx.remove(md_oid);
+ r = io_ctx.remove(header_oid);
if (r < 0 && r != -ENOENT)
- lderr(cct) << "warning: couldn't remove old metadata" << dendl;
- notify_change(io_ctx, md_oid, NULL, NULL);
+ lderr(cct) << "warning: couldn't remove old header object ("
+ << header_oid << ")" << dendl;
+ notify_change(io_ctx, header_oid, NULL, NULL);
return 0;
}
@@ -1036,46 +1140,27 @@ int info(ImageCtx *ictx, image_info_t& info, size_t infosize)
return 0;
}
-bool has_snaps(IoCtx& io_ctx, const std::string& md_oid)
-{
- CephContext *cct((CephContext *)io_ctx.cct());
- ldout(cct, 20) << "has_snaps " << &io_ctx << " " << md_oid << dendl;
-
- bufferlist bl, bl2;
- int r = io_ctx.exec(md_oid, "rbd", "snap_list", bl, bl2);
- if (r < 0) {
- lderr(cct) << "Error listing snapshots: " << cpp_strerror(-r) << dendl;
- return true;
- }
- uint32_t num_snaps;
- uint64_t snap_seq;
- bufferlist::iterator iter = bl2.begin();
- ::decode(snap_seq, iter);
- ::decode(num_snaps, iter);
- return num_snaps > 0;
-}
-
int remove(IoCtx& io_ctx, const char *imgname, ProgressContext& prog_ctx)
{
CephContext *cct((CephContext *)io_ctx.cct());
ldout(cct, 20) << "remove " << &io_ctx << " " << imgname << dendl;
- string md_oid = imgname;
- md_oid += RBD_SUFFIX;
-
- struct rbd_obj_header_ondisk header;
- int r = read_header(io_ctx, md_oid, &header, NULL);
+ ImageCtx *ictx = new ImageCtx(imgname, NULL, io_ctx);
+ int r = open_image(ictx);
if (r < 0) {
- ldout(cct, 2) << "error reading header: " << cpp_strerror(-r) << dendl;
- }
- if (r >= 0) {
- if (has_snaps(io_ctx, md_oid)) {
+ ldout(cct, 2) << "error opening image: " << cpp_strerror(-r) << dendl;
+ } else {
+ if (ictx->snaps.size()) {
lderr(cct) << "image has snapshots - not removing" << dendl;
+ close_image(ictx);
return -ENOTEMPTY;
}
- trim_image(io_ctx, header, 0, prog_ctx);
+ string header_oid = ictx->header_oid;
+ trim_image(ictx, 0, prog_ctx);
+ close_image(ictx);
+
ldout(cct, 2) << "removing header..." << dendl;
- r = io_ctx.remove(md_oid);
+ r = io_ctx.remove(header_oid);
if (r < 0 && r != -ENOENT) {
lderr(cct) << "error removing header: " << cpp_strerror(-r) << dendl;
return r;
@@ -1096,24 +1181,30 @@ int remove(IoCtx& io_ctx, const char *imgname, ProgressContext& prog_ctx)
int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
{
CephContext *cct = ictx->cct;
- if (size == ictx->header.image_size) {
- ldout(cct, 2) << "no change in size (" << ictx->header.image_size << " -> " << size << ")" << dendl;
+ if (size == ictx->size) {
+ ldout(cct, 2) << "no change in size (" << ictx->size << " -> " << size << ")" << dendl;
return 0;
}
- if (size > ictx->header.image_size) {
- ldout(cct, 2) << "expanding image " << ictx->header.image_size << " -> " << size << dendl;
- ictx->header.image_size = size;
+ if (size > ictx->size) {
+ ldout(cct, 2) << "expanding image " << ictx->size << " -> " << size << dendl;
+ // TODO: make ictx->set_size
} else {
- ldout(cct, 2) << "shrinking image " << ictx->header.image_size << " -> " << size << dendl;
- trim_image(ictx->data_ctx, ictx->header, size, prog_ctx);
- ictx->header.image_size = size;
+ ldout(cct, 2) << "shrinking image " << ictx->size << " -> " << size << dendl;
+ trim_image(ictx, size, prog_ctx);
}
+ ictx->size = size;
- // rewrite header
- bufferlist bl;
- bl.append((const char *)&(ictx->header), sizeof(ictx->header));
- int r = ictx->md_ctx.write(ictx->md_oid(), bl, bl.length(), 0);
+ int r;
+ if (ictx->old_format) {
+ // rewrite header
+ bufferlist bl;
+ ictx->header.image_size = size;
+ bl.append((const char *)&(ictx->header), sizeof(ictx->header));
+ r = ictx->md_ctx.write(ictx->header_oid, bl, bl.length(), 0);
+ } else {
+ r = cls_client::set_size(&(ictx->md_ctx), ictx->header_oid, size);
+ }
if (r == -ERANGE)
lderr(cct) << "operation might have conflicted with another client!" << dendl;
@@ -1121,7 +1212,7 @@ int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
lderr(cct) << "error writing header: " << cpp_strerror(-r) << dendl;
return r;
} else {
- notify_change(ictx->md_ctx, ictx->md_oid(), NULL, ictx);
+ notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
}
return 0;
@@ -1130,14 +1221,15 @@ int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
int resize(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
{
CephContext *cct = ictx->cct;
- ldout(cct, 20) << "resize " << ictx << " " << ictx->header.image_size << " -> " << size << dendl;
+ ldout(cct, 20) << "resize " << ictx << " " << ictx->size << " -> "
+ << size << dendl;
int r = ictx_check(ictx);
if (r < 0)
return r;
Mutex::Locker l(ictx->lock);
- if (size < ictx->header.image_size && ictx->object_cacher) {
+ if (size < ictx->size && ictx->object_cacher) {
// need to invalidate since we're deleting objects, and
// ObjectCacher doesn't track non-existent objects
ictx->invalidate_cache();
@@ -1176,7 +1268,6 @@ int add_snap(ImageCtx *ictx, const char *snap_name)
{
assert(ictx->lock.is_locked());
- bufferlist bl, bl2;
uint64_t snap_id;
int r = ictx->md_ctx.selfmanaged_snap_create(&snap_id);
@@ -1185,15 +1276,19 @@ int add_snap(ImageCtx *ictx, const char *snap_name)
return r;
}
- ::encode(snap_name, bl);
- ::encode(snap_id, bl);
+ if (ictx->old_format) {
+ r = cls_client::old_snapshot_add(&ictx->md_ctx, ictx->header_oid,
+ snap_id, snap_name);
+ } else {
+ r = cls_client::snapshot_add(&ictx->md_ctx, ictx->header_oid,
+ snap_id, snap_name);
+ }
- r = ictx->md_ctx.exec(ictx->md_oid(), "rbd", "snap_add", bl, bl2);
if (r < 0) {
- lderr(ictx->cct) << "rbd.snap_add execution failed failed: " << cpp_strerror(-r) << dendl;
+ lderr(ictx->cct) << "adding snapshot to header failed: "
+ << cpp_strerror(r) << dendl;
return r;
}
- notify_change(ictx->md_ctx, ictx->md_oid(), NULL, ictx);
return 0;
}
@@ -1202,12 +1297,19 @@ int rm_snap(ImageCtx *ictx, const char *snap_name)
{
assert(ictx->lock.is_locked());
- bufferlist bl, bl2;
- ::encode(snap_name, bl);
+ int r;
+ if (ictx->old_format) {
+ r = cls_client::old_snapshot_remove(&ictx->md_ctx,
+ ictx->header_oid, snap_name);
+ } else {
+ r = cls_client::snapshot_remove(&ictx->md_ctx,
+ ictx->header_oid,
+ ictx->get_snapid(snap_name));
+ }
- int r = ictx->md_ctx.exec(ictx->md_oid(), "rbd", "snap_remove", bl, bl2);
if (r < 0) {
- lderr(ictx->cct) << "rbd.snap_remove execution failed: " << cpp_strerror(-r) << dendl;
+ lderr(ictx->cct) << "removing snapshot from header failed: "
+ << cpp_strerror(r) << dendl;
return r;
}
@@ -1246,44 +1348,70 @@ int ictx_refresh(ImageCtx *ictx)
ictx->needs_refresh = false;
ictx->refresh_lock.Unlock();
- int r = read_header(ictx->md_ctx, ictx->md_oid(), &(ictx->header), NULL);
- if (r < 0) {
- lderr(cct) << "Error reading header: " << cpp_strerror(-r) << dendl;
- return r;
- }
- r = ictx->md_ctx.exec(ictx->md_oid(), "rbd", "snap_list", bl, bl2);
- if (r < 0) {
- lderr(cct) << "Error listing snapshots: " << cpp_strerror(-r) << dendl;
- return r;
- }
- r = 0;
+ int r;
+ ::SnapContext new_snapc;
+ bool new_snap = false; // must start false: the snapshot loop below only ever assigns true
+ vector<string> snap_names;
+ vector<uint64_t> snap_sizes;
+ vector<uint64_t> snap_features;
+ if (ictx->old_format) {
+ r = read_header(ictx->md_ctx, ictx->header_oid, &ictx->header, NULL);
+ if (r < 0) {
+ lderr(cct) << "Error reading header: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ r = cls_client::old_snapshot_list(&ictx->md_ctx, ictx->header_oid,
+ &snap_names, &snap_sizes, &new_snapc);
+ if (r < 0) {
+ lderr(cct) << "Error listing snapshots: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ ictx->order = ictx->header.options.order;
+ ictx->size = ictx->header.image_size;
+ ictx->object_prefix = ictx->header.block_name;
+ } else {
+ do {
+ uint64_t incompatible_features;
+ r = cls_client::get_mutable_metadata(&ictx->md_ctx, ictx->header_oid,
+ &ictx->size, &ictx->features,
+ &incompatible_features,
+ &new_snapc);
+ if (r < 0) {
+ lderr(cct) << "Error reading mutable metadata: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
- std::map<snap_t, std::string> old_snap_ids;
- for (std::map<std::string, struct SnapInfo>::iterator it =
- ictx->snaps_by_name.begin(); it != ictx->snaps_by_name.end(); ++it) {
- old_snap_ids[it->second.id] = it->first;
+ uint64_t unsupported = incompatible_features & ~RBD_FEATURES_ALL;
+ if (unsupported) {
+ lderr(ictx->cct) << "Image uses unsupported features: "
+ << unsupported << dendl;
+ return -ENOSYS;
+ }
+ r = cls_client::snapshot_list(&(ictx->md_ctx), ictx->header_oid,
+ new_snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features);
+ // -ENOENT here means we raced with snapshot deletion
+ if (r < 0 && r != -ENOENT) {
+ lderr(ictx->cct) << "snapc = " << new_snapc << dendl;
+ lderr(ictx->cct) << "Error listing snapshots: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ } while (r == -ENOENT);
}
- bool new_snap = false;
ictx->snaps.clear();
- ictx->snapc.snaps.clear();
ictx->snaps_by_name.clear();
-
- uint32_t num_snaps;
- bufferlist::iterator iter = bl2.begin();
- ::decode(ictx->snapc.seq, iter);
- ::decode(num_snaps, iter);
- for (uint32_t i=0; i < num_snaps; i++) {
- uint64_t id, image_size;
- string s;
- ::decode(id, iter);
- ::decode(image_size, iter);
- ::decode(s, iter);
- ictx->add_snap(s, id, image_size);
- std::map<snap_t, std::string>::const_iterator it = old_snap_ids.find(id);
- if (it == old_snap_ids.end()) {
+ for (size_t i = 0; i < new_snapc.snaps.size(); ++i) {
+ uint64_t features = ictx->old_format ? 0 : snap_features[i];
+ ictx->add_snap(snap_names[i], new_snapc.snaps[i].val,
+ snap_sizes[i], features);
+ std::vector<snap_t>::const_iterator it =
+ find(ictx->snaps.begin(), ictx->snaps.end(), new_snapc.snaps[i].val);
+ if (it == ictx->snaps.end()) {
new_snap = true;
- ldout(cct, 20) << "new snapshot " << s << " size " << image_size << dendl;
+ // 'it' equals ictx->snaps.end() in this branch and must not be
+ // dereferenced; log the id that was searched for instead.
+ ldout(cct, 20) << "new snapshot id " << new_snapc.snaps[i].val
+ << " size " << snap_sizes[i] << dendl;
}
}
@@ -1299,8 +1427,10 @@ int ictx_refresh(ImageCtx *ictx)
return -EIO;
}
+ ictx->snapc = new_snapc;
+
if (ictx->snapid != CEPH_NOSNAP &&
- ictx->get_snapid(ictx->snapname) == CEPH_NOSNAP) {
+ ictx->get_snapid(ictx->snapname) != ictx->snapid) {
lderr(cct) << "tried to read from a snapshot that no longer exists: "
<< ictx->snapname << dendl;
ictx->snap_exists = false;
@@ -1391,7 +1521,7 @@ int snap_rollback(ImageCtx *ictx, const char *snap_name, ProgressContext& prog_c
snap_t new_snapid = ictx->get_snapid(snap_name);
ldout(cct, 20) << "snapid is " << ictx->snapid << " new snapid is " << new_snapid << dendl;
- notify_change(ictx->md_ctx, ictx->md_oid(), NULL, ictx);
+ notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
ictx->perfcounter->inc(l_librbd_snap_rollback);
return r;
@@ -1426,8 +1556,9 @@ int copy(ImageCtx& ictx, IoCtx& dest_md_ctx, const char *destname,
uint64_t src_size = ictx.get_image_size();
int64_t r;
- int order = ictx.header.options.order;
- r = create(dest_md_ctx, destname, src_size, &order);
+ int order = ictx.order;
+ r = create(dest_md_ctx, destname, src_size, ictx.old_format,
+ ictx.features, &order);
if (r < 0) {
lderr(cct) << "header creation failed" << dendl;
return r;
@@ -1478,9 +1609,12 @@ int open_image(ImageCtx *ictx)
ldout(ictx->cct, 20) << "open_image: ictx = " << ictx
<< " name = '" << ictx->name << "' snap_name = '"
<< ictx->snapname << "'" << dendl;
+ int r = ictx->init();
+ if (r < 0)
+ return r;
ictx->lock.Lock();
- int r = ictx_refresh(ictx);
+ r = ictx_refresh(ictx);
ictx->lock.Unlock();
if (r < 0)
return r;
@@ -1489,11 +1623,9 @@ int open_image(ImageCtx *ictx)
ictx->data_ctx.snap_set_read(ictx->snapid);
WatchCtx *wctx = new WatchCtx(ictx);
- if (!wctx)
- return -ENOMEM;
ictx->wctx = wctx;
- r = ictx->md_ctx.watch(ictx->md_oid(), 0, &(wctx->cookie), wctx);
+ r = ictx->md_ctx.watch(ictx->header_oid, 0, &(wctx->cookie), wctx);
return r;
}
@@ -1506,7 +1638,7 @@ void close_image(ImageCtx *ictx)
flush(ictx);
ictx->lock.Lock();
ictx->wctx->invalidate();
- ictx->md_ctx.unwatch(ictx->md_oid(), ictx->wctx->cookie);
+ ictx->md_ctx.unwatch(ictx->header_oid, ictx->wctx->cookie);
delete ictx->wctx;
ictx->lock.Unlock();
delete ictx;
@@ -1531,9 +1663,9 @@ int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
int64_t ret;
int64_t total_read = 0;
ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->header, off);
- uint64_t end_block = get_block_num(ictx->header, off + len - 1);
- uint64_t block_size = get_block_size(ictx->header);
+ uint64_t start_block = get_block_num(ictx->order, off);
+ uint64_t end_block = get_block_num(ictx->order, off + len - 1);
+ uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
uint64_t left = len;
@@ -1541,8 +1673,8 @@ int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
for (uint64_t i = start_block; i <= end_block; i++) {
bufferlist bl;
ictx->lock.Lock();
- string oid = get_block_oid(ictx->header, i);
- uint64_t block_ofs = get_block_ofs(ictx->header, off + total_read);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
ictx->lock.Unlock();
uint64_t read_len = min(block_size - block_ofs, left);
uint64_t bytes_read;
@@ -1620,9 +1752,9 @@ ssize_t write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf)
size_t total_write = 0;
ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->header, off);
- uint64_t end_block = get_block_num(ictx->header, off + len - 1);
- uint64_t block_size = get_block_size(ictx->header);
+ uint64_t start_block = get_block_num(ictx->order, off);
+ uint64_t end_block = get_block_num(ictx->order, off + len - 1);
+ uint64_t block_size = get_block_size(ictx->order);
snapid_t snap = ictx->snapid;
ictx->lock.Unlock();
uint64_t left = len;
@@ -1634,8 +1766,8 @@ ssize_t write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf)
for (uint64_t i = start_block; i <= end_block; i++) {
bufferlist bl;
ictx->lock.Lock();
- string oid = get_block_oid(ictx->header, i);
- uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
uint64_t write_len = min(block_size - block_ofs, left);
bl.append(buf + total_write, write_len);
@@ -1677,9 +1809,9 @@ int discard(ImageCtx *ictx, uint64_t off, uint64_t len)
size_t total_write = 0;
ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->header, off);
- uint64_t end_block = get_block_num(ictx->header, off + len - 1);
- uint64_t block_size = get_block_size(ictx->header);
+ uint64_t start_block = get_block_num(ictx->order, off);
+ uint64_t end_block = get_block_num(ictx->order, off + len - 1);
+ uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
uint64_t left = len;
@@ -1690,8 +1822,8 @@ int discard(ImageCtx *ictx, uint64_t off, uint64_t len)
start_time = ceph_clock_now(ictx->cct);
for (uint64_t i = start_block; i <= end_block; i++) {
ictx->lock.Lock();
- string oid = get_block_oid(ictx->header, i);
- uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
uint64_t write_len = min(block_size - block_ofs, left);
@@ -1890,9 +2022,9 @@ int aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
size_t total_write = 0;
ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->header, off);
- uint64_t end_block = get_block_num(ictx->header, off + len - 1);
- uint64_t block_size = get_block_size(ictx->header);
+ uint64_t start_block = get_block_num(ictx->order, off);
+ uint64_t end_block = get_block_num(ictx->order, off + len - 1);
+ uint64_t block_size = get_block_size(ictx->order);
snapid_t snap = ictx->snapid;
ictx->lock.Unlock();
uint64_t left = len;
@@ -1908,8 +2040,8 @@ int aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
c->init_time(ictx, AIO_TYPE_WRITE);
for (uint64_t i = start_block; i <= end_block; i++) {
ictx->lock.Lock();
- string oid = get_block_oid(ictx->header, i);
- uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
uint64_t write_len = min(block_size - block_ofs, left);
@@ -1956,9 +2088,9 @@ int aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c)
size_t total_write = 0;
ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->header, off);
- uint64_t end_block = get_block_num(ictx->header, off + len - 1);
- uint64_t block_size = get_block_size(ictx->header);
+ uint64_t start_block = get_block_num(ictx->order, off);
+ uint64_t end_block = get_block_num(ictx->order, off + len - 1);
+ uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
uint64_t left = len;
@@ -1974,8 +2106,8 @@ int aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c)
c->init_time(ictx, AIO_TYPE_DISCARD);
for (uint64_t i = start_block; i <= end_block; i++) {
ictx->lock.Lock();
- string oid = get_block_oid(ictx->header, i);
- uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
AioBlockCompletion *block_completion = new AioBlockCompletion(cct, c, off, len, NULL);
@@ -2044,9 +2176,9 @@ int aio_read(ImageCtx *ictx, uint64_t off, size_t len,
int64_t ret;
int total_read = 0;
ictx->lock.Lock();
- uint64_t start_block = get_block_num(ictx->header, off);
- uint64_t end_block = get_block_num(ictx->header, off + len - 1);
- uint64_t block_size = get_block_size(ictx->header);
+ uint64_t start_block = get_block_num(ictx->order, off);
+ uint64_t end_block = get_block_num(ictx->order, off + len - 1);
+ uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
uint64_t left = len;
@@ -2055,8 +2187,8 @@ int aio_read(ImageCtx *ictx, uint64_t off, size_t len,
for (uint64_t i = start_block; i <= end_block; i++) {
bufferlist bl;
ictx->lock.Lock();
- string oid = get_block_oid(ictx->header, i);
- uint64_t block_ofs = get_block_ofs(ictx->header, off + total_read);
+ string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
+ uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
ictx->lock.Unlock();
uint64_t read_len = min(block_size - block_ofs, left);
@@ -2124,8 +2256,6 @@ int RBD::open(IoCtx& io_ctx, Image& image, const char *name)
int RBD::open(IoCtx& io_ctx, Image& image, const char *name, const char *snapname)
{
ImageCtx *ictx = new ImageCtx(name, snapname, io_ctx);
- if (!ictx)
- return -ENOMEM;
int r = librbd::open_image(ictx);
if (r < 0)
@@ -2137,8 +2267,13 @@ int RBD::open(IoCtx& io_ctx, Image& image, const char *name, const char *snapnam
int RBD::create(IoCtx& io_ctx, const char *name, uint64_t size, int *order) // forwards to librbd::create() with fixed format/feature arguments
{
- int r = librbd::create(io_ctx, name, size, order);
- return r;
+ return librbd::create(io_ctx, name, size, true, 0, order); // true, 0 occupy the slots copy() fills with ictx.old_format and ictx.features
+}
+
+int RBD::create2(IoCtx& io_ctx, const char *name, uint64_t size, // forwards to librbd::create() with a caller-supplied feature mask
+ uint64_t features, int *order)
+{
+ return librbd::create(io_ctx, name, size, false, features, order); // false sits in the slot copy() fills with ictx.old_format
}
int RBD::remove(IoCtx& io_ctx, const char *name)
@@ -2392,7 +2527,16 @@ extern "C" int rbd_create(rados_ioctx_t p, const char *name, uint64_t size, int
{
librados::IoCtx io_ctx;
librados::IoCtx::from_rados_ioctx_t(p, io_ctx);
- return librbd::create(io_ctx, name, size, order);
+ return librbd::create(io_ctx, name, size, true, 0, order);
+}
+
+extern "C" int rbd_create2(rados_ioctx_t p, const char *name, // C entry point: wraps the rados_ioctx_t and forwards to librbd::create()
+ uint64_t size, uint64_t features,
+ int *order)
+{
+ librados::IoCtx io_ctx;
+ librados::IoCtx::from_rados_ioctx_t(p, io_ctx); // build the C++ IoCtx from the C handle
+ return librbd::create(io_ctx, name, size, false, features, order); // false sits in the slot copy() fills with ictx.old_format
}
extern "C" int rbd_remove(rados_ioctx_t p, const char *name)
@@ -2444,8 +2588,6 @@ extern "C" int rbd_open(rados_ioctx_t p, const char *name, rbd_image_t *image, c
librados::IoCtx io_ctx;
librados::IoCtx::from_rados_ioctx_t(p, io_ctx);
librbd::ImageCtx *ictx = new librbd::ImageCtx(name, snap_name, io_ctx);
- if (!ictx)
- return -ENOMEM;
int r = librbd::open_image(ictx);
*image = (rbd_image_t)ictx;
return r;
diff --git a/src/librbd/cls_rbd_client.cc b/src/librbd/cls_rbd_client.cc
new file mode 100644
index 00000000000..cb42eb7c178
--- /dev/null
+++ b/src/librbd/cls_rbd_client.cc
@@ -0,0 +1,318 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+
+#include "cls_rbd_client.h"
+
+#include <errno.h>
+
+namespace librbd {
+ namespace cls_client {
+ // Read the "immutable" header metadata (object prefix and order).
+ //
+ // Batches the "rbd" class methods get_size (for CEPH_NOSNAP) and
+ // get_object_prefix into one read operation on the header object.
+ // The get_size reply is decoded as order followed by size; only the
+ // order is handed back, the size is decoded and dropped.
+ //
+ // object_prefix and order are output parameters and must not be NULL.
+ //
+ // Returns 0 on success, the negative error from operate() if the read
+ // itself fails, or -EBADMSG if the reply cannot be decoded.
+ int get_immutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+                            std::string *object_prefix, uint8_t *order)
+ {
+   assert(object_prefix);
+   assert(order);
+
+   librados::ObjectReadOperation op;
+   bufferlist bl, empty;
+   snapid_t snap = CEPH_NOSNAP;
+   ::encode(snap, bl);
+   op.exec("rbd", "get_size", bl);
+   op.exec("rbd", "get_object_prefix", empty);
+
+   bufferlist outbl;
+   // Propagate the real failure (e.g. missing header object) instead of
+   // letting an empty reply show up as a decode error below.
+   int r = ioctx->operate(oid, &op, &outbl);
+   if (r < 0)
+     return r;
+
+   try {
+     bufferlist::iterator iter = outbl.begin();
+     uint64_t size;
+     ::decode(*order, iter);
+     ::decode(size, iter);
+     ::decode(*object_prefix, iter);
+   } catch (const buffer::error &err) {
+     return -EBADMSG;
+   }
+
+   return 0;
+ }
+
+ // Read the "mutable" header metadata in one round trip.
+ //
+ // Batches the "rbd" class methods get_size and get_features (both for
+ // CEPH_NOSNAP) and get_snapcontext into one read operation on the
+ // header object. The combined reply is decoded as: order, size,
+ // features, incompatible features, snap context. The order is decoded
+ // and dropped.
+ //
+ // size, features, incompatible_features and snapc are output
+ // parameters and must not be NULL.
+ //
+ // Returns 0 on success, the negative error from operate() if the read
+ // itself fails, or -EBADMSG if the reply cannot be decoded.
+ int get_mutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+                          uint64_t *size, uint64_t *features,
+                          uint64_t *incompatible_features,
+                          ::SnapContext *snapc)
+ {
+   assert(size);
+   assert(features);
+   assert(incompatible_features);
+   assert(snapc);
+
+   librados::ObjectReadOperation op;
+   bufferlist sizebl, featuresbl, empty;
+   snapid_t snap = CEPH_NOSNAP;
+   ::encode(snap, sizebl);
+   ::encode(snap, featuresbl);
+   op.exec("rbd", "get_size", sizebl);
+   op.exec("rbd", "get_features", featuresbl);
+   op.exec("rbd", "get_snapcontext", empty);
+
+   bufferlist outbl;
+   // Propagate the real failure instead of letting an empty reply show
+   // up as a decode error below.
+   int r = ioctx->operate(oid, &op, &outbl);
+   if (r < 0)
+     return r;
+
+   try {
+     bufferlist::iterator iter = outbl.begin();
+     uint8_t order;
+     ::decode(order, iter);
+     ::decode(*size, iter);
+     ::decode(*features, iter);
+     ::decode(*incompatible_features, iter);
+     ::decode(*snapc, iter);
+   } catch (const buffer::error &err) {
+     return -EBADMSG;
+   }
+
+   return 0;
+ }
+
+ // Invoke the "rbd" class method "create" on object oid. The request
+ // payload is size, order, features and object_prefix, encoded in that
+ // order. Returns whatever exec() returns.
+ int create_image(librados::IoCtx *ioctx, const std::string &oid,
+                  uint64_t size, uint8_t order, uint64_t features,
+                  const std::string &object_prefix)
+ {
+   bufferlist request;
+   ::encode(size, request);
+   ::encode(order, request);
+   ::encode(features, request);
+   ::encode(object_prefix, request);
+
+   bufferlist reply;  // required by exec(); contents are not inspected
+   const int r = ioctx->exec(oid, "rbd", "create", request, reply);
+   return r;
+ }
+
+ // Call the "rbd" class method "get_features" for snap_id on object oid
+ // and decode the first value of the reply into *features.
+ // Returns 0 on success, the negative result of exec() on failure, or
+ // -EBADMSG if the reply cannot be decoded.
+ int get_features(librados::IoCtx *ioctx, const std::string &oid,
+                  uint64_t snap_id, uint64_t *features)
+ {
+   bufferlist request, reply;
+   ::encode(snap_id, request);
+
+   int r = ioctx->exec(oid, "rbd", "get_features", request, reply);
+   if (r >= 0) {
+     try {
+       bufferlist::iterator p = reply.begin();
+       ::decode(*features, p);
+       r = 0;
+     } catch (const buffer::error &) {
+       r = -EBADMSG;
+     }
+   }
+   return r;
+ }
+
+ // Call the "rbd" class method "get_object_prefix" on object oid (with an
+ // empty request) and decode the reply into *object_prefix.
+ // Returns 0 on success, the negative result of exec() on failure, or
+ // -EBADMSG if the reply cannot be decoded.
+ int get_object_prefix(librados::IoCtx *ioctx, const std::string &oid,
+                       std::string *object_prefix)
+ {
+   bufferlist request, reply;
+   const int r = ioctx->exec(oid, "rbd", "get_object_prefix", request, reply);
+   if (r < 0)
+     return r;
+
+   bufferlist::iterator p = reply.begin();
+   try {
+     ::decode(*object_prefix, p);
+   } catch (const buffer::error &) {
+     return -EBADMSG;
+   }
+   return 0;
+ }
+
+ // Call the "rbd" class method "get_size" for snap_id on object oid.
+ // The reply is decoded as the order first, then the size.
+ // Returns 0 on success, the negative result of exec() on failure, or
+ // -EBADMSG if the reply cannot be decoded.
+ int get_size(librados::IoCtx *ioctx, const std::string &oid,
+              uint64_t snap_id, uint64_t *size, uint8_t *order)
+ {
+   bufferlist request, reply;
+   ::encode(snap_id, request);
+
+   int r = ioctx->exec(oid, "rbd", "get_size", request, reply);
+   if (r >= 0) {
+     try {
+       bufferlist::iterator p = reply.begin();
+       ::decode(*order, p);
+       ::decode(*size, p);
+       r = 0;
+     } catch (const buffer::error &) {
+       r = -EBADMSG;
+     }
+   }
+   return r;
+ }
+
+ // Call the "rbd" class method "set_size" on object oid with the encoded
+ // size as its only argument. Returns whatever exec() returns.
+ int set_size(librados::IoCtx *ioctx, const std::string &oid,
+              uint64_t size)
+ {
+   bufferlist request;
+   ::encode(size, request);
+
+   bufferlist reply;  // required by exec(); contents are not inspected
+   const int r = ioctx->exec(oid, "rbd", "set_size", request, reply);
+   return r;
+ }
+
+ // Call the "rbd" class method "snapshot_add" on object oid. The request
+ // carries the snapshot name followed by the snapshot id.
+ // Returns whatever exec() returns.
+ int snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+                  uint64_t snap_id, const std::string &snap_name)
+ {
+   bufferlist request;
+   ::encode(snap_name, request);
+   ::encode(snap_id, request);
+
+   bufferlist reply;  // required by exec(); contents are not inspected
+   const int r = ioctx->exec(oid, "rbd", "snapshot_add", request, reply);
+   return r;
+ }
+
+ // Call the "rbd" class method "snapshot_remove" on object oid with the
+ // encoded snapshot id as its only argument.
+ // Returns whatever exec() returns.
+ int snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+                     uint64_t snap_id)
+ {
+   bufferlist request;
+   ::encode(snap_id, request);
+
+   bufferlist reply;  // required by exec(); contents are not inspected
+   const int r = ioctx->exec(oid, "rbd", "snapshot_remove", request, reply);
+   return r;
+ }
+
+ // Call the "rbd" class method "get_snapcontext" on object oid (with an
+ // empty request) and decode the reply into *snapc.
+ // Returns 0 on success, the negative result of exec() on failure, or
+ // -EBADMSG if the reply cannot be decoded or the decoded snap context
+ // fails is_valid().
+ int get_snapcontext(librados::IoCtx *ioctx, const std::string &oid,
+                     ::SnapContext *snapc)
+ {
+   bufferlist request, reply;
+
+   const int r = ioctx->exec(oid, "rbd", "get_snapcontext", request, reply);
+   if (r < 0)
+     return r;
+
+   bufferlist::iterator p = reply.begin();
+   try {
+     ::decode(*snapc, p);
+   } catch (const buffer::error &) {
+     return -EBADMSG;
+   }
+
+   return snapc->is_valid() ? 0 : -EBADMSG;
+ }
+
+ // Fetch name, size and features for every snapshot id in ids.
+ //
+ // For each id, three "rbd" class calls (get_snapshot_name, get_size,
+ // get_features) are queued on a single read operation, and the combined
+ // reply is decoded in that same order. The output vectors are reset to
+ // ids.size() entries up front; on success entry i describes ids[i].
+ //
+ // Returns 0 on success, the negative result of operate() on failure, or
+ // -EBADMSG if the reply cannot be decoded.
+ int snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+                   const std::vector<snapid_t> &ids,
+                   std::vector<string> *names,
+                   std::vector<uint64_t> *sizes,
+                   std::vector<uint64_t> *features)
+ {
+   const size_t count = ids.size();
+
+   names->clear();
+   sizes->clear();
+   features->clear();
+   names->resize(count);
+   sizes->resize(count);
+   features->resize(count);
+
+   librados::ObjectReadOperation op;
+   for (size_t i = 0; i < count; ++i) {
+     const uint64_t snap_id = ids[i].val;
+     // one separate request buffer per queued call
+     bufferlist name_arg, size_arg, features_arg;
+     ::encode(snap_id, name_arg);
+     op.exec("rbd", "get_snapshot_name", name_arg);
+     ::encode(snap_id, size_arg);
+     op.exec("rbd", "get_size", size_arg);
+     ::encode(snap_id, features_arg);
+     op.exec("rbd", "get_features", features_arg);
+   }
+
+   bufferlist reply;
+   const int r = ioctx->operate(oid, &op, &reply);
+   if (r < 0)
+     return r;
+
+   try {
+     bufferlist::iterator p = reply.begin();
+     for (size_t i = 0; i < count; ++i) {
+       uint8_t order;                // decoded to advance; value unused
+       uint64_t incompat_features;   // decoded to advance; value unused
+       ::decode((*names)[i], p);
+       ::decode(order, p);
+       ::decode((*sizes)[i], p);
+       ::decode((*features)[i], p);
+       ::decode(incompat_features, p);
+     }
+   } catch (const buffer::error &) {
+     return -EBADMSG;
+   }
+
+   return 0;
+ }
+
+ // Call the "rbd" class method "assign_bid" on object oid (with an empty
+ // request) and decode the reply into *id.
+ // Returns 0 on success, the negative result of exec() on failure, or
+ // -EBADMSG if the reply cannot be decoded.
+ int assign_bid(librados::IoCtx *ioctx, const std::string &oid,
+                uint64_t *id)
+ {
+   bufferlist request, reply;
+   int r = ioctx->exec(oid, "rbd", "assign_bid", request, reply);
+   if (r >= 0) {
+     try {
+       bufferlist::iterator p = reply.begin();
+       ::decode(*id, p);
+       r = 0;
+     } catch (const buffer::error &) {
+       r = -EBADMSG;
+     }
+   }
+   return r;
+ }
+
+ // Old-format counterpart of snapshot_add: calls the "rbd" class method
+ // "snap_add" on object oid. The request carries the snapshot name
+ // followed by the snapshot id. Returns whatever exec() returns.
+ int old_snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+                      uint64_t snap_id, const std::string &snap_name)
+ {
+   bufferlist request;
+   ::encode(snap_name, request);
+   ::encode(snap_id, request);
+
+   bufferlist reply;  // required by exec(); contents are not inspected
+   const int r = ioctx->exec(oid, "rbd", "snap_add", request, reply);
+   return r;
+ }
+
+ // Old-format counterpart of snapshot_remove: calls the "rbd" class
+ // method "snap_remove" on object oid with the encoded snapshot name as
+ // its only argument. Returns whatever exec() returns.
+ int old_snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+                         const std::string &snap_name)
+ {
+   bufferlist request;
+   ::encode(snap_name, request);
+
+   bufferlist reply;  // required by exec(); contents are not inspected
+   const int r = ioctx->exec(oid, "rbd", "snap_remove", request, reply);
+   return r;
+ }
+
+ // Old-format snapshot listing: calls the "rbd" class method "snap_list"
+ // on object oid and decodes the reply.
+ //
+ // The reply is read as snapc->seq, a 32-bit snapshot count, and then one
+ // (id, size, name) triple per snapshot. names, sizes and snapc->snaps are
+ // resized to that count and filled index by index.
+ //
+ // Returns 0 on success, the negative result of exec() on failure, or
+ // -EBADMSG if the reply cannot be decoded.
+ int old_snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+                       std::vector<string> *names,
+                       std::vector<uint64_t> *sizes,
+                       ::SnapContext *snapc)
+ {
+   bufferlist request, reply;
+   const int r = ioctx->exec(oid, "rbd", "snap_list", request, reply);
+   if (r < 0)
+     return r;
+
+   bufferlist::iterator p = reply.begin();
+   uint32_t count;
+   try {
+     ::decode(snapc->seq, p);
+     ::decode(count, p);
+
+     names->resize(count);
+     sizes->resize(count);
+     snapc->snaps.resize(count);
+
+     uint32_t idx = 0;
+     while (idx < count) {
+       ::decode(snapc->snaps[idx], p);
+       ::decode((*sizes)[idx], p);
+       ::decode((*names)[idx], p);
+       ++idx;
+     }
+   } catch (const buffer::error &) {
+     return -EBADMSG;
+   }
+
+   return 0;
+ }
+ } // namespace cls_client
+} // namespace librbd
diff --git a/src/librbd/cls_rbd_client.h b/src/librbd/cls_rbd_client.h
new file mode 100644
index 00000000000..79654480546
--- /dev/null
+++ b/src/librbd/cls_rbd_client.h
@@ -0,0 +1,65 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CLS_RBD_CLIENT_H
+#define CEPH_LIBRBD_CLS_RBD_CLIENT_H
+
+#include "common/snap_types.h"
+#include "include/rados.h"
+#include "include/rados/librados.hpp"
+#include "include/types.h"
+
+#include <string>
+#include <vector>
+
+namespace librbd {
+ namespace cls_client {
+
+ // high-level interface to the header
+ int get_immutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+ std::string *object_prefix, uint8_t *order);
+ int get_mutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t *size, uint64_t *features,
+ uint64_t *incompatible_features,
+ ::SnapContext *snapc);
+
+ // low-level interface (mainly for testing)
+ int create_image(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t size, uint8_t order, uint64_t features,
+ const std::string &object_prefix);
+ int get_features(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t snap_id, uint64_t *features);
+ int get_object_prefix(librados::IoCtx *ioctx, const std::string &oid,
+ std::string *object_prefix);
+ int get_size(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t snap_id, uint64_t *size, uint8_t *order);
+ int set_size(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t size);
+ int snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t snap_id, const std::string &snap_name);
+ int snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t snap_id);
+ int get_snapcontext(librados::IoCtx *ioctx, const std::string &oid,
+ ::SnapContext *snapc);
+ int snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+ const std::vector<snapid_t> &ids,
+ std::vector<string> *names,
+ std::vector<uint64_t> *sizes,
+ std::vector<uint64_t> *features);
+ int assign_bid(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t *id);
+
+
+ // class operations on the old format, kept for
+ // backwards compatibility
+ int old_snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+ uint64_t snap_id, const std::string &snap_name);
+ int old_snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+ const std::string &snap_name);
+ int old_snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+ std::vector<string> *names,
+ std::vector<uint64_t> *sizes,
+ ::SnapContext *snapc);
+ } // namespace cls_client
+} // namespace librbd
+#endif // CEPH_LIBRBD_CLS_RBD_CLIENT_H
diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc
index aa00581a0e1..e5db43d27d4 100644
--- a/src/objclass/class_api.cc
+++ b/src/objclass/class_api.cc
@@ -1,3 +1,5 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
#include "common/config.h"
@@ -249,7 +251,7 @@ int cls_cxx_snap_revert(cls_method_context_t hctx, snapid_t snapid)
return (*pctx)->pg->do_osd_ops(*pctx, ops);
}
-int cls_cxx_map_read_all_keys(cls_method_context_t hctx, map<string, bufferlist>* vals)
+int cls_cxx_map_get_all_vals(cls_method_context_t hctx, map<string, bufferlist>* vals)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
vector<OSDOp> ops(1);
@@ -280,8 +282,35 @@ int cls_cxx_map_read_all_keys(cls_method_context_t hctx, map<string, bufferlist>
return vals->size();
}
-int cls_cxx_map_read_keys(cls_method_context_t hctx, string& start_obj,
- string& filter_prefix, uint64_t max, map<string, bufferlist>* vals)
+// Issue a CEPH_OSD_OP_OMAPGETKEYS op with start_obj and max_to_get as its
+// encoded input, and decode the resulting key set into *keys.
+// Returns the number of keys in *keys on success, the negative result of
+// do_osd_ops() on failure, or -EIO if the reply cannot be decoded.
+int cls_cxx_map_get_keys(cls_method_context_t hctx, const string &start_obj,
+                         uint64_t max_to_get, set<string> *keys)
+{
+  ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
+  vector<OSDOp> ops(1);
+  OSDOp &getkeys = ops[0];
+
+  getkeys.op.op = CEPH_OSD_OP_OMAPGETKEYS;
+  ::encode(start_obj, getkeys.indata);
+  ::encode(max_to_get, getkeys.indata);
+
+  const int ret = (*pctx)->pg->do_osd_ops(*pctx, ops);
+  if (ret < 0)
+    return ret;
+
+  try {
+    bufferlist::iterator p = getkeys.outdata.begin();
+    ::decode(*keys, p);
+  } catch (buffer::error &) {
+    return -EIO;
+  }
+  return keys->size();
+}
+
+int cls_cxx_map_get_vals(cls_method_context_t hctx, const string &start_obj,
+ const string &filter_prefix, uint64_t max_to_get,
+ map<string, bufferlist> *vals)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
vector<OSDOp> ops(1);
@@ -292,7 +321,7 @@ int cls_cxx_map_read_keys(cls_method_context_t hctx, string& start_obj,
bufferlist inbl;
::encode(start_obj, op.indata);
- ::encode(max, op.indata);
+ ::encode(max_to_get, op.indata);
::encode(filter_prefix, op.indata);
op.op.op = CEPH_OSD_OP_OMAPGETVALS;
@@ -325,7 +354,9 @@ int cls_cxx_map_read_header(cls_method_context_t hctx, bufferlist *outbl)
return 0;
}
-int cls_cxx_map_read_key(cls_method_context_t hctx, string key, bufferlist *outbl)
+
+int cls_cxx_map_get_val(cls_method_context_t hctx, const string &key,
+ bufferlist *outbl)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
vector<OSDOp> ops(1);
@@ -357,7 +388,8 @@ int cls_cxx_map_read_key(cls_method_context_t hctx, string key, bufferlist *outb
return 0;
}
-int cls_cxx_map_write_key(cls_method_context_t hctx, string key, bufferlist *inbl)
+int cls_cxx_map_set_val(cls_method_context_t hctx, const string &key,
+ bufferlist *inbl)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
vector<OSDOp> ops(1);
@@ -372,6 +404,20 @@ int cls_cxx_map_write_key(cls_method_context_t hctx, string key, bufferlist *inb
return (*pctx)->pg->do_osd_ops(*pctx, ops);
}
+// Issue a CEPH_OSD_OP_OMAPSETVALS op whose input is the encoded contents
+// of *map. Returns whatever do_osd_ops() returns.
+int cls_cxx_map_set_vals(cls_method_context_t hctx,
+                         std::map<string, bufferlist> *map)
+{
+  ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
+  vector<OSDOp> ops(1);
+  OSDOp &setvals = ops[0];
+
+  setvals.op.op = CEPH_OSD_OP_OMAPSETVALS;
+  ::encode(*map, setvals.indata);
+
+  const int ret = (*pctx)->pg->do_osd_ops(*pctx, ops);
+  return ret;
+}
+
int cls_cxx_map_clear(cls_method_context_t hctx)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
@@ -395,7 +441,7 @@ int cls_cxx_map_write_header(cls_method_context_t hctx, bufferlist *inbl)
return (*pctx)->pg->do_osd_ops(*pctx, ops);
}
-int cls_cxx_map_remove_key(cls_method_context_t hctx, string key)
+int cls_cxx_map_remove_key(cls_method_context_t hctx, const string &key)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
vector<OSDOp> ops(1);
diff --git a/src/objclass/class_debug.cc b/src/objclass/class_debug.cc
index 8ebf82ccb13..7b52fbb7b17 100644
--- a/src/objclass/class_debug.cc
+++ b/src/objclass/class_debug.cc
@@ -1,3 +1,6 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include "common/config.h"
#include "common/debug.h"
@@ -11,7 +14,7 @@
#define dout_subsys ceph_subsys_objclass
-int cls_log(const char *format, ...)
+int cls_log(int level, const char *format, ...)
{
int size = 256, n;
va_list ap;
@@ -22,7 +25,7 @@ int cls_log(const char *format, ...)
va_end(ap);
#define MAX_SIZE 8196
if ((n > -1 && n < size) || size > MAX_SIZE) {
- dout(1) << buf << dendl;
+ dout(level) << buf << dendl;
return n;
}
size *= 2;
diff --git a/src/objclass/objclass.h b/src/objclass/objclass.h
index b0ec28bd6ef..5d25077c812 100644
--- a/src/objclass/objclass.h
+++ b/src/objclass/objclass.h
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#ifndef CEPH_OBJCLASS_H
#define CEPH_OBJCLASS_H
@@ -22,8 +25,9 @@ const char *__cls_name = #name;
#define CLS_METHOD_PUBLIC 0x4
-#define CLS_LOG(fmt, ...) \
- cls_log("<cls> %s:%d: " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
+#define CLS_LOG(level, fmt, ...) \
+ cls_log(level, "<cls> %s:%d: " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
+#define CLS_ERR(fmt, ...) CLS_LOG(0, fmt, ##__VA_ARGS__)
void __cls_init();
@@ -39,7 +43,7 @@ typedef struct {
} cls_deps_t;
/* class utils */
-extern int cls_log(const char *format, ...);
+extern int cls_log(int level, const char *format, ...);
extern void *cls_alloc(size_t size);
extern void cls_free(void *p);
@@ -94,15 +98,27 @@ extern int cls_cxx_write_full(cls_method_context_t hctx, bufferlist *bl);
extern int cls_cxx_replace(cls_method_context_t hctx, int ofs, int len, bufferlist *bl);
extern int cls_cxx_snap_revert(cls_method_context_t hctx, snapid_t snapid);
extern int cls_cxx_map_clear(cls_method_context_t hctx);
-extern int cls_cxx_map_read_all_keys(cls_method_context_t hctx, std::map<string, bufferlist> *keys);
-extern int cls_cxx_map_read_keys(cls_method_context_t hctx, string& start_after, string& filter_prefix,
- uint64_t max, std::map<string, bufferlist> *keys);
+extern int cls_cxx_map_get_all_vals(cls_method_context_t hctx,
+ std::map<string, bufferlist> *vals);
+extern int cls_cxx_map_get_keys(cls_method_context_t hctx,
+ const string &start_after,
+ uint64_t max_to_get,
+ std::set<string> *keys);
+extern int cls_cxx_map_get_vals(cls_method_context_t hctx,
+ const string &start_after,
+ const string &filter_prefix,
+ uint64_t max_to_get,
+ std::map<string, bufferlist> *vals);
extern int cls_cxx_map_read_header(cls_method_context_t hctx, bufferlist *outbl);
-extern int cls_cxx_map_read_key(cls_method_context_t hctx, string key, bufferlist *outbl);
-extern int cls_cxx_map_write_key(cls_method_context_t hctx, string key, bufferlist *inbl);
+extern int cls_cxx_map_get_val(cls_method_context_t hctx,
+ const string &key, bufferlist *outbl);
+extern int cls_cxx_map_set_val(cls_method_context_t hctx,
+ const string &key, bufferlist *inbl);
+extern int cls_cxx_map_set_vals(cls_method_context_t hctx,
+ std::map<string, bufferlist> *map);
extern int cls_cxx_map_write_header(cls_method_context_t hctx, bufferlist *inbl);
-extern int cls_cxx_map_remove_key(cls_method_context_t hctx, string key);
-extern int cls_cxx_map_update(cls_method_context_t hctx, bufferlist* inbl);
+extern int cls_cxx_map_remove_key(cls_method_context_t hctx, const string &key);
+extern int cls_cxx_map_update(cls_method_context_t hctx, bufferlist *inbl);
/* These are also defined in rados.h and librados.h. Keep them in sync! */
#define CEPH_OSD_TMAP_HDR 'h'
diff --git a/src/os/DBObjectMap.cc b/src/os/DBObjectMap.cc
index 561de6c2b77..584b8df721c 100644
--- a/src/os/DBObjectMap.cc
+++ b/src/os/DBObjectMap.cc
@@ -829,7 +829,7 @@ int DBObjectMap::get_values(const hobject_t &hoid,
Header header = lookup_map_header(hoid);
if (!header)
return -ENOENT;
- return scan(header, keys, 0, out);;
+ return scan(header, keys, 0, out);
}
int DBObjectMap::check_keys(const hobject_t &hoid,
diff --git a/src/pybind/rbd.py b/src/pybind/rbd.py
index 55a9d67b7f5..9fc537ea136 100644
--- a/src/pybind/rbd.py
+++ b/src/pybind/rbd.py
@@ -120,7 +120,8 @@ class RBD(object):
self.librbd.rbd_version(byref(major), byref(minor), byref(extra))
return (major.value, minor.value, extra.value)
- def create(self, ioctx, name, size, order=None):
+ def create(self, ioctx, name, size, order=None, old_format=True,
+ features=0):
"""
Create an rbd image.
@@ -132,14 +133,27 @@ class RBD(object):
:type size: int
:param order: the image is split into (2**order) byte objects
:type order: int
+ :param old_format: whether to create an old-style image that
+ is accessible by old clients, but can't
+ use more advanced features like layering.
+ :type old_format: bool
+ :param features: bitmask of features to enable
+ :type features: int
:raises: :class:`ImageExists`
"""
if order is None:
order = 0
if not isinstance(name, str):
raise TypeError('name must be a string')
- ret = self.librbd.rbd_create(ioctx.io, c_char_p(name), c_uint64(size),
- byref(c_int(order)))
+ if old_format:
+ ret = self.librbd.rbd_create(ioctx.io, c_char_p(name),
+ c_uint64(size),
+ byref(c_int(order)))
+ else:
+ ret = self.librbd.rbd_create2(ioctx.io, c_char_p(name),
+ c_uint64(size),
+ c_uint64(features),
+ byref(c_int(order)))
if ret < 0:
raise make_ex(ret, 'error creating image')
diff --git a/src/rados.cc b/src/rados.cc
index a562a485558..7b7248f446e 100644
--- a/src/rados.cc
+++ b/src/rados.cc
@@ -24,6 +24,7 @@ using namespace librados;
#include "global/global_init.h"
#include "common/Cond.h"
#include "common/debug.h"
+#include "common/errno.h"
#include "common/Formatter.h"
#include "common/obj_bencher.h"
#include "mds/inode_backtrace.h"
@@ -78,8 +79,13 @@ void usage(ostream& out)
" bench <seconds> write|seq|rand [-t concurrent_operations]\n"
" default is 16 concurrent IOs and 4 MB ops\n"
" load-gen [options] generate load on the cluster\n"
-" listomap <obj-name> list the keys in the object map\n"
-" getomap <obj-name> <key> show the value for the specified key in the object's object map"
+// omap commands. Each entry ends in "\n" so consecutive entries do not run
+// together, and the argument lists mirror the nargs.size() checks performed
+// for each command in rados_tool_common().
+" listomapkeys <obj-name> list the keys in the object map\n"
+" getomapval <obj-name> <key> show the value for the specified key in the object's object map\n"
+" setomapval <obj-name> <key> <val>\n"
+" listomapvals <obj-name>\n"
+" rmomapkey <obj-name> <key>\n"
+" getomapheader <obj-name>\n"
+" setomapheader <obj-name> <val>\n"
"\n"
"IMPORT AND EXPORT\n"
" import [options] <local-directory> <rados-pool>\n"
@@ -1104,6 +1110,129 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
iter != attrset.end(); ++iter) {
cout << iter->first << std::endl;
}
+ } else if (strcmp(nargs[0], "getomapheader") == 0) {
+ if (!pool_name || nargs.size() < 2)
+ usage_exit();
+
+ string oid(nargs[1]);
+
+ bufferlist header;
+ ret = io_ctx.omap_get_header(oid, &header);
+ if (ret < 0) {
+ cerr << "error getting omap header " << pool_name << "/" << oid
+ << ": " << cpp_strerror(ret) << std::endl;
+ return 1;
+ } else {
+ cout << "header (" << header.length() << " bytes) :\n";
+ header.hexdump(cout);
+ cout << std::endl;
+ ret = 0;
+ }
+ } else if (strcmp(nargs[0], "setomapheader") == 0) {
+ if (!pool_name || nargs.size() < 3)
+ usage_exit();
+
+ string oid(nargs[1]);
+ string val(nargs[2]);
+
+ bufferlist bl;
+ bl.append(val);
+
+ ret = io_ctx.omap_set_header(oid, bl);
+ if (ret < 0) {
+ cerr << "error setting omap value " << pool_name << "/" << oid
+ << ": " << cpp_strerror(ret) << std::endl;
+ return 1;
+ } else {
+ ret = 0;
+ }
+ } else if (strcmp(nargs[0], "setomapval") == 0) {
+ if (!pool_name || nargs.size() < 4)
+ usage_exit();
+
+ string oid(nargs[1]);
+ string key(nargs[2]);
+ string val(nargs[3]);
+
+ map<string, bufferlist> values;
+ bufferlist bl;
+ bl.append(val);
+ values[key] = bl;
+
+ ret = io_ctx.omap_set(oid, values);
+ if (ret < 0) {
+ cerr << "error setting omap value " << pool_name << "/" << oid << "/"
+ << key << ": " << cpp_strerror(ret) << std::endl;
+ return 1;
+ } else {
+ ret = 0;
+ }
+ } else if (strcmp(nargs[0], "getomapval") == 0) {
+ if (!pool_name || nargs.size() < 3)
+ usage_exit();
+
+ string oid(nargs[1]);
+ string key(nargs[2]);
+
+ // Fetch the value by exact key name. omap_get_vals() takes a listing
+ // cursor ("start after") as its second argument rather than a key to
+ // match, so a lookup built on it cannot be relied on to return the
+ // entry for `key` itself.
+ set<string> keys;
+ keys.insert(key);
+
+ map<string, bufferlist> values;
+ int op_ret = 0;
+ librados::ObjectReadOperation read;
+ read.omap_get_vals_by_keys(keys, &values, &op_ret);
+ ret = io_ctx.operate(oid, &read, NULL);
+ // Check both the result of submitting the operation and the result
+ // the omap lookup itself stored in op_ret.
+ if (ret >= 0)
+ ret = op_ret;
+ if (ret < 0) {
+ cerr << "error getting omap value " << pool_name << "/" << oid << "/"
+ << key << ": " << cpp_strerror(ret) << std::endl;
+ return 1;
+ } else {
+ ret = 0;
+ }
+
+ if (values.size() && values.begin()->first == key) {
+ cout << " (length " << values.begin()->second.length() << ") : ";
+ values.begin()->second.hexdump(cout);
+ cout << std::endl;
+ } else {
+ cout << "No such key: " << pool_name << "/" << oid << "/" << key
+ << std::endl;
+ return 1;
+ }
+ } else if (strcmp(nargs[0], "rmomapkey") == 0) {
+ if (!pool_name || nargs.size() < 3)
+ usage_exit();
+
+ string oid(nargs[1]);
+ string key(nargs[2]);
+ set<string> keys;
+ keys.insert(key);
+
+ ret = io_ctx.omap_rm_keys(oid, keys);
+ if (ret < 0) {
+ cerr << "error removing omap key " << pool_name << "/" << oid << "/"
+ << key << ": " << cpp_strerror(ret) << std::endl;
+ return 1;
+ } else {
+ ret = 0;
+ }
+ } else if (strcmp(nargs[0], "listomapvals") == 0) {
+ if (!pool_name || nargs.size() < 2)
+ usage_exit();
+
+ string oid(nargs[1]);
+ string last_read = "";
+ int MAX_READ = 512;
+ // Page through the object map, MAX_READ entries per request.
+ // The number of entries received is taken from values.size() rather
+ // than from the call's return value, and last_read is advanced to the
+ // last key printed so that the next request resumes after it instead
+ // of fetching the first page again.
+ size_t num_read = 0;
+ do {
+ map<string, bufferlist> values;
+ ret = io_ctx.omap_get_vals(oid, last_read, MAX_READ, &values);
+ if (ret < 0) {
+ cerr << "error getting omap keys " << pool_name << "/" << oid << ": "
+ << cpp_strerror(ret) << std::endl;
+ return 1;
+ }
+ num_read = values.size();
+ for (map<string, bufferlist>::const_iterator it = values.begin();
+ it != values.end(); ++it) {
+ // std::map iterates in key order, so the final assignment leaves
+ // last_read at the highest key of this page.
+ last_read = it->first;
+ cout << it->first << " (" << it->second.length() << " bytes) :\n";
+ it->second.hexdump(cout);
+ cout << std::endl;
+ }
+ // A short (or empty) page means there is nothing left to list.
+ } while (num_read == (size_t)MAX_READ);
+ ret = 0;
}
else if (strcmp(nargs[0], "rm") == 0) {
if (!pool_name || nargs.size() < 2)
@@ -1357,7 +1486,7 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
cout << "load-gen will run " << lg.run_length << " seconds" << std::endl;
lg.run();
lg.cleanup();
- } else if (strcmp(nargs[0], "listomap") == 0) {
+ } else if (strcmp(nargs[0], "listomapkeys") == 0) {
if (!pool_name || nargs.size() < 2)
usage_exit();
@@ -1366,8 +1495,8 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
read.omap_get_keys("", LONG_MAX, &out_keys, &ret);
io_ctx.operate(nargs[1], &read, NULL);
if (ret < 0) {
- cerr << "error getting omap key set " << pool_name << "/" << nargs[1] << ": "
- << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
+ cerr << "error getting omap key set " << pool_name << "/"
+ << nargs[1] << ": " << cpp_strerror(ret) << std::endl;
return 1;
}
@@ -1375,25 +1504,7 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
iter != out_keys.end(); ++iter) {
cout << *iter << std::endl;
}
- } else if (strcmp(nargs[0],"getomap") == 0){
- if (!pool_name || nargs.size() < 3)
- usage_exit();
- librados::ObjectReadOperation read;
- set<string> in_keys;
- map<string,bufferlist> out_map;
- in_keys.insert(nargs[2]);
- read.omap_get_vals_by_keys(in_keys, &out_map, &ret);
- io_ctx.operate(nargs[1], &read, NULL);
- if (ret < 0) {
- cerr << "error getting omap key set " << pool_name << "/" << nargs[1] << ": "
- << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
- return 1;
- }
- for (map<string,bufferlist>::iterator iter = out_map.begin();
- iter != out_map.end(); ++iter) {
- cout << iter->second <<std::endl;
- }
- } else {
+ } else {
cerr << "unrecognized command " << nargs[0] << std::endl;
usage_exit();
}
diff --git a/src/rbd.cc b/src/rbd.cc
index adc5ab37147..17a4bbae476 100644
--- a/src/rbd.cc
+++ b/src/rbd.cc
@@ -166,9 +166,14 @@ static int do_list(librbd::RBD &rbd, librados::IoCtx& io_ctx)
}
static int do_create(librbd::RBD &rbd, librados::IoCtx& io_ctx,
- const char *imgname, uint64_t size, int *order)
+ const char *imgname, uint64_t size, int *order,
+ bool old_format, uint64_t features)
{
- int r = rbd.create(io_ctx, imgname, size, order);
+ int r;
+ if (old_format)
+ r = rbd.create(io_ctx, imgname, size, order);
+ else
+ r = rbd.create2(io_ctx, imgname, size, features, order);
if (r < 0)
return r;
return 0;
@@ -403,11 +408,10 @@ done_img:
static int do_import(librbd::RBD &rbd, librados::IoCtx& io_ctx,
const char *imgname, int *order, const char *path,
- int64_t size)
+ bool old_format, uint64_t features, int64_t size)
{
int fd, r;
struct stat stat_buf;
- string md_oid;
struct fiemap *fiemap;
MyProgressContext pc("Importing image");
@@ -442,10 +446,7 @@ static int do_import(librbd::RBD &rbd, librados::IoCtx& io_ctx,
assert(imgname);
- md_oid = imgname;
- md_oid += RBD_SUFFIX;
-
- r = do_create(rbd, io_ctx, imgname, size, order);
+ r = do_create(rbd, io_ctx, imgname, size, order, old_format, features);
if (r < 0) {
cerr << "image creation failed" << std::endl;
return r;
@@ -592,10 +593,22 @@ static int do_watch(librados::IoCtx& pp, const char *imgname)
uint64_t cookie;
RbdWatchCtx ctx(imgname);
- md_oid = imgname;
- md_oid += RBD_SUFFIX;
+ string old_header_oid = imgname;
+ old_header_oid += RBD_SUFFIX;
+ string new_header_oid = RBD_HEADER_PREFIX;
+ new_header_oid += imgname;
+ bool old_format = true;
- int r = pp.watch(md_oid, 0, &cookie, &ctx);
+ int r = pp.stat(old_header_oid, NULL, NULL);
+ if (r < 0) {
+ r = pp.stat(new_header_oid, NULL, NULL);
+ if (r < 0)
+ return r;
+ old_format = false;
+ }
+
+ r = pp.watch(old_format ? old_header_oid : new_header_oid,
+ 0, &cookie, &ctx);
if (r < 0) {
cerr << "watch failed" << std::endl;
return r;
@@ -943,6 +956,7 @@ int main(int argc, const char **argv)
const char *poolname = NULL;
uint64_t size = 0; // in bytes
int order = 0;
+ bool old_format = true;
const char *imgname = NULL, *snapname = NULL, *destname = NULL, *dest_poolname = NULL, *path = NULL, *secretfile = NULL, *user = NULL, *devpath = NULL;
std::string val;
@@ -955,6 +969,8 @@ int main(int argc, const char **argv)
} else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
usage();
exit(0);
+ } else if (ceph_argparse_flag(args, i, "--new-format", (char*)NULL)) {
+ old_format = false;
} else if (ceph_argparse_witharg(args, i, &val, "-p", "--pool", (char*)NULL)) {
poolname = strdup(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--dest-pool", (char*)NULL)) {
@@ -1186,7 +1202,7 @@ int main(int argc, const char **argv)
usage();
exit(1);
}
- r = do_create(rbd, io_ctx, imgname, size, &order);
+ r = do_create(rbd, io_ctx, imgname, size, &order, old_format, 0);
if (r < 0) {
cerr << "create error: " << cpp_strerror(-r) << std::endl;
exit(1);
@@ -1311,11 +1327,12 @@ int main(int argc, const char **argv)
break;
case OPT_IMPORT:
- if (!path) {
+ if (!path) {
cerr << "pathname should be specified" << std::endl;
exit(1);
}
- r = do_import(rbd, dest_io_ctx, destname, &order, path, size);
+ r = do_import(rbd, dest_io_ctx, destname, &order, path,
+ old_format, 0, size);
if (r < 0) {
cerr << "import failed: " << cpp_strerror(-r) << std::endl;
exit(1);
diff --git a/src/test/pybind/test_rbd.py b/src/test/pybind/test_rbd.py
index 94176dff612..6d250e0153f 100644
--- a/src/test/pybind/test_rbd.py
+++ b/src/test/pybind/test_rbd.py
@@ -1,5 +1,6 @@
import random
import struct
+import os
from nose import with_setup
from nose.tools import eq_ as eq, assert_raises
@@ -29,7 +30,12 @@ def tearDown():
rados.shutdown()
def create_image():
- RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER)
+ features = os.getenv("RBD_FEATURES")
+ if features is not None:
+ RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER, old_format=False,
+ features=int(features))
+ else:
+ RBD().create(ioctx, IMG_NAME, IMG_SIZE, IMG_ORDER, old_format=True)
def remove_image():
RBD().remove(ioctx, IMG_NAME)
diff --git a/src/test/rbd/test_cls_rbd.cc b/src/test/rbd/test_cls_rbd.cc
new file mode 100644
index 00000000000..a99051cb739
--- /dev/null
+++ b/src/test/rbd/test_cls_rbd.cc
@@ -0,0 +1,337 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/snap_types.h"
+#include "include/encoding.h"
+#include "include/rados.h"
+#include "include/rados/librados.h"
+#include "include/types.h"
+#include "librbd/cls_rbd_client.h"
+
+#include "gtest/gtest.h"
+#include "test/rados-api/test.h"
+
+#include <errno.h>
+#include <string>
+#include <vector>
+
+using namespace std;
+using ::librbd::cls_client::create_image;
+using ::librbd::cls_client::get_features;
+using ::librbd::cls_client::get_size;
+using ::librbd::cls_client::get_object_prefix;
+using ::librbd::cls_client::set_size;
+using ::librbd::cls_client::snapshot_add;
+using ::librbd::cls_client::snapshot_remove;
+using ::librbd::cls_client::get_snapcontext;
+using ::librbd::cls_client::snapshot_list;
+
+// Exercises the cls_rbd "create" method through the client helper:
+// duplicate creation, an empty object prefix, a zero-sized image, an
+// unsupported feature mask and an empty request payload.
+TEST(cls_rbd, create)
+{
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ string pool_name = get_temp_pool_name();
+
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+ string oid = "testobj";
+ // 20 GiB. The shift must be done in 64 bits: `20 << 30` is evaluated
+ // as int and overflows before it is widened to uint64_t.
+ uint64_t size = 20ULL << 30;
+ uint64_t features = 0;
+ uint8_t order = 22;
+ string object_prefix = "foo";
+
+ // creating the same header object twice is expected to fail
+ ASSERT_EQ(0, create_image(&ioctx, oid, size, order,
+ features, object_prefix));
+ ASSERT_EQ(-EEXIST, create_image(&ioctx, oid, size, order,
+ features, object_prefix));
+ ASSERT_EQ(0, ioctx.remove(oid));
+
+ // an empty object prefix is rejected and no object is left behind
+ ASSERT_EQ(-EINVAL, create_image(&ioctx, oid, size, order,
+ features, ""));
+ ASSERT_EQ(-ENOENT, ioctx.remove(oid));
+
+ // a size of zero is accepted
+ ASSERT_EQ(0, create_image(&ioctx, oid, 0, order,
+ features, object_prefix));
+ ASSERT_EQ(0, ioctx.remove(oid));
+
+ // -1 converts to a feature mask with every bit set; the test expects
+ // -ENOSYS and no object left behind
+ ASSERT_EQ(-ENOSYS, create_image(&ioctx, oid, size, order,
+ -1, object_prefix));
+ ASSERT_EQ(-ENOENT, ioctx.remove(oid));
+
+ // calling the class method directly with an empty input buffer
+ bufferlist inbl, outbl;
+ ASSERT_EQ(-EINVAL, ioctx.exec(oid, "rbd", "create", inbl, outbl));
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+// Checks get_features() against a missing image, a freshly created image
+// and a snapshot id that was never added.
+TEST(cls_rbd, get_features)
+{
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ string pool_name = get_temp_pool_name();
+
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+ // nothing named "foo" exists yet
+ uint64_t features;
+ ASSERT_EQ(-ENOENT, get_features(&ioctx, "foo", CEPH_NOSNAP, &features));
+
+ // image created with a feature mask of 0; the same mask is read back
+ ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+ ASSERT_EQ(0, get_features(&ioctx, "foo", CEPH_NOSNAP, &features));
+ ASSERT_EQ(0u, features);
+
+ // no snapshot with id 1 has been added to the image
+ ASSERT_EQ(-ENOENT, get_features(&ioctx, "foo", 1, &features));
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+// Checks that get_object_prefix() fails for a missing image and returns
+// the prefix the image was created with.
+TEST(cls_rbd, get_object_prefix)
+{
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ string pool_name = get_temp_pool_name();
+
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+ // nothing named "foo" exists yet
+ string object_prefix;
+ ASSERT_EQ(-ENOENT, get_object_prefix(&ioctx, "foo", &object_prefix));
+
+ // the last create_image() argument is the object prefix
+ ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+ ASSERT_EQ(0, get_object_prefix(&ioctx, "foo", &object_prefix));
+ ASSERT_EQ("foo", object_prefix);
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+// Checks that get_size() returns the size and order an image was created
+// with, and fails for a missing image or an unknown snapshot id.
+TEST(cls_rbd, get_size)
+{
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ string pool_name = get_temp_pool_name();
+
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+ // nothing named "foo" exists yet
+ uint64_t size;
+ uint8_t order;
+ ASSERT_EQ(-ENOENT, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+
+ // size 0, order 22
+ ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+ ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+ ASSERT_EQ(0u, size);
+ ASSERT_EQ(22, order);
+ ASSERT_EQ(0, ioctx.remove("foo"));
+
+ // recreate with size 2 << 22 and order 0
+ ASSERT_EQ(0, create_image(&ioctx, "foo", 2 << 22, 0, 0, "foo"));
+ ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+ ASSERT_EQ(2u << 22, size);
+ ASSERT_EQ(0, order);
+
+ // no snapshot with id 1 has been added to the image
+ ASSERT_EQ(-ENOENT, get_size(&ioctx, "foo", 1, &size, &order));
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+// Checks that set_size() fails for a missing image and that a new size is
+// visible through get_size() while the order stays unchanged.
+TEST(cls_rbd, set_size)
+{
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ string pool_name = get_temp_pool_name();
+
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+ // nothing named "foo" exists yet
+ ASSERT_EQ(-ENOENT, set_size(&ioctx, "foo", 5));
+
+ // baseline: size 0, order 22
+ uint64_t size;
+ uint8_t order;
+ ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+ ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+ ASSERT_EQ(0u, size);
+ ASSERT_EQ(22, order);
+
+ // setting the size it already has succeeds
+ ASSERT_EQ(0, set_size(&ioctx, "foo", 0));
+ ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+ ASSERT_EQ(0u, size);
+ ASSERT_EQ(22, order);
+
+ // growing the image changes the size but not the order
+ ASSERT_EQ(0, set_size(&ioctx, "foo", 3 << 22));
+ ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+ ASSERT_EQ(3u << 22, size);
+ ASSERT_EQ(22, order);
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+// Walks an image through adding and removing snapshots and, after every
+// step, checks the snap context (ids and seq) and the per-snapshot
+// name/size/features lists. All assertions use the
+// ASSERT_EQ(expected, actual) argument order.
+TEST(cls_rbd, snapshots)
+{
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ string pool_name = get_temp_pool_name();
+
+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+ // no image yet, so there is nothing to snapshot
+ ASSERT_EQ(-ENOENT, snapshot_add(&ioctx, "foo", 0, "snap1"));
+
+ // image of size 10, order 22, no features
+ ASSERT_EQ(0, create_image(&ioctx, "foo", 10, 22, 0, "foo"));
+
+ vector<string> snap_names;
+ vector<uint64_t> snap_sizes;
+ vector<uint64_t> snap_features;
+ SnapContext snapc;
+
+ // a new image has an empty snap context
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(0u, snapc.snaps.size());
+ ASSERT_EQ(0u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(0u, snap_names.size());
+ ASSERT_EQ(0u, snap_sizes.size());
+ ASSERT_EQ(0u, snap_features.size());
+
+ // first snapshot: id 0, name "snap1"
+ ASSERT_EQ(0, snapshot_add(&ioctx, "foo", 0, "snap1"));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(1u, snapc.snaps.size());
+ ASSERT_EQ(0u, snapc.snaps[0]);
+ ASSERT_EQ(0u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(1u, snap_names.size());
+ ASSERT_EQ("snap1", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+
+ // snap with same id and name
+ ASSERT_EQ(-EEXIST, snapshot_add(&ioctx, "foo", 0, "snap1"));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(1u, snapc.snaps.size());
+ ASSERT_EQ(0u, snapc.snaps[0]);
+ ASSERT_EQ(0u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(1u, snap_names.size());
+ ASSERT_EQ("snap1", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+
+ // snap with same id, different name
+ ASSERT_EQ(-EEXIST, snapshot_add(&ioctx, "foo", 0, "snap2"));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(1u, snapc.snaps.size());
+ ASSERT_EQ(0u, snapc.snaps[0]);
+ ASSERT_EQ(0u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(1u, snap_names.size());
+ ASSERT_EQ("snap1", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+
+ // snap with different id, same name
+ ASSERT_EQ(-EEXIST, snapshot_add(&ioctx, "foo", 1, "snap1"));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(1u, snapc.snaps.size());
+ ASSERT_EQ(0u, snapc.snaps[0]);
+ ASSERT_EQ(0u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(1u, snap_names.size());
+ ASSERT_EQ("snap1", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+
+ // snap with different id, different name
+ // (the assertions below expect the higher snapshot id to be listed first)
+ ASSERT_EQ(0, snapshot_add(&ioctx, "foo", 1, "snap2"));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(2u, snapc.snaps.size());
+ ASSERT_EQ(1u, snapc.snaps[0]);
+ ASSERT_EQ(0u, snapc.snaps[1]);
+ ASSERT_EQ(1u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(2u, snap_names.size());
+ ASSERT_EQ("snap2", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+ ASSERT_EQ("snap1", snap_names[1]);
+ ASSERT_EQ(10u, snap_sizes[1]);
+ ASSERT_EQ(0u, snap_features[1]);
+
+ // removing snapshot 0 leaves snapshot 1 and does not change seq
+ ASSERT_EQ(0, snapshot_remove(&ioctx, "foo", 0));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(1u, snapc.snaps.size());
+ ASSERT_EQ(1u, snapc.snaps[0]);
+ ASSERT_EQ(1u, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(1u, snap_names.size());
+ ASSERT_EQ("snap2", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+
+ // shrink the head image to 0 so that later snapshots record a
+ // different size than the existing one
+ uint64_t size;
+ uint8_t order;
+ ASSERT_EQ(0, set_size(&ioctx, "foo", 0));
+ ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+ ASSERT_EQ(0u, size);
+ ASSERT_EQ(22u, order);
+
+ // a snapshot id with the top bit set
+ uint64_t large_snap_id = 1ull << 63;
+ ASSERT_EQ(0, snapshot_add(&ioctx, "foo", large_snap_id, "snap3"));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(2u, snapc.snaps.size());
+ ASSERT_EQ(large_snap_id, snapc.snaps[0]);
+ ASSERT_EQ(1u, snapc.snaps[1]);
+ ASSERT_EQ(large_snap_id, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(2u, snap_names.size());
+ ASSERT_EQ("snap3", snap_names[0]);
+ ASSERT_EQ(0u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+ ASSERT_EQ("snap2", snap_names[1]);
+ ASSERT_EQ(10u, snap_sizes[1]);
+ ASSERT_EQ(0u, snap_features[1]);
+
+ // each snapshot reports the image size at the time it was taken
+ ASSERT_EQ(0, get_size(&ioctx, "foo", large_snap_id, &size, &order));
+ ASSERT_EQ(0u, size);
+ ASSERT_EQ(22u, order);
+
+ ASSERT_EQ(0, get_size(&ioctx, "foo", 1, &size, &order));
+ ASSERT_EQ(10u, size);
+ ASSERT_EQ(22u, order);
+
+ // removing the newest snapshot keeps seq at its id
+ ASSERT_EQ(0, snapshot_remove(&ioctx, "foo", large_snap_id));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(1u, snapc.snaps.size());
+ ASSERT_EQ(1u, snapc.snaps[0]);
+ ASSERT_EQ(large_snap_id, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(1u, snap_names.size());
+ ASSERT_EQ("snap2", snap_names[0]);
+ ASSERT_EQ(10u, snap_sizes[0]);
+ ASSERT_EQ(0u, snap_features[0]);
+
+ // removing it a second time fails; removing the last snapshot
+ // empties the lists while seq is retained
+ ASSERT_EQ(-ENOENT, snapshot_remove(&ioctx, "foo", large_snap_id));
+ ASSERT_EQ(0, snapshot_remove(&ioctx, "foo", 1));
+ ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+ ASSERT_EQ(0u, snapc.snaps.size());
+ ASSERT_EQ(large_snap_id, snapc.seq);
+ ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+ &snap_sizes, &snap_features));
+ ASSERT_EQ(0u, snap_names.size());
+ ASSERT_EQ(0u, snap_sizes.size());
+ ASSERT_EQ(0u, snap_features.size());
+
+ ioctx.close();
+ ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
diff --git a/src/test/test_librbd.cc b/src/test/test_librbd.cc
index 89c5797de5e..1b6ac07f66a 100644
--- a/src/test/test_librbd.cc
+++ b/src/test/test_librbd.cc
@@ -28,12 +28,65 @@
#include <unistd.h>
#include <iostream>
#include <algorithm>
+#include <sstream>
-#include "rados-api/test.cc"
+#include "rados-api/test.h"
#include "common/errno.h"
using namespace std;
+// Decide which image format the tests create, driven by the RBD_FEATURES
+// environment variable.
+//
+// Unset: *old_format is set to true and *features is left untouched.
+// Set: the value is parsed into *features and *old_format is set to false.
+// Returns 0 on success, or -EINVAL when the value cannot be parsed.
+static int get_features(bool *old_format, uint64_t *features)
+{
+ const char *env_features = getenv("RBD_FEATURES");
+ if (!env_features) {
+ *old_format = true;
+ cout << "using old format" << std::endl;
+ return 0;
+ }
+
+ stringstream parser;
+ parser << env_features;
+ parser >> *features;
+ if (parser.fail())
+ return -EINVAL;
+
+ *old_format = false;
+ cout << "using new format!" << std::endl;
+ return 0;
+}
+
+// C API helper: create an image in whichever format get_features()
+// selects. Returns the error from get_features() if it fails, otherwise
+// the result of rbd_create() / rbd_create2().
+static int create_image(rados_ioctx_t ioctx, const char *name,
+ uint64_t size, int *order)
+{
+ bool use_old_format;
+ uint64_t feature_bits;
+ int ret = get_features(&use_old_format, &feature_bits);
+ if (ret < 0)
+ return ret;
+
+ if (!use_old_format)
+ return rbd_create2(ioctx, name, size, feature_bits, order);
+ return rbd_create(ioctx, name, size, order);
+}
+
+// C++ API helper: create an image in whichever format get_features()
+// selects. Returns the error from get_features() if it fails, otherwise
+// the result of RBD::create() / RBD::create2().
+static int create_image_pp(librbd::RBD &rbd,
+ librados::IoCtx &ioctx,
+ const char *name,
+ uint64_t size, int *order) {
+ bool use_old_format;
+ uint64_t feature_bits;
+ int ret = get_features(&use_old_format, &feature_bits);
+ if (ret < 0)
+ return ret;
+
+ if (!use_old_format)
+ return rbd.create2(ioctx, name, size, feature_bits, order);
+ return rbd.create(ioctx, name, size, order);
+}
+
TEST(LibRBD, CreateAndStat)
{
rados_t cluster;
@@ -48,7 +101,7 @@ TEST(LibRBD, CreateAndStat)
const char *name = "testimg";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
ASSERT_EQ(0, rbd_stat(image, &info, sizeof(info)));
printf("image has size %llu and order %d\n", (unsigned long long) info.size, info.order);
@@ -77,7 +130,7 @@ TEST(LibRBD, CreateAndStatPP)
const char *name = "testimg";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd.create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
ASSERT_EQ(0, image.stat(info, sizeof(info)));
ASSERT_EQ(info.size, size);
@@ -102,7 +155,7 @@ TEST(LibRBD, ResizeAndStat)
const char *name = "testimg";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
ASSERT_EQ(0, rbd_resize(image, size * 4));
@@ -136,7 +189,7 @@ TEST(LibRBD, ResizeAndStatPP)
const char *name = "testimg";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd.create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
ASSERT_EQ(0, image.resize(size * 4));
@@ -211,9 +264,9 @@ TEST(LibRBD, TestCreateLsDelete)
const char *name2 = "testimg2";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(1, test_ls(ioctx, 1, name));
- ASSERT_EQ(0, rbd_create(ioctx, name2, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name2, size, &order));
ASSERT_EQ(2, test_ls(ioctx, 2, name, name2));
ASSERT_EQ(0, rbd_remove(ioctx, name));
ASSERT_EQ(1, test_ls(ioctx, 1, name2));
@@ -271,7 +324,7 @@ TEST(LibRBD, TestCreateLsDeletePP)
const char *name2 = "testimg2";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd.create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name));
ASSERT_EQ(0, rbd.create(ioctx, name2, size, &order));
ASSERT_EQ(2, test_ls_pp(rbd, ioctx, 2, name, name2));
@@ -308,7 +361,7 @@ TEST(LibRBD, TestCopy)
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
ASSERT_EQ(1, test_ls(ioctx, 1, name));
ASSERT_EQ(0, rbd_copy(image, ioctx, name2));
@@ -352,7 +405,7 @@ TEST(LibRBD, TestCopyPP)
uint64_t size = 2 << 20;
PrintProgress pp;
- ASSERT_EQ(0, rbd.create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name));
ASSERT_EQ(0, image.copy(ioctx, name2));
@@ -422,7 +475,7 @@ TEST(LibRBD, TestCreateLsDeleteSnap)
uint64_t size = 2 << 20;
uint64_t size2 = 4 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
ASSERT_EQ(0, rbd_snap_create(image, "snap1"));
@@ -500,7 +553,7 @@ TEST(LibRBD, TestCreateLsDeleteSnapPP)
uint64_t size = 2 << 20;
uint64_t size2 = 4 << 20;
- ASSERT_EQ(0, rbd.create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
ASSERT_EQ(0, image.snap_create("snap1"));
@@ -628,7 +681,7 @@ TEST(LibRBD, TestIO)
const char *name = "testimg";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
char test_data[TEST_IO_SIZE + 1];
@@ -778,7 +831,7 @@ TEST(LibRBD, TestIOPP)
const char *name = "testimg";
uint64_t size = 2 << 20;
- ASSERT_EQ(0, rbd.create(ioctx, name, size, &order));
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
char test_data[TEST_IO_SIZE + 1];
@@ -833,7 +886,7 @@ TEST(LibRBD, TestIOToSnapshot)
const char *name = "testimg";
uint64_t isize = 2 << 20;
- ASSERT_EQ(0, rbd_create(ioctx, name, isize, &order));
+ ASSERT_EQ(0, create_image(ioctx, name, isize, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
int i, r;
diff --git a/src/vstart.sh b/src/vstart.sh
index d444c48d81d..4fbbe4dc8b0 100755
--- a/src/vstart.sh
+++ b/src/vstart.sh
@@ -165,7 +165,8 @@ else
debug osd = 25
debug monc = 20
debug journal = 20
- debug filestore = 20'
+ debug filestore = 20
+ debug objclass = 20'
CMDSDEBUG='
lockdep = 1
debug ms = 1