diff options
author | Samuel Just <samuel.just@dreamhost.com> | 2011-09-02 16:03:16 -0700 |
---|---|---|
committer | Samuel Just <samuel.just@dreamhost.com> | 2011-09-02 16:03:16 -0700 |
commit | 524b0d022de9100b69d237d71f0cb262abe16021 (patch) | |
tree | 5856bfe08ed3558cfe0cfdf78811d19c8147f3fa | |
parent | b1b18084b04314f8c0e108d7d8b33c56d0a73fc1 (diff) | |
parent | c16f2603aa156eba3909e96213ea705c13160b06 (diff) | |
download | ceph-524b0d022de9100b69d237d71f0cb262abe16021.tar.gz |
Merge branch 'hobject_change'
-rw-r--r-- | src/include/object.h | 16 | ||||
-rw-r--r-- | src/os/CollectionIndex.h | 3 | ||||
-rw-r--r-- | src/os/FileStore.h | 2 | ||||
-rw-r--r-- | src/os/FlatIndex.h | 2 | ||||
-rw-r--r-- | src/os/HashIndex.h | 7 | ||||
-rw-r--r-- | src/os/IndexManager.cc | 13 | ||||
-rw-r--r-- | src/os/LFNIndex.cc | 163 | ||||
-rw-r--r-- | src/os/LFNIndex.h | 39 | ||||
-rw-r--r-- | src/osd/OSD.cc | 3 | ||||
-rw-r--r-- | src/osd/PG.cc | 1 | ||||
-rw-r--r-- | src/osd/PG.h | 4 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 66 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 3 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 7 | ||||
-rw-r--r-- | src/test/store_test.cc | 4 |
15 files changed, 272 insertions, 61 deletions
diff --git a/src/include/object.h b/src/include/object.h index bc56445fd1e..706caa6bffc 100644 --- a/src/include/object.h +++ b/src/include/object.h @@ -261,14 +261,15 @@ namespace __gnu_cxx { struct hobject_t { object_t oid; + string key; snapid_t snap; uint32_t hash; hobject_t() : snap(0), hash(0) {} - hobject_t(object_t oid, snapid_t snap, uint32_t hash) : - oid(oid), snap(snap), hash(hash) {} - hobject_t(const sobject_t &soid, uint32_t hash) : - oid(soid.oid), snap(soid.snap), hash(hash) {} + hobject_t(object_t oid, const string &key, snapid_t snap, uint32_t hash) : + oid(oid), key(key), snap(snap), hash(hash) {} + hobject_t(const sobject_t &soid, const string &key, uint32_t hash) : + oid(soid.oid), key(key), snap(soid.snap), hash(hash) {} /* Do not use when a particular hash function is needed */ explicit hobject_t(const sobject_t &o) : @@ -279,9 +280,11 @@ struct hobject_t { void swap(hobject_t &o) { hobject_t temp(o); o.oid = oid; + o.key = key; o.snap = snap; o.hash = hash; oid = temp.oid; + key = temp.key; snap = temp.snap; hash = temp.hash; } @@ -291,8 +294,9 @@ struct hobject_t { } void encode(bufferlist& bl) const { - __u8 version = 0; + __u8 version = 1; ::encode(version, bl); + ::encode(key, bl); ::encode(oid, bl); ::encode(snap, bl); ::encode(hash, bl); @@ -300,6 +304,8 @@ struct hobject_t { void decode(bufferlist::iterator& bl) { __u8 version; ::decode(version, bl); + if (version >= 1) + ::decode(key, bl); ::decode(oid, bl); ::decode(snap, bl); ::decode(hash, bl); diff --git a/src/os/CollectionIndex.h b/src/os/CollectionIndex.h index f8846a78cd6..2fca26fb0a6 100644 --- a/src/os/CollectionIndex.h +++ b/src/os/CollectionIndex.h @@ -58,6 +58,9 @@ protected: /// Type of returned paths typedef std::tr1::shared_ptr<Path> IndexedPath; + static const uint32_t FLAT_INDEX_TAG = 0; + static const uint32_t HASH_INDEX_TAG = 1; + static const uint32_t HASH_INDEX_TAG_2 = 2; /** * For tracking Filestore collection versions. * diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 7f01c897407..d17d5e6b82f 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -39,7 +39,7 @@ using namespace __gnu_cxx; // fake attributes in memory, if we need to. class FileStore : public JournalingObjectStore { - static const uint32_t on_disk_version = 1; + static const uint32_t on_disk_version = 2; string basedir, journalpath; std::string current_fn; std::string current_op_seq_fn; diff --git a/src/os/FlatIndex.h b/src/os/FlatIndex.h index d94edf3f69e..53e27f5ec08 100644 --- a/src/os/FlatIndex.h +++ b/src/os/FlatIndex.h @@ -35,7 +35,7 @@ public: FlatIndex(string base_path) : base_path(base_path) {} /// @see CollectionIndex - uint32_t collection_version() { return 0; } + uint32_t collection_version() { return FLAT_INDEX_TAG; } /// @see CollectionIndex void set_ref(std::tr1::shared_ptr<CollectionIndex> ref); diff --git a/src/os/HashIndex.h b/src/os/HashIndex.h index fb8fde599ab..bbcbc9904ad 100644 --- a/src/os/HashIndex.h +++ b/src/os/HashIndex.h @@ -128,12 +128,13 @@ public: HashIndex( const char *base_path, ///< [in] Path to the index root. int merge_at, ///< [in] Merge threshhold. - int split_at) ///< [in] Split threshhold. - : LFNIndex(base_path), merge_threshold(merge_at), + int split_at, ///< [in] Split threshhold. + uint32_t index_version)///< [in] Index version + : LFNIndex(base_path, index_version), merge_threshold(merge_at), split_threshold(split_at) {} /// @see CollectionIndex - uint32_t collection_version() { return 1; } + uint32_t collection_version() { return index_version; } /// @see CollectionIndex int cleanup(); diff --git a/src/os/IndexManager.cc b/src/os/IndexManager.cc index 251599b1175..504cd8f5217 100644 --- a/src/os/IndexManager.cc +++ b/src/os/IndexManager.cc @@ -70,7 +70,8 @@ int IndexManager::init_index(coll_t c, const char *path, uint32_t version) { if (r < 0) return r; HashIndex index(path, g_conf->filestore_merge_threshold, - g_conf->filestore_split_multiple); + g_conf->filestore_split_multiple, + CollectionIndex::HASH_INDEX_TAG_2); return index.init(); } @@ -84,15 +85,16 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) { return r; switch (version) { - case 0: { + case CollectionIndex::FLAT_INDEX_TAG: { *index = Index(new FlatIndex(path), RemoveOnDelete(c, this)); return 0; } - case 1: { + case CollectionIndex::HASH_INDEX_TAG: // fall through + case CollectionIndex::HASH_INDEX_TAG_2: { // Must be a HashIndex *index = Index(new HashIndex(path, g_conf->filestore_merge_threshold, - g_conf->filestore_split_multiple), + g_conf->filestore_split_multiple, version), RemoveOnDelete(c, this)); return 0; } @@ -102,7 +104,8 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) { } else { // No need to check *index = Index(new HashIndex(path, g_conf->filestore_merge_threshold, - g_conf->filestore_split_multiple), + g_conf->filestore_split_multiple, + CollectionIndex::HASH_INDEX_TAG_2), RemoveOnDelete(c, this)); return 0; } diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc index d6903032e3c..126a4b9edbd 100644 --- a/src/os/LFNIndex.cc +++ b/src/os/LFNIndex.cc @@ -256,6 +256,23 @@ int LFNIndex::get_mangled_name(const vector<string> &from, return lfn_get_name(from, hoid, mangled_name, 0, exists); } +static int get_hobject_from_oinfo(const char *dir, const char *file, + hobject_t *o) { + char path[PATH_MAX]; + bufferptr bp(PATH_MAX); + snprintf(path, sizeof(path), "%s/%s", dir, file); + // Hack, user.ceph._ is the attribute used to store the object info + int r = do_getxattr(path, "user.ceph._", bp.c_str(), bp.length()); + if (r < 0) + return r; + bufferlist bl; + bl.push_back(bp); + object_info_t oi(bl); + *o = oi.soid; + return 0; +} + + int LFNIndex::list_objects(const vector<string> &to_list, int max_objs, long *handle, map<string, hobject_t> *out) { string to_list_path = get_full_path_subdir(to_list); @@ -295,6 +312,9 @@ int LFNIndex::list_objects(const vector<string> &to_list, int max_objs, if (!lfn_must_hash(long_name)) { assert(long_name == short_name); } + if (index_version == HASH_INDEX_TAG) + get_hobject_from_oinfo(to_list_path.c_str(), short_name.c_str(), &obj); + out->insert(pair<string, hobject_t>(short_name, obj)); ++listed; } else { @@ -413,7 +433,7 @@ int LFNIndex::remove_attr_path(const vector<string> &path, return do_removexattr(full_path.c_str(), mangled_attr_name.c_str()); } -string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) { +string LFNIndex::lfn_generate_object_name_keyless(const hobject_t &hoid) { char s[FILENAME_MAX_LEN]; char *end = s + sizeof(s); char *t = s; @@ -449,7 +469,55 @@ string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) { snprintf(t, end - t, "_%.*X", (int)(sizeof(hoid.hash)*2), hoid.hash); return string(s); - } +} + +static void append_escaped(string::const_iterator begin, + string::const_iterator end, + string *out) { + for (string::const_iterator i = begin; i != end; ++i) { + if (*i == '\\') { + out->append("\\\\"); + } else if (*i == '/') { + out->append("\\s"); + } else if (*i == '_') { + out->append("\\u"); + } else { + out->append(i, i+1); + } + } +} + +string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) { + if (index_version == HASH_INDEX_TAG) + return lfn_generate_object_name_keyless(hoid); + + string full_name; + string::const_iterator i = hoid.oid.name.begin(); + if (hoid.oid.name.substr(0, 4) == "DIR_") { + full_name.append("\\d"); + i += 4; + } else if (hoid.oid.name[0] == '.') { + full_name.append("\\."); + ++i; + } + append_escaped(i, hoid.oid.name.end(), &full_name); + full_name.append("_"); + append_escaped(hoid.key.begin(), hoid.key.end(), &full_name); + full_name.append("_"); + + char snap_with_hash[PATH_MAX]; + char *t = snap_with_hash; + char *end = t + sizeof(snap_with_hash); + if (hoid.snap == CEPH_NOSNAP) + t += snprintf(t, end - t, "head"); + else if (hoid.snap == CEPH_SNAPDIR) + t += snprintf(t, end - t, "snapdir"); + else + t += snprintf(t, end - t, "%llx", (long long unsigned)hoid.snap); + snprintf(t, end - t, "_%.*X", (int)(sizeof(hoid.hash)*2), hoid.hash); + full_name += string(snap_with_hash); + return full_name; +} int LFNIndex::lfn_get_name(const vector<string> &path, const hobject_t &hoid, @@ -487,7 +555,7 @@ int LFNIndex::lfn_get_name(const vector<string> &path, for ( ; ; ++i) { candidate = lfn_get_short_name(hoid, i); candidate_path = get_full_path(path, candidate); - r = do_getxattr(candidate_path.c_str(), LFN_ATTR.c_str(), buf, sizeof(buf)); + r = do_getxattr(candidate_path.c_str(), get_lfn_attr().c_str(), buf, sizeof(buf)); if (r < 0) { if (errno != ENODATA && errno != ENOENT) return -errno; @@ -528,7 +596,7 @@ int LFNIndex::lfn_created(const vector<string> &path, return 0; string full_path = get_full_path(path, mangled_name); string full_name = lfn_generate_object_name(hoid); - return do_setxattr(full_path.c_str(), LFN_ATTR.c_str(), + return do_setxattr(full_path.c_str(), get_lfn_attr().c_str(), full_name.c_str(), full_name.size()); } @@ -585,15 +653,15 @@ int LFNIndex::lfn_unlink(const vector<string> &path, } int LFNIndex::lfn_translate(const vector<string> &path, - const string &short_name, - hobject_t *out) { + const string &short_name, + hobject_t *out) { if (!lfn_is_hashed_filename(short_name)) { return lfn_parse_object_name(short_name, out); } // Get lfn_attr string full_path = get_full_path(path, short_name); char attr[PATH_MAX]; - int r = do_getxattr(full_path.c_str(), LFN_ATTR.c_str(), attr, sizeof(attr) - 1); + int r = do_getxattr(full_path.c_str(), get_lfn_attr().c_str(), attr, sizeof(attr) - 1); if (r < 0) return -errno; if (r < (int)sizeof(attr)) @@ -666,13 +734,90 @@ static int parse_object(const char *s, hobject_t& o) return 0; } -bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) { +bool LFNIndex::lfn_parse_object_name_keyless(const string &long_name, hobject_t *out) { bool r = parse_object(long_name.c_str(), *out); if (!r) return r; string temp = lfn_generate_object_name(*out); return r; } +static bool append_unescaped(string::const_iterator begin, + string::const_iterator end, + string *out) { + for (string::const_iterator i = begin; i != end; ++i) { + if (*i == '\\') { + ++i; + if (*i == '\\') + out->append("\\"); + else if (*i == 's') + out->append("/"); + else if (*i == 'u') + out->append("_"); + else + return false; + } else { + out->append(i, i+1); + } + } + return true; +} + +bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) { + if (index_version == HASH_INDEX_TAG) + return lfn_parse_object_name_keyless(long_name, out); + + string::const_iterator current = long_name.begin(); + if (*current == '\\') { + ++current; + if (current == long_name.end()) { + return false; + } else if (*current == 'd') { + out->oid.name.append("DIR_"); + ++current; + } else if (*current == '.') { + out->oid.name.append("."); + ++current; + } else { + --current; + } + } + + string::const_iterator end = current; + for (; end != long_name.end() && *end != '_'; ++end); + if (end == long_name.end()) + return false; + if (!append_unescaped(current, end, &(out->oid.name))) + return false; + + current = ++end; + for (; end != long_name.end() && *end != '_'; ++end); + if (end == long_name.end()) + return false; + if (!append_unescaped(current, end, &(out->oid.name))) + return false; + + current = ++end; + for (; end != long_name.end() && *end != '_'; ++end); + if (end == long_name.end()) + return false; + string snap(current, end); + + current = ++end; + for (; end != long_name.end() && *end != '_'; ++end); + if (end != long_name.end()) + return false; + string hash(current, end); + + if (snap == "head") + out->snap = CEPH_NOSNAP; + else if (snap == "snapdir") + out->snap = CEPH_SNAPDIR; + else + out->snap = strtoull(snap.c_str(), NULL, 16); + sscanf(hash.c_str(), "%X", &out->hash); + return true; +} + bool LFNIndex::lfn_is_hashed_filename(const string &name) { if (name.size() < (unsigned)FILENAME_SHORT_LEN) { return 0; @@ -776,7 +921,7 @@ string LFNIndex::demangle_path_component(const string &component) { } int LFNIndex::decompose_full_path(const char *in, vector<string> *out, - hobject_t *hoid, string *shortname) { + hobject_t *hoid, string *shortname) { const char *beginning = in + get_base_path().size(); const char *end = beginning; while (1) { diff --git a/src/os/LFNIndex.h b/src/os/LFNIndex.h index dedd8465a28..e0321b45dbf 100644 --- a/src/os/LFNIndex.h +++ b/src/os/LFNIndex.h @@ -76,10 +76,26 @@ class LFNIndex : public CollectionIndex { /// For reference counting the collection @see Path std::tr1::weak_ptr<CollectionIndex> self_ref; +protected: + const uint32_t index_version; + +private: + string lfn_attribute; + public: /// Constructor - LFNIndex(const char *base_path) ///< [in] path to Index root - : base_path(base_path) {} + LFNIndex( + const char *base_path, ///< [in] path to Index root + uint32_t index_version) + : base_path(base_path), index_version(index_version) { + if (index_version == HASH_INDEX_TAG) { + lfn_attribute = LFN_ATTR; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "%d", index_version); + lfn_attribute = LFN_ATTR + string(buf); + } + } /// Virtual destructor virtual ~LFNIndex() {} @@ -306,6 +322,14 @@ protected: private: /* lfn translation functions */ + + /** + * Gets the version specific lfn attribute tag + */ + const string &get_lfn_attr() const { + return lfn_attribute; + } + /** * Gets the filename corresponsing to hoid in path. * @@ -359,11 +383,22 @@ private: ); ///< @return True if short_name is a subdir, false otherwise /// Generate object name + string lfn_generate_object_name_keyless( + const hobject_t &hoid ///< [in] Object for which to generate. + ); ///< @return Generated object name. + + /// Generate object name string lfn_generate_object_name( const hobject_t &hoid ///< [in] Object for which to generate. ); ///< @return Generated object name. /// Parse object name + bool lfn_parse_object_name_keyless( + const string &long_name, ///< [in] Name to parse + hobject_t *out ///< [out] Resulting Object + ); ///< @return True if successfull, False otherwise. + + /// Parse object name bool lfn_parse_object_name( const string &long_name, ///< [in] Name to parse hobject_t *out ///< [out] Resulting Object diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index de70ae21eb9..85d922cab11 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5067,7 +5067,8 @@ void OSD::handle_op(MOSDOp *op) if ((op->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) { // missing object? - hobject_t head(op->get_oid(), CEPH_NOSNAP, op->get_pg().ps()); + hobject_t head(op->get_oid(), op->get_object_locator().key, + CEPH_NOSNAP, op->get_pg().ps()); if (pg->is_missing_object(head)) { pg->wait_for_missing_object(head, op); pg->unlock(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index c0f383b6e0f..d77639c2a3b 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2250,6 +2250,7 @@ void PG::read_log(ObjectStore *store) ++i) { if (i->oid == e.soid.oid && i->snap == e.soid.snap) { e.soid.hash = i->hash; + e.soid.key = i->key; found = true; break; } diff --git a/src/osd/PG.h b/src/osd/PG.h index 61e8778fd14..d5f810d5130 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -393,7 +393,7 @@ public: } void encode(bufferlist &bl) const { - __u8 struct_v = 2; + __u8 struct_v = 3; ::encode(struct_v, bl); ::encode(op, bl); ::encode(soid, bl); @@ -417,6 +417,8 @@ public: } else { ::decode(soid, bl); } + if (struct_v < 3) + invalid_hash = true; ::decode(version, bl); ::decode(prior_version, bl); ::decode(reqid, bl); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b9a50f95936..73b42acf133 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -404,7 +404,9 @@ void ReplicatedPG::do_op(MOSDOp *op) ObjectContext *obc; bool can_create = op->may_write(); snapid_t snapid; - int r = find_object_context(hobject_t(op->get_oid(), op->get_snapid(), op->get_pg().ps()), + int r = find_object_context(hobject_t(op->get_oid(), + op->get_object_locator().key, + op->get_snapid(), op->get_pg().ps()), op->get_object_locator(), &obc, can_create, &snapid); @@ -416,7 +418,8 @@ void ReplicatedPG::do_op(MOSDOp *op) if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) { // missing the specific snap we need; requeue and wait. assert(!can_create); // only happens on a read - hobject_t soid(op->get_oid(), snapid, op->get_pg().ps()); + hobject_t soid(op->get_oid(), op->get_object_locator().key, + snapid, op->get_pg().ps()); wait_for_missing_object(soid, op); return; } @@ -476,17 +479,19 @@ void ReplicatedPG::do_op(MOSDOp *op) map<hobject_t,ObjectContext*> src_obc; for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) { OSDOp& osd_op = *p; - hobject_t toid(osd_op.soid, op->get_pg().ps()); + hobject_t toid(osd_op.soid, op->get_object_locator().key, op->get_pg().ps()); if (osd_op.soid.oid.name.length()) { if (!src_obc.count(toid)) { ObjectContext *sobc; snapid_t ssnapid; - int r = find_object_context(hobject_t(toid.oid, toid.snap, op->get_pg().ps()), + int r = find_object_context(hobject_t(toid.oid, op->get_object_locator().key, + toid.snap, op->get_pg().ps()), op->get_object_locator(), &sobc, false, &ssnapid); if (r == -EAGAIN) { // missing the specific snap we need; requeue and wait. - hobject_t wait_oid(osd_op.soid.oid, ssnapid, op->get_pg().ps()); + hobject_t wait_oid(osd_op.soid.oid, op->get_object_locator().key, + ssnapid, op->get_pg().ps()); wait_for_missing_object(wait_oid, op); } else if (r) { osd->reply_op_error(op, r); @@ -816,7 +821,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, // load clone info bufferlist bl; ObjectContext *obc = 0; - int r = find_object_context(hobject_t(coid.oid, sn, coid.hash), + int r = find_object_context(hobject_t(coid.oid, coid.key, sn, coid.hash), OLOC_BLANK, &obc, false, NULL); if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) { if (obc) put_object_context(obc); @@ -829,7 +834,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, // get snap set context if (!obc->ssc) - obc->ssc = get_snapset_context(coid.oid, coid.hash, false); + obc->ssc = get_snapset_context(coid.oid, coid.key, coid.hash, false); SnapSetContext *ssc = obc->ssc; assert(ssc); SnapSet& snapset = ssc->snapset; @@ -926,7 +931,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, // save head snapset dout(10) << coid << " new snapset " << snapset << dendl; - hobject_t snapoid(coid.oid, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash); + hobject_t snapoid(coid.oid, coid.key, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash); ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false); assert(ctx->snapset_obc->registered); if (snapset.clones.empty() && !snapset.head_exists) { @@ -1157,7 +1162,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops, ObjectContext *src_obc = 0; if (ceph_osd_op_type_multi(op.op)) { - src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.hash)]; + src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.key, soid.hash)]; assert(src_obc); } @@ -1678,8 +1683,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops, result = -EINVAL; break; } - t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.hash), obs.oi.soid, - op.clonerange.src_offset, op.clonerange.length, op.clonerange.offset); + t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.key, + obs.oi.soid.hash), + obs.oi.soid, op.clonerange.src_offset, + op.clonerange.length, op.clonerange.offset); + write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges, op.clonerange.offset, op.clonerange.length, false); @@ -2096,7 +2104,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) dout(10) << "_rollback_to " << soid << " snapid " << snapid << dendl; ObjectContext *rollback_to; - int ret = find_object_context(hobject_t(soid.oid, snapid, soid.hash), + int ret = find_object_context(hobject_t(soid.oid, oi.oloc.key, snapid, soid.hash), oi.oloc, &rollback_to, false, &cloneid); if (ret) { if (-ENOENT == ret) { @@ -2109,7 +2117,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) /* a different problem, like degraded pool * with not-yet-restored object. We shouldn't have been able * to get here; recovery should have completed first! */ - hobject_t rollback_target(soid.oid, cloneid, soid.hash); + hobject_t rollback_target(soid.oid, soid.key, cloneid, soid.hash); assert(is_missing_object(rollback_target)); dout(20) << "_rollback_to attempted to roll back to a missing object " << rollback_target << " (requested snapid: ) " << snapid << dendl; @@ -2546,7 +2554,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) ctx->op_t.setattr(coll, soid, SS_ATTR, bss); if (!head_existed) { // if we logically recreated the head, remove old _snapdir object - hobject_t snapoid(soid.oid, CEPH_SNAPDIR, soid.hash); + hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash); ctx->snapset_obc = get_object_context(snapoid, ctx->new_obs.oi.oloc, false); if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) { @@ -2563,7 +2571,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) } } else if (ctx->new_snapset.clones.size()) { // save snapset on _snap - hobject_t snapoid(soid.oid, CEPH_SNAPDIR, soid.hash); + hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash); dout(10) << " final snapset " << ctx->new_snapset << " in " << snapoid << dendl; ctx->at_version.version++; @@ -3017,13 +3025,13 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s SnapSetContext *ssc = NULL; if (can_create) - ssc = get_snapset_context(soid.oid, soid.hash, true); + ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true); obc = new ObjectContext(oi, true, ssc); } register_object_context(obc); if (can_create && !obc->ssc) - obc->ssc = get_snapset_context(soid.oid, soid.hash, true); + obc->ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true); if (r >= 0) { obc->obs.oi.decode(bv); @@ -3058,7 +3066,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, snapid_t *psnapid) { // want the head? - hobject_t head(oid.oid, CEPH_NOSNAP, oid.hash); + hobject_t head(oid.oid, oid.key, CEPH_NOSNAP, oid.hash); if (oid.snap == CEPH_NOSNAP) { ObjectContext *obc = get_object_context(head, oloc, can_create); if (!obc) @@ -3067,13 +3075,13 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, *pobc = obc; if (can_create && !obc->ssc) - obc->ssc = get_snapset_context(oid.oid, oid.hash, true); + obc->ssc = get_snapset_context(oid.oid, oid.key, oid.hash, true); return 0; } // we want a snap - SnapSetContext *ssc = get_snapset_context(oid.oid, oid.hash, can_create); + SnapSetContext *ssc = get_snapset_context(oid.oid, oid.key, oid.hash, can_create); if (!ssc) return -ENOENT; @@ -3115,7 +3123,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, put_snapset_context(ssc); return -ENOENT; } - hobject_t soid(oid.oid, ssc->snapset.clones[k], oid.hash); + hobject_t soid(oid.oid, oid.key, ssc->snapset.clones[k], oid.hash); put_snapset_context(ssc); // we're done with ssc ssc = 0; @@ -3179,7 +3187,9 @@ void ReplicatedPG::put_object_contexts(map<hobject_t,ObjectContext*>& obcv) obcv.clear(); } -ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, ps_t seed, +ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, + const string& key, + ps_t seed, bool can_create) { SnapSetContext *ssc; @@ -3188,11 +3198,11 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& ssc = p->second; } else { bufferlist bv; - hobject_t head(oid, CEPH_NOSNAP, seed); + hobject_t head(oid, key, CEPH_NOSNAP, seed); int r = osd->store->getattr(coll, head, SS_ATTR, bv); if (r < 0) { // try _snapset - hobject_t snapdir(oid, CEPH_SNAPDIR, seed); + hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed); r = osd->store->getattr(coll, snapdir, SS_ATTR, bv); if (r < 0 && !can_create) return NULL; @@ -3602,7 +3612,7 @@ int ReplicatedPG::pull(const hobject_t& soid) } // check snapset - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); dout(10) << " snapset " << ssc->snapset << dendl; calc_clone_subsets(ssc->snapset, soid, missing, data_subset, clone_subsets); @@ -3710,7 +3720,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in return push_start(snapdir, peer); } - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; calc_clone_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets); @@ -3718,7 +3728,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in } else if (soid.snap == CEPH_NOSNAP) { // pushing head or unversioned object. // base this on partially on replica's clones? - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; calc_head_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets); put_snapset_context(ssc); @@ -4062,7 +4072,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) if (soid.snap && soid.snap < CEPH_NOSNAP) { // clone. make sure we have enough data. - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); assert(ssc); clone_subsets.clear(); // forget what pusher said; recalculate cloning. diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index faadd3f03bf..f9295e4e13c 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -510,7 +510,8 @@ protected: ObjectContext **pobc, bool can_create, snapid_t *psnapid=NULL); - SnapSetContext *get_snapset_context(const object_t& oid, ps_t seed, bool can_create); + SnapSetContext *get_snapset_context(const object_t& oid, const string &key, + ps_t seed, bool can_create); void register_snapset_context(SnapSetContext *ssc) { if (!ssc->registered) { ssc->registered = true; diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 75a93ca73fd..2bdf099d0cf 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -574,7 +574,7 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid, void object_info_t::encode(bufferlist& bl) const { - const __u8 v = 6; + const __u8 v = 7; ::encode(v, bl); ::encode(soid, bl); ::encode(oloc, bl); @@ -603,12 +603,15 @@ void object_info_t::decode(bufferlist::iterator& bl) sobject_t obj; ::decode(obj, bl); ::decode(oloc, bl); - soid = hobject_t(obj.oid, obj.snap, 0); + soid = hobject_t(obj.oid, oloc.key, obj.snap, 0); soid.hash = legacy_object_locator_to_ps(soid.oid, oloc); } else if (v >= 6) { ::decode(soid, bl); ::decode(oloc, bl); + if (v == 6) + soid.key = oloc.key; } + if (v >= 5) ::decode(category, bl); ::decode(version, bl); diff --git a/src/test/store_test.cc b/src/test/store_test.cc index a3c9f529e56..10fb6564dc2 100644 --- a/src/test/store_test.cc +++ b/src/test/store_test.cc @@ -253,7 +253,7 @@ public: // hash //boost::binomial_distribution<uint32_t> bin(0xFFFFFF, 0.5); ++seq; - return hobject_t(name, CEPH_NOSNAP, rand()); + return hobject_t(name, string(), CEPH_NOSNAP, rand()); } }; @@ -469,7 +469,7 @@ TEST_F(StoreTest, HashCollisionTest) { if (!(i % 5)) { cerr << "Object " << i << std::endl; } - hobject_t hoid(string(buf) + base, CEPH_NOSNAP, 0); + hobject_t hoid(string(buf) + base, string(), CEPH_NOSNAP, 0); { ObjectStore::Transaction t; t.touch(cid, hoid); |