summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-09-03 15:13:40 -0700
committerSage Weil <sage@inktank.com>2013-09-11 15:19:18 -0700
commit17c5d765d7c7573f875f6b3ba66e3b6813110a06 (patch)
treec130c8d67a2a260c56406e6de2f7530a3b1b40a8
parentef7cffc34f3bad4ffc090361ad9030a47584a3bf (diff)
downloadceph-17c5d765d7c7573f875f6b3ba66e3b6813110a06.tar.gz
os/FileStore: implement collection_move_rename
This is similar to a collection_add + collection_move sequence in that we apply the same replay guards. The difference is that we roll it up into a single operation, change the filename, and make the omap content carry over by calling DBObjectMap->clone (as there is no rename function or collection awareness in the DBObjectMap). Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/os/FileStore.cc99
-rw-r--r--src/os/FileStore.h6
-rw-r--r--src/test/filestore/store_test.cc59
3 files changed, 154 insertions, 10 deletions
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 8b27246dfdf..66581bcde38 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -299,7 +299,8 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobje
}
int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
- const SequencerPosition &spos)
+ const SequencerPosition &spos,
+ bool force_clear_omap)
{
Index index;
int r = get_index(cid, &index);
@@ -315,14 +316,17 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
return r;
}
- struct stat st;
- r = ::stat(path->path(), &st);
- if (r < 0) {
- r = -errno;
- assert(!m_filestore_fail_eio || r != -EIO);
- return r;
+ if (!force_clear_omap) {
+ struct stat st;
+ r = ::stat(path->path(), &st);
+ if (r < 0) {
+ r = -errno;
+ assert(!m_filestore_fail_eio || r != -EIO);
+ return r;
+ }
+ force_clear_omap = true;
}
- if (st.st_nlink == 1) {
+ if (force_clear_omap) {
dout(20) << __func__ << ": clearing omap on " << o
<< " in cid " << cid << dendl;
r = object_map->clear(o, &spos);
@@ -2176,6 +2180,16 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
}
break;
+ case Transaction::OP_COLL_MOVE_RENAME:
+ {
+ coll_t oldcid = i.get_cid();
+ hobject_t oldoid = i.get_oid();
+ coll_t newcid = i.get_cid();
+ hobject_t newoid = i.get_oid();
+ r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
+ }
+ break;
+
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = i.get_cid();
@@ -4086,7 +4100,7 @@ int FileStore::_destroy_collection(coll_t c)
int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
- const SequencerPosition& spos)
+ const SequencerPosition& spos)
{
dout(15) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
@@ -4133,6 +4147,73 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
return r;
}
+int FileStore::_collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+ coll_t c, const hobject_t& o,
+ const SequencerPosition& spos)
+{
+ dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
+ int r;
+ int dstcmp, srccmp;
+
+ dstcmp = _check_replay_guard(c, o, spos);
+ if (dstcmp < 0)
+ goto out_rm_src;
+
+ // check the src name too; it might have a newer guard, and we don't
+ // want to clobber it
+ srccmp = _check_replay_guard(oldcid, oldoid, spos);
+ if (srccmp < 0)
+ return 0;
+
+ {
+ // open guard on object so we don't any previous operations on the
+ // new name that will modify the source inode.
+ FDRef fd;
+ r = lfn_open(oldcid, oldoid, 0, &fd);
+ if (r < 0) {
+ // the source collection/object does not exist. If we are replaying, we
+ // should be safe, so just return 0 and move on.
+ assert(replaying);
+ dout(10) << __func__ << " " << c << "/" << o << " from "
+ << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
+ return 0;
+ }
+ if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
+ _set_replay_guard(**fd, spos, &o, true);
+ }
+
+ r = lfn_link(oldcid, c, oldoid, o);
+ if (replaying && !backend->can_checkpoint() &&
+ r == -EEXIST) // crashed between link() and set_replay_guard()
+ r = 0;
+
+ _inject_failure();
+
+ // the name changed; link the omap content
+ r = object_map->clone(oldoid, o, &spos);
+ if (r == -ENOENT)
+ r = 0;
+
+ _inject_failure();
+
+ // close guard on object so we don't do this again
+ if (r == 0) {
+ _close_replay_guard(**fd, spos);
+ }
+ lfn_close(fd);
+ }
+
+ out_rm_src:
+ // remove source
+ if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
+ r = lfn_unlink(oldcid, oldoid, spos, true);
+ }
+
+ dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid
+ << " = " << r << dendl;
+ return r;
+}
+
void FileStore::_inject_failure()
{
if (m_filestore_kill_at.read()) {
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index 19178be4154..4f58df4d698 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -299,7 +299,8 @@ public:
Index *index = 0);
void lfn_close(FDRef fd);
int lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid) ;
- int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos);
+ int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos,
+ bool force_clear_omap=false);
public:
FileStore(const std::string &base, const std::string &jdev, const char *internal_name = "filestore", bool update_to=false);
@@ -499,6 +500,9 @@ public:
int _destroy_collection(coll_t c);
int _collection_add(coll_t c, coll_t ocid, const hobject_t& o,
const SequencerPosition& spos);
+ int _collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+ coll_t c, const hobject_t& o,
+ const SequencerPosition& spos);
void dump_start(const std::string& file);
void dump_stop();
void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr);
diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc
index 80c775052ec..92104960127 100644
--- a/src/test/filestore/store_test.cc
+++ b/src/test/filestore/store_test.cc
@@ -898,6 +898,65 @@ TEST_F(StoreTest, TwoHash) {
ASSERT_EQ(r, 0);
}
+TEST_F(StoreTest, MoveRename) {
+ coll_t temp_cid("mytemp");
+ hobject_t temp_oid("tmp_oid", "", CEPH_NOSNAP, 0, 0, "");
+ coll_t cid("dest");
+ hobject_t oid("dest_oid", "", CEPH_NOSNAP, 0, 0, "");
+ int r;
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(cid);
+ t.touch(cid, oid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ASSERT_TRUE(store->exists(cid, oid));
+ bufferlist data, attr;
+ map<string, bufferlist> omap;
+ data.append("data payload");
+ attr.append("attr value");
+ omap["omap_key"].append("omap value");
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(temp_cid);
+ t.touch(temp_cid, temp_oid);
+ t.write(temp_cid, temp_oid, 0, data.length(), data);
+ t.setattr(temp_cid, temp_oid, "attr", attr);
+ t.omap_setkeys(temp_cid, temp_oid, omap);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ASSERT_TRUE(store->exists(temp_cid, temp_oid));
+ {
+ ObjectStore::Transaction t;
+ t.remove(cid, oid);
+ t.collection_move_rename(temp_cid, temp_oid, cid, oid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ASSERT_TRUE(store->exists(cid, oid));
+ ASSERT_FALSE(store->exists(temp_cid, temp_oid));
+ {
+ bufferlist newdata;
+ r = store->read(cid, oid, 0, 1000, newdata);
+ ASSERT_GE(r, 0);
+ ASSERT_TRUE(newdata.contents_equal(data));
+ bufferlist newattr;
+ r = store->getattr(cid, oid, "attr", newattr);
+ ASSERT_GE(r, 0);
+ ASSERT_TRUE(newattr.contents_equal(attr));
+ set<string> keys;
+ keys.insert("omap_key");
+ map<string, bufferlist> newomap;
+ r = store->omap_get_values(cid, oid, keys, &newomap);
+ ASSERT_GE(r, 0);
+ ASSERT_EQ(1u, newomap.size());
+ ASSERT_TRUE(newomap.count("omap_key"));
+ ASSERT_TRUE(newomap["omap_key"].contents_equal(omap["omap_key"]));
+ }
+}
+
//
// support tests for qa/workunits/filestore/filestore.sh
//