diff options
author | Sage Weil <sage@inktank.com> | 2013-09-03 15:13:40 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-09-11 15:19:18 -0700 |
commit | 17c5d765d7c7573f875f6b3ba66e3b6813110a06 (patch) | |
tree | c130c8d67a2a260c56406e6de2f7530a3b1b40a8 | |
parent | ef7cffc34f3bad4ffc090361ad9030a47584a3bf (diff) | |
download | ceph-17c5d765d7c7573f875f6b3ba66e3b6813110a06.tar.gz |
os/FileStore: implement collection_move_rename
This is similar to a collection_add + collection_move sequence in that we
apply the same replay guards. The difference is that we roll it up into
a single operation, change the filename, and make the omap content carry
over by calling DBObjectMap->clone (as there is no rename function or
collection awareness in the DBObjectMap).
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/os/FileStore.cc | 99 | ||||
-rw-r--r-- | src/os/FileStore.h | 6 | ||||
-rw-r--r-- | src/test/filestore/store_test.cc | 59 |
3 files changed, 154 insertions, 10 deletions
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 8b27246dfdf..66581bcde38 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -299,7 +299,8 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobje } int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, - const SequencerPosition &spos) + const SequencerPosition &spos, + bool force_clear_omap) { Index index; int r = get_index(cid, &index); @@ -315,14 +316,17 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o, return r; } - struct stat st; - r = ::stat(path->path(), &st); - if (r < 0) { - r = -errno; - assert(!m_filestore_fail_eio || r != -EIO); - return r; + if (!force_clear_omap) { + struct stat st; + r = ::stat(path->path(), &st); + if (r < 0) { + r = -errno; + assert(!m_filestore_fail_eio || r != -EIO); + return r; + } + force_clear_omap = true; } - if (st.st_nlink == 1) { + if (force_clear_omap) { dout(20) << __func__ << ": clearing omap on " << o << " in cid " << cid << dendl; r = object_map->clear(o, &spos); @@ -2176,6 +2180,16 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n } break; + case Transaction::OP_COLL_MOVE_RENAME: + { + coll_t oldcid = i.get_cid(); + hobject_t oldoid = i.get_oid(); + coll_t newcid = i.get_cid(); + hobject_t newoid = i.get_oid(); + r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos); + } + break; + case Transaction::OP_COLL_SETATTR: { coll_t cid = i.get_cid(); @@ -4086,7 +4100,7 @@ int FileStore::_destroy_collection(coll_t c) int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, - const SequencerPosition& spos) + const SequencerPosition& spos) { dout(15) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << dendl; @@ -4133,6 +4147,73 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o, return r; } +int FileStore::_collection_move_rename(coll_t oldcid, const hobject_t& oldoid, + coll_t c, const hobject_t& o, + const SequencerPosition& spos) +{ + dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl; + int r; + int dstcmp, srccmp; + + dstcmp = _check_replay_guard(c, o, spos); + if (dstcmp < 0) + goto out_rm_src; + + // check the src name too; it might have a newer guard, and we don't + // want to clobber it + srccmp = _check_replay_guard(oldcid, oldoid, spos); + if (srccmp < 0) + return 0; + + { + // open guard on object so we don't any previous operations on the + // new name that will modify the source inode. + FDRef fd; + r = lfn_open(oldcid, oldoid, 0, &fd); + if (r < 0) { + // the source collection/object does not exist. If we are replaying, we + // should be safe, so just return 0 and move on. + assert(replaying); + dout(10) << __func__ << " " << c << "/" << o << " from " + << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl; + return 0; + } + if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress" + _set_replay_guard(**fd, spos, &o, true); + } + + r = lfn_link(oldcid, c, oldoid, o); + if (replaying && !backend->can_checkpoint() && + r == -EEXIST) // crashed between link() and set_replay_guard() + r = 0; + + _inject_failure(); + + // the name changed; link the omap content + r = object_map->clone(oldoid, o, &spos); + if (r == -ENOENT) + r = 0; + + _inject_failure(); + + // close guard on object so we don't do this again + if (r == 0) { + _close_replay_guard(**fd, spos); + } + lfn_close(fd); + } + + out_rm_src: + // remove source + if (_check_replay_guard(oldcid, oldoid, spos) > 0) { + r = lfn_unlink(oldcid, oldoid, spos, true); + } + + dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid + << " = " << r << dendl; + return r; +} + void FileStore::_inject_failure() { if (m_filestore_kill_at.read()) { diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 19178be4154..4f58df4d698 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -299,7 +299,8 @@ public: Index *index = 0); void lfn_close(FDRef fd); int lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid) ; - int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos); + int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos, + bool force_clear_omap=false); public: FileStore(const std::string &base, const std::string &jdev, const char *internal_name = "filestore", bool update_to=false); @@ -499,6 +500,9 @@ public: int _destroy_collection(coll_t c); int _collection_add(coll_t c, coll_t ocid, const hobject_t& o, const SequencerPosition& spos); + int _collection_move_rename(coll_t oldcid, const hobject_t& oldoid, + coll_t c, const hobject_t& o, + const SequencerPosition& spos); void dump_start(const std::string& file); void dump_stop(); void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr); diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc index 80c775052ec..92104960127 100644 --- a/src/test/filestore/store_test.cc +++ b/src/test/filestore/store_test.cc @@ -898,6 +898,65 @@ TEST_F(StoreTest, TwoHash) { ASSERT_EQ(r, 0); } +TEST_F(StoreTest, MoveRename) { + coll_t temp_cid("mytemp"); + hobject_t temp_oid("tmp_oid", "", CEPH_NOSNAP, 0, 0, ""); + coll_t cid("dest"); + hobject_t oid("dest_oid", "", CEPH_NOSNAP, 0, 0, ""); + int r; + { + ObjectStore::Transaction t; + t.create_collection(cid); + t.touch(cid, oid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ASSERT_TRUE(store->exists(cid, oid)); + bufferlist data, attr; + map<string, bufferlist> omap; + data.append("data payload"); + attr.append("attr value"); + omap["omap_key"].append("omap value"); + { + ObjectStore::Transaction t; + t.create_collection(temp_cid); + t.touch(temp_cid, temp_oid); + t.write(temp_cid, temp_oid, 0, data.length(), data); + t.setattr(temp_cid, temp_oid, "attr", attr); + t.omap_setkeys(temp_cid, temp_oid, omap); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ASSERT_TRUE(store->exists(temp_cid, temp_oid)); + { + ObjectStore::Transaction t; + t.remove(cid, oid); + t.collection_move_rename(temp_cid, temp_oid, cid, oid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ASSERT_TRUE(store->exists(cid, oid)); + ASSERT_FALSE(store->exists(temp_cid, temp_oid)); + { + bufferlist newdata; + r = store->read(cid, oid, 0, 1000, newdata); + ASSERT_GE(r, 0); + ASSERT_TRUE(newdata.contents_equal(data)); + bufferlist newattr; + r = store->getattr(cid, oid, "attr", newattr); + ASSERT_GE(r, 0); + ASSERT_TRUE(newattr.contents_equal(attr)); + set<string> keys; + keys.insert("omap_key"); + map<string, bufferlist> newomap; + r = store->omap_get_values(cid, oid, keys, &newomap); + ASSERT_GE(r, 0); + ASSERT_EQ(1u, newomap.size()); + ASSERT_TRUE(newomap.count("omap_key")); + ASSERT_TRUE(newomap["omap_key"].contents_equal(omap["omap_key"])); + } +} + // // support tests for qa/workunits/filestore/filestore.sh // |