summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <samuel.just@dreamhost.com>2011-09-02 16:03:16 -0700
committerSamuel Just <samuel.just@dreamhost.com>2011-09-02 16:03:16 -0700
commit524b0d022de9100b69d237d71f0cb262abe16021 (patch)
tree5856bfe08ed3558cfe0cfdf78811d19c8147f3fa
parentb1b18084b04314f8c0e108d7d8b33c56d0a73fc1 (diff)
parentc16f2603aa156eba3909e96213ea705c13160b06 (diff)
downloadceph-524b0d022de9100b69d237d71f0cb262abe16021.tar.gz
Merge branch 'hobject_change'
-rw-r--r--src/include/object.h16
-rw-r--r--src/os/CollectionIndex.h3
-rw-r--r--src/os/FileStore.h2
-rw-r--r--src/os/FlatIndex.h2
-rw-r--r--src/os/HashIndex.h7
-rw-r--r--src/os/IndexManager.cc13
-rw-r--r--src/os/LFNIndex.cc163
-rw-r--r--src/os/LFNIndex.h39
-rw-r--r--src/osd/OSD.cc3
-rw-r--r--src/osd/PG.cc1
-rw-r--r--src/osd/PG.h4
-rw-r--r--src/osd/ReplicatedPG.cc66
-rw-r--r--src/osd/ReplicatedPG.h3
-rw-r--r--src/osd/osd_types.cc7
-rw-r--r--src/test/store_test.cc4
15 files changed, 272 insertions, 61 deletions
diff --git a/src/include/object.h b/src/include/object.h
index bc56445fd1e..706caa6bffc 100644
--- a/src/include/object.h
+++ b/src/include/object.h
@@ -261,14 +261,15 @@ namespace __gnu_cxx {
struct hobject_t {
object_t oid;
+ string key;
snapid_t snap;
uint32_t hash;
hobject_t() : snap(0), hash(0) {}
- hobject_t(object_t oid, snapid_t snap, uint32_t hash) :
- oid(oid), snap(snap), hash(hash) {}
- hobject_t(const sobject_t &soid, uint32_t hash) :
- oid(soid.oid), snap(soid.snap), hash(hash) {}
+ hobject_t(object_t oid, const string &key, snapid_t snap, uint32_t hash) :
+ oid(oid), key(key), snap(snap), hash(hash) {}
+ hobject_t(const sobject_t &soid, const string &key, uint32_t hash) :
+ oid(soid.oid), key(key), snap(soid.snap), hash(hash) {}
/* Do not use when a particular hash function is needed */
explicit hobject_t(const sobject_t &o) :
@@ -279,9 +280,11 @@ struct hobject_t {
void swap(hobject_t &o) {
hobject_t temp(o);
o.oid = oid;
+ o.key = key;
o.snap = snap;
o.hash = hash;
oid = temp.oid;
+ key = temp.key;
snap = temp.snap;
hash = temp.hash;
}
@@ -291,8 +294,9 @@ struct hobject_t {
}
void encode(bufferlist& bl) const {
- __u8 version = 0;
+ __u8 version = 1;
::encode(version, bl);
+ ::encode(key, bl);
::encode(oid, bl);
::encode(snap, bl);
::encode(hash, bl);
@@ -300,6 +304,8 @@ struct hobject_t {
void decode(bufferlist::iterator& bl) {
__u8 version;
::decode(version, bl);
+ if (version >= 1)
+ ::decode(key, bl);
::decode(oid, bl);
::decode(snap, bl);
::decode(hash, bl);
diff --git a/src/os/CollectionIndex.h b/src/os/CollectionIndex.h
index f8846a78cd6..2fca26fb0a6 100644
--- a/src/os/CollectionIndex.h
+++ b/src/os/CollectionIndex.h
@@ -58,6 +58,9 @@ protected:
/// Type of returned paths
typedef std::tr1::shared_ptr<Path> IndexedPath;
+ static const uint32_t FLAT_INDEX_TAG = 0;
+ static const uint32_t HASH_INDEX_TAG = 1;
+ static const uint32_t HASH_INDEX_TAG_2 = 2;
/**
* For tracking Filestore collection versions.
*
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index 7f01c897407..d17d5e6b82f 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -39,7 +39,7 @@ using namespace __gnu_cxx;
// fake attributes in memory, if we need to.
class FileStore : public JournalingObjectStore {
- static const uint32_t on_disk_version = 1;
+ static const uint32_t on_disk_version = 2;
string basedir, journalpath;
std::string current_fn;
std::string current_op_seq_fn;
diff --git a/src/os/FlatIndex.h b/src/os/FlatIndex.h
index d94edf3f69e..53e27f5ec08 100644
--- a/src/os/FlatIndex.h
+++ b/src/os/FlatIndex.h
@@ -35,7 +35,7 @@ public:
FlatIndex(string base_path) : base_path(base_path) {}
/// @see CollectionIndex
- uint32_t collection_version() { return 0; }
+ uint32_t collection_version() { return FLAT_INDEX_TAG; }
/// @see CollectionIndex
void set_ref(std::tr1::shared_ptr<CollectionIndex> ref);
diff --git a/src/os/HashIndex.h b/src/os/HashIndex.h
index fb8fde599ab..bbcbc9904ad 100644
--- a/src/os/HashIndex.h
+++ b/src/os/HashIndex.h
@@ -128,12 +128,13 @@ public:
HashIndex(
const char *base_path, ///< [in] Path to the index root.
int merge_at, ///< [in] Merge threshhold.
- int split_at) ///< [in] Split threshhold.
- : LFNIndex(base_path), merge_threshold(merge_at),
+ int split_at, ///< [in] Split threshhold.
+ uint32_t index_version)///< [in] Index version
+ : LFNIndex(base_path, index_version), merge_threshold(merge_at),
split_threshold(split_at) {}
/// @see CollectionIndex
- uint32_t collection_version() { return 1; }
+ uint32_t collection_version() { return index_version; }
/// @see CollectionIndex
int cleanup();
diff --git a/src/os/IndexManager.cc b/src/os/IndexManager.cc
index 251599b1175..504cd8f5217 100644
--- a/src/os/IndexManager.cc
+++ b/src/os/IndexManager.cc
@@ -70,7 +70,8 @@ int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
if (r < 0)
return r;
HashIndex index(path, g_conf->filestore_merge_threshold,
- g_conf->filestore_split_multiple);
+ g_conf->filestore_split_multiple,
+ CollectionIndex::HASH_INDEX_TAG_2);
return index.init();
}
@@ -84,15 +85,16 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
return r;
switch (version) {
- case 0: {
+ case CollectionIndex::FLAT_INDEX_TAG: {
*index = Index(new FlatIndex(path),
RemoveOnDelete(c, this));
return 0;
}
- case 1: {
+ case CollectionIndex::HASH_INDEX_TAG: // fall through
+ case CollectionIndex::HASH_INDEX_TAG_2: {
// Must be a HashIndex
*index = Index(new HashIndex(path, g_conf->filestore_merge_threshold,
- g_conf->filestore_split_multiple),
+ g_conf->filestore_split_multiple, version),
RemoveOnDelete(c, this));
return 0;
}
@@ -102,7 +104,8 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
} else {
// No need to check
*index = Index(new HashIndex(path, g_conf->filestore_merge_threshold,
- g_conf->filestore_split_multiple),
+ g_conf->filestore_split_multiple,
+ CollectionIndex::HASH_INDEX_TAG_2),
RemoveOnDelete(c, this));
return 0;
}
diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc
index d6903032e3c..126a4b9edbd 100644
--- a/src/os/LFNIndex.cc
+++ b/src/os/LFNIndex.cc
@@ -256,6 +256,23 @@ int LFNIndex::get_mangled_name(const vector<string> &from,
return lfn_get_name(from, hoid, mangled_name, 0, exists);
}
+static int get_hobject_from_oinfo(const char *dir, const char *file,
+ hobject_t *o) {
+ char path[PATH_MAX];
+ bufferptr bp(PATH_MAX);
+ snprintf(path, sizeof(path), "%s/%s", dir, file);
+ // Hack, user.ceph._ is the attribute used to store the object info
+ int r = do_getxattr(path, "user.ceph._", bp.c_str(), bp.length());
+ if (r < 0)
+ return r;
+ bufferlist bl;
+ bl.push_back(bp);
+ object_info_t oi(bl);
+ *o = oi.soid;
+ return 0;
+}
+
+
int LFNIndex::list_objects(const vector<string> &to_list, int max_objs,
long *handle, map<string, hobject_t> *out) {
string to_list_path = get_full_path_subdir(to_list);
@@ -295,6 +312,9 @@ int LFNIndex::list_objects(const vector<string> &to_list, int max_objs,
if (!lfn_must_hash(long_name)) {
assert(long_name == short_name);
}
+ if (index_version == HASH_INDEX_TAG)
+ get_hobject_from_oinfo(to_list_path.c_str(), short_name.c_str(), &obj);
+
out->insert(pair<string, hobject_t>(short_name, obj));
++listed;
} else {
@@ -413,7 +433,7 @@ int LFNIndex::remove_attr_path(const vector<string> &path,
return do_removexattr(full_path.c_str(), mangled_attr_name.c_str());
}
-string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) {
+string LFNIndex::lfn_generate_object_name_keyless(const hobject_t &hoid) {
char s[FILENAME_MAX_LEN];
char *end = s + sizeof(s);
char *t = s;
@@ -449,7 +469,55 @@ string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) {
snprintf(t, end - t, "_%.*X", (int)(sizeof(hoid.hash)*2), hoid.hash);
return string(s);
- }
+}
+
+static void append_escaped(string::const_iterator begin,
+ string::const_iterator end,
+ string *out) {
+ for (string::const_iterator i = begin; i != end; ++i) {
+ if (*i == '\\') {
+ out->append("\\\\");
+ } else if (*i == '/') {
+ out->append("\\s");
+ } else if (*i == '_') {
+ out->append("\\u");
+ } else {
+ out->append(i, i+1);
+ }
+ }
+}
+
+string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) {
+ if (index_version == HASH_INDEX_TAG)
+ return lfn_generate_object_name_keyless(hoid);
+
+ string full_name;
+ string::const_iterator i = hoid.oid.name.begin();
+ if (hoid.oid.name.substr(0, 4) == "DIR_") {
+ full_name.append("\\d");
+ i += 4;
+ } else if (hoid.oid.name[0] == '.') {
+ full_name.append("\\.");
+ ++i;
+ }
+ append_escaped(i, hoid.oid.name.end(), &full_name);
+ full_name.append("_");
+ append_escaped(hoid.key.begin(), hoid.key.end(), &full_name);
+ full_name.append("_");
+
+ char snap_with_hash[PATH_MAX];
+ char *t = snap_with_hash;
+ char *end = t + sizeof(snap_with_hash);
+ if (hoid.snap == CEPH_NOSNAP)
+ t += snprintf(t, end - t, "head");
+ else if (hoid.snap == CEPH_SNAPDIR)
+ t += snprintf(t, end - t, "snapdir");
+ else
+ t += snprintf(t, end - t, "%llx", (long long unsigned)hoid.snap);
+ snprintf(t, end - t, "_%.*X", (int)(sizeof(hoid.hash)*2), hoid.hash);
+ full_name += string(snap_with_hash);
+ return full_name;
+}
int LFNIndex::lfn_get_name(const vector<string> &path,
const hobject_t &hoid,
@@ -487,7 +555,7 @@ int LFNIndex::lfn_get_name(const vector<string> &path,
for ( ; ; ++i) {
candidate = lfn_get_short_name(hoid, i);
candidate_path = get_full_path(path, candidate);
- r = do_getxattr(candidate_path.c_str(), LFN_ATTR.c_str(), buf, sizeof(buf));
+ r = do_getxattr(candidate_path.c_str(), get_lfn_attr().c_str(), buf, sizeof(buf));
if (r < 0) {
if (errno != ENODATA && errno != ENOENT)
return -errno;
@@ -528,7 +596,7 @@ int LFNIndex::lfn_created(const vector<string> &path,
return 0;
string full_path = get_full_path(path, mangled_name);
string full_name = lfn_generate_object_name(hoid);
- return do_setxattr(full_path.c_str(), LFN_ATTR.c_str(),
+ return do_setxattr(full_path.c_str(), get_lfn_attr().c_str(),
full_name.c_str(), full_name.size());
}
@@ -585,15 +653,15 @@ int LFNIndex::lfn_unlink(const vector<string> &path,
}
int LFNIndex::lfn_translate(const vector<string> &path,
- const string &short_name,
- hobject_t *out) {
+ const string &short_name,
+ hobject_t *out) {
if (!lfn_is_hashed_filename(short_name)) {
return lfn_parse_object_name(short_name, out);
}
// Get lfn_attr
string full_path = get_full_path(path, short_name);
char attr[PATH_MAX];
- int r = do_getxattr(full_path.c_str(), LFN_ATTR.c_str(), attr, sizeof(attr) - 1);
+ int r = do_getxattr(full_path.c_str(), get_lfn_attr().c_str(), attr, sizeof(attr) - 1);
if (r < 0)
return -errno;
if (r < (int)sizeof(attr))
@@ -666,13 +734,90 @@ static int parse_object(const char *s, hobject_t& o)
return 0;
}
-bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) {
+bool LFNIndex::lfn_parse_object_name_keyless(const string &long_name, hobject_t *out) {
bool r = parse_object(long_name.c_str(), *out);
if (!r) return r;
string temp = lfn_generate_object_name(*out);
return r;
}
+static bool append_unescaped(string::const_iterator begin,
+ string::const_iterator end,
+ string *out) {
+ for (string::const_iterator i = begin; i != end; ++i) {
+ if (*i == '\\') {
+ ++i;
+ if (*i == '\\')
+ out->append("\\");
+ else if (*i == 's')
+ out->append("/");
+ else if (*i == 'u')
+ out->append("_");
+ else
+ return false;
+ } else {
+ out->append(i, i+1);
+ }
+ }
+ return true;
+}
+
+bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) {
+ if (index_version == HASH_INDEX_TAG)
+ return lfn_parse_object_name_keyless(long_name, out);
+
+ string::const_iterator current = long_name.begin();
+ if (*current == '\\') {
+ ++current;
+ if (current == long_name.end()) {
+ return false;
+ } else if (*current == 'd') {
+ out->oid.name.append("DIR_");
+ ++current;
+ } else if (*current == '.') {
+ out->oid.name.append(".");
+ ++current;
+ } else {
+ --current;
+ }
+ }
+
+ string::const_iterator end = current;
+ for (; end != long_name.end() && *end != '_'; ++end);
+ if (end == long_name.end())
+ return false;
+ if (!append_unescaped(current, end, &(out->oid.name)))
+ return false;
+
+ current = ++end;
+ for (; end != long_name.end() && *end != '_'; ++end);
+ if (end == long_name.end())
+ return false;
+ if (!append_unescaped(current, end, &(out->oid.name)))
+ return false;
+
+ current = ++end;
+ for (; end != long_name.end() && *end != '_'; ++end);
+ if (end == long_name.end())
+ return false;
+ string snap(current, end);
+
+ current = ++end;
+ for (; end != long_name.end() && *end != '_'; ++end);
+ if (end != long_name.end())
+ return false;
+ string hash(current, end);
+
+ if (snap == "head")
+ out->snap = CEPH_NOSNAP;
+ else if (snap == "snapdir")
+ out->snap = CEPH_SNAPDIR;
+ else
+ out->snap = strtoull(snap.c_str(), NULL, 16);
+ sscanf(hash.c_str(), "%X", &out->hash);
+ return true;
+}
+
bool LFNIndex::lfn_is_hashed_filename(const string &name) {
if (name.size() < (unsigned)FILENAME_SHORT_LEN) {
return 0;
@@ -776,7 +921,7 @@ string LFNIndex::demangle_path_component(const string &component) {
}
int LFNIndex::decompose_full_path(const char *in, vector<string> *out,
- hobject_t *hoid, string *shortname) {
+ hobject_t *hoid, string *shortname) {
const char *beginning = in + get_base_path().size();
const char *end = beginning;
while (1) {
diff --git a/src/os/LFNIndex.h b/src/os/LFNIndex.h
index dedd8465a28..e0321b45dbf 100644
--- a/src/os/LFNIndex.h
+++ b/src/os/LFNIndex.h
@@ -76,10 +76,26 @@ class LFNIndex : public CollectionIndex {
/// For reference counting the collection @see Path
std::tr1::weak_ptr<CollectionIndex> self_ref;
+protected:
+ const uint32_t index_version;
+
+private:
+ string lfn_attribute;
+
public:
/// Constructor
- LFNIndex(const char *base_path) ///< [in] path to Index root
- : base_path(base_path) {}
+ LFNIndex(
+ const char *base_path, ///< [in] path to Index root
+ uint32_t index_version)
+ : base_path(base_path), index_version(index_version) {
+ if (index_version == HASH_INDEX_TAG) {
+ lfn_attribute = LFN_ATTR;
+ } else {
+ char buf[100];
+ snprintf(buf, sizeof(buf), "%d", index_version);
+ lfn_attribute = LFN_ATTR + string(buf);
+ }
+ }
/// Virtual destructor
virtual ~LFNIndex() {}
@@ -306,6 +322,14 @@ protected:
private:
/* lfn translation functions */
+
+ /**
+ * Gets the version specific lfn attribute tag
+ */
+ const string &get_lfn_attr() const {
+ return lfn_attribute;
+ }
+
/**
* Gets the filename corresponsing to hoid in path.
*
@@ -359,11 +383,22 @@ private:
); ///< @return True if short_name is a subdir, false otherwise
/// Generate object name
+ string lfn_generate_object_name_keyless(
+ const hobject_t &hoid ///< [in] Object for which to generate.
+ ); ///< @return Generated object name.
+
+ /// Generate object name
string lfn_generate_object_name(
const hobject_t &hoid ///< [in] Object for which to generate.
); ///< @return Generated object name.
/// Parse object name
+ bool lfn_parse_object_name_keyless(
+ const string &long_name, ///< [in] Name to parse
+ hobject_t *out ///< [out] Resulting Object
+ ); ///< @return True if successfull, False otherwise.
+
+ /// Parse object name
bool lfn_parse_object_name(
const string &long_name, ///< [in] Name to parse
hobject_t *out ///< [out] Resulting Object
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index de70ae21eb9..85d922cab11 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -5067,7 +5067,8 @@ void OSD::handle_op(MOSDOp *op)
if ((op->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) {
// missing object?
- hobject_t head(op->get_oid(), CEPH_NOSNAP, op->get_pg().ps());
+ hobject_t head(op->get_oid(), op->get_object_locator().key,
+ CEPH_NOSNAP, op->get_pg().ps());
if (pg->is_missing_object(head)) {
pg->wait_for_missing_object(head, op);
pg->unlock();
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index c0f383b6e0f..d77639c2a3b 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -2250,6 +2250,7 @@ void PG::read_log(ObjectStore *store)
++i) {
if (i->oid == e.soid.oid && i->snap == e.soid.snap) {
e.soid.hash = i->hash;
+ e.soid.key = i->key;
found = true;
break;
}
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 61e8778fd14..d5f810d5130 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -393,7 +393,7 @@ public:
}
void encode(bufferlist &bl) const {
- __u8 struct_v = 2;
+ __u8 struct_v = 3;
::encode(struct_v, bl);
::encode(op, bl);
::encode(soid, bl);
@@ -417,6 +417,8 @@ public:
} else {
::decode(soid, bl);
}
+ if (struct_v < 3)
+ invalid_hash = true;
::decode(version, bl);
::decode(prior_version, bl);
::decode(reqid, bl);
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index b9a50f95936..73b42acf133 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -404,7 +404,9 @@ void ReplicatedPG::do_op(MOSDOp *op)
ObjectContext *obc;
bool can_create = op->may_write();
snapid_t snapid;
- int r = find_object_context(hobject_t(op->get_oid(), op->get_snapid(), op->get_pg().ps()),
+ int r = find_object_context(hobject_t(op->get_oid(),
+ op->get_object_locator().key,
+ op->get_snapid(), op->get_pg().ps()),
op->get_object_locator(),
&obc, can_create, &snapid);
@@ -416,7 +418,8 @@ void ReplicatedPG::do_op(MOSDOp *op)
if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
// missing the specific snap we need; requeue and wait.
assert(!can_create); // only happens on a read
- hobject_t soid(op->get_oid(), snapid, op->get_pg().ps());
+ hobject_t soid(op->get_oid(), op->get_object_locator().key,
+ snapid, op->get_pg().ps());
wait_for_missing_object(soid, op);
return;
}
@@ -476,17 +479,19 @@ void ReplicatedPG::do_op(MOSDOp *op)
map<hobject_t,ObjectContext*> src_obc;
for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
OSDOp& osd_op = *p;
- hobject_t toid(osd_op.soid, op->get_pg().ps());
+ hobject_t toid(osd_op.soid, op->get_object_locator().key, op->get_pg().ps());
if (osd_op.soid.oid.name.length()) {
if (!src_obc.count(toid)) {
ObjectContext *sobc;
snapid_t ssnapid;
- int r = find_object_context(hobject_t(toid.oid, toid.snap, op->get_pg().ps()),
+ int r = find_object_context(hobject_t(toid.oid, op->get_object_locator().key,
+ toid.snap, op->get_pg().ps()),
op->get_object_locator(),
&sobc, false, &ssnapid);
if (r == -EAGAIN) {
// missing the specific snap we need; requeue and wait.
- hobject_t wait_oid(osd_op.soid.oid, ssnapid, op->get_pg().ps());
+ hobject_t wait_oid(osd_op.soid.oid, op->get_object_locator().key,
+ ssnapid, op->get_pg().ps());
wait_for_missing_object(wait_oid, op);
} else if (r) {
osd->reply_op_error(op, r);
@@ -816,7 +821,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
// load clone info
bufferlist bl;
ObjectContext *obc = 0;
- int r = find_object_context(hobject_t(coid.oid, sn, coid.hash),
+ int r = find_object_context(hobject_t(coid.oid, coid.key, sn, coid.hash),
OLOC_BLANK, &obc, false, NULL);
if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) {
if (obc) put_object_context(obc);
@@ -829,7 +834,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
// get snap set context
if (!obc->ssc)
- obc->ssc = get_snapset_context(coid.oid, coid.hash, false);
+ obc->ssc = get_snapset_context(coid.oid, coid.key, coid.hash, false);
SnapSetContext *ssc = obc->ssc;
assert(ssc);
SnapSet& snapset = ssc->snapset;
@@ -926,7 +931,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
// save head snapset
dout(10) << coid << " new snapset " << snapset << dendl;
- hobject_t snapoid(coid.oid, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash);
+ hobject_t snapoid(coid.oid, coid.key, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash);
ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false);
assert(ctx->snapset_obc->registered);
if (snapset.clones.empty() && !snapset.head_exists) {
@@ -1157,7 +1162,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
ObjectContext *src_obc = 0;
if (ceph_osd_op_type_multi(op.op)) {
- src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.hash)];
+ src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.key, soid.hash)];
assert(src_obc);
}
@@ -1678,8 +1683,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
result = -EINVAL;
break;
}
- t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.hash), obs.oi.soid,
- op.clonerange.src_offset, op.clonerange.length, op.clonerange.offset);
+ t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.key,
+ obs.oi.soid.hash),
+ obs.oi.soid, op.clonerange.src_offset,
+ op.clonerange.length, op.clonerange.offset);
+
write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges,
op.clonerange.offset, op.clonerange.length, false);
@@ -2096,7 +2104,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
dout(10) << "_rollback_to " << soid << " snapid " << snapid << dendl;
ObjectContext *rollback_to;
- int ret = find_object_context(hobject_t(soid.oid, snapid, soid.hash),
+ int ret = find_object_context(hobject_t(soid.oid, oi.oloc.key, snapid, soid.hash),
oi.oloc, &rollback_to, false, &cloneid);
if (ret) {
if (-ENOENT == ret) {
@@ -2109,7 +2117,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
/* a different problem, like degraded pool
* with not-yet-restored object. We shouldn't have been able
* to get here; recovery should have completed first! */
- hobject_t rollback_target(soid.oid, cloneid, soid.hash);
+ hobject_t rollback_target(soid.oid, soid.key, cloneid, soid.hash);
assert(is_missing_object(rollback_target));
dout(20) << "_rollback_to attempted to roll back to a missing object "
<< rollback_target << " (requested snapid: ) " << snapid << dendl;
@@ -2546,7 +2554,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
ctx->op_t.setattr(coll, soid, SS_ATTR, bss);
if (!head_existed) {
// if we logically recreated the head, remove old _snapdir object
- hobject_t snapoid(soid.oid, CEPH_SNAPDIR, soid.hash);
+ hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash);
ctx->snapset_obc = get_object_context(snapoid, ctx->new_obs.oi.oloc, false);
if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
@@ -2563,7 +2571,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
}
} else if (ctx->new_snapset.clones.size()) {
// save snapset on _snap
- hobject_t snapoid(soid.oid, CEPH_SNAPDIR, soid.hash);
+ hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash);
dout(10) << " final snapset " << ctx->new_snapset
<< " in " << snapoid << dendl;
ctx->at_version.version++;
@@ -3017,13 +3025,13 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s
SnapSetContext *ssc = NULL;
if (can_create)
- ssc = get_snapset_context(soid.oid, soid.hash, true);
+ ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true);
obc = new ObjectContext(oi, true, ssc);
}
register_object_context(obc);
if (can_create && !obc->ssc)
- obc->ssc = get_snapset_context(soid.oid, soid.hash, true);
+ obc->ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true);
if (r >= 0) {
obc->obs.oi.decode(bv);
@@ -3058,7 +3066,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
snapid_t *psnapid)
{
// want the head?
- hobject_t head(oid.oid, CEPH_NOSNAP, oid.hash);
+ hobject_t head(oid.oid, oid.key, CEPH_NOSNAP, oid.hash);
if (oid.snap == CEPH_NOSNAP) {
ObjectContext *obc = get_object_context(head, oloc, can_create);
if (!obc)
@@ -3067,13 +3075,13 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
*pobc = obc;
if (can_create && !obc->ssc)
- obc->ssc = get_snapset_context(oid.oid, oid.hash, true);
+ obc->ssc = get_snapset_context(oid.oid, oid.key, oid.hash, true);
return 0;
}
// we want a snap
- SnapSetContext *ssc = get_snapset_context(oid.oid, oid.hash, can_create);
+ SnapSetContext *ssc = get_snapset_context(oid.oid, oid.key, oid.hash, can_create);
if (!ssc)
return -ENOENT;
@@ -3115,7 +3123,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
put_snapset_context(ssc);
return -ENOENT;
}
- hobject_t soid(oid.oid, ssc->snapset.clones[k], oid.hash);
+ hobject_t soid(oid.oid, oid.key, ssc->snapset.clones[k], oid.hash);
put_snapset_context(ssc); // we're done with ssc
ssc = 0;
@@ -3179,7 +3187,9 @@ void ReplicatedPG::put_object_contexts(map<hobject_t,ObjectContext*>& obcv)
obcv.clear();
}
-ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, ps_t seed,
+ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
+ const string& key,
+ ps_t seed,
bool can_create)
{
SnapSetContext *ssc;
@@ -3188,11 +3198,11 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t&
ssc = p->second;
} else {
bufferlist bv;
- hobject_t head(oid, CEPH_NOSNAP, seed);
+ hobject_t head(oid, key, CEPH_NOSNAP, seed);
int r = osd->store->getattr(coll, head, SS_ATTR, bv);
if (r < 0) {
// try _snapset
- hobject_t snapdir(oid, CEPH_SNAPDIR, seed);
+ hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed);
r = osd->store->getattr(coll, snapdir, SS_ATTR, bv);
if (r < 0 && !can_create)
return NULL;
@@ -3602,7 +3612,7 @@ int ReplicatedPG::pull(const hobject_t& soid)
}
// check snapset
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+ SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
dout(10) << " snapset " << ssc->snapset << dendl;
calc_clone_subsets(ssc->snapset, soid, missing,
data_subset, clone_subsets);
@@ -3710,7 +3720,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
return push_start(snapdir, peer);
}
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+ SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
data_subset, clone_subsets);
@@ -3718,7 +3728,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
} else if (soid.snap == CEPH_NOSNAP) {
// pushing head or unversioned object.
// base this on partially on replica's clones?
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+ SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
calc_head_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets);
put_snapset_context(ssc);
@@ -4062,7 +4072,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
if (soid.snap && soid.snap < CEPH_NOSNAP) {
// clone. make sure we have enough data.
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+ SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
assert(ssc);
clone_subsets.clear(); // forget what pusher said; recalculate cloning.
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index faadd3f03bf..f9295e4e13c 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -510,7 +510,8 @@ protected:
ObjectContext **pobc,
bool can_create, snapid_t *psnapid=NULL);
- SnapSetContext *get_snapset_context(const object_t& oid, ps_t seed, bool can_create);
+ SnapSetContext *get_snapset_context(const object_t& oid, const string &key,
+ ps_t seed, bool can_create);
void register_snapset_context(SnapSetContext *ssc) {
if (!ssc->registered) {
ssc->registered = true;
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 75a93ca73fd..2bdf099d0cf 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -574,7 +574,7 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid,
void object_info_t::encode(bufferlist& bl) const
{
- const __u8 v = 6;
+ const __u8 v = 7;
::encode(v, bl);
::encode(soid, bl);
::encode(oloc, bl);
@@ -603,12 +603,15 @@ void object_info_t::decode(bufferlist::iterator& bl)
sobject_t obj;
::decode(obj, bl);
::decode(oloc, bl);
- soid = hobject_t(obj.oid, obj.snap, 0);
+ soid = hobject_t(obj.oid, oloc.key, obj.snap, 0);
soid.hash = legacy_object_locator_to_ps(soid.oid, oloc);
} else if (v >= 6) {
::decode(soid, bl);
::decode(oloc, bl);
+ if (v == 6)
+ soid.key = oloc.key;
}
+
if (v >= 5)
::decode(category, bl);
::decode(version, bl);
diff --git a/src/test/store_test.cc b/src/test/store_test.cc
index a3c9f529e56..10fb6564dc2 100644
--- a/src/test/store_test.cc
+++ b/src/test/store_test.cc
@@ -253,7 +253,7 @@ public:
// hash
//boost::binomial_distribution<uint32_t> bin(0xFFFFFF, 0.5);
++seq;
- return hobject_t(name, CEPH_NOSNAP, rand());
+ return hobject_t(name, string(), CEPH_NOSNAP, rand());
}
};
@@ -469,7 +469,7 @@ TEST_F(StoreTest, HashCollisionTest) {
if (!(i % 5)) {
cerr << "Object " << i << std::endl;
}
- hobject_t hoid(string(buf) + base, CEPH_NOSNAP, 0);
+ hobject_t hoid(string(buf) + base, string(), CEPH_NOSNAP, 0);
{
ObjectStore::Transaction t;
t.touch(cid, hoid);