summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Yan, Zheng <zheng.z.yan@intel.com> 2013-09-19 11:07:17 +0800
committer: Yan, Zheng <zheng.z.yan@intel.com> 2013-09-24 08:45:55 +0800
commit: d39e16ad3c360551ae591d91ec5aeb03cc2cfd25 (patch)
tree: edc6845ff09cc0f5ba524b1121226ea5681008ac
parent: 623e31c1a230fe40edf7e669a8d73e9a55c07258 (diff)
download: ceph-d39e16ad3c360551ae591d91ec5aeb03cc2cfd25.tar.gz
mds: delete orphan dirfrags after fragmenting directory

Delete old dirfrags after the EFragment::OP_COMMIT event is logged.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
-rw-r--r-- src/mds/MDCache.cc 105
-rw-r--r-- src/mds/MDCache.h 16
-rw-r--r-- src/mds/journal.cc 20
3 files changed, 115 insertions, 26 deletions
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index fabcd0c5bc7..bb59c6a21fa 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -11051,6 +11051,20 @@ public:
}
};
+class C_MDC_FragmentCommit : public Context { // Context handed to mdlog->start_submit_entry() together with the OP_COMMIT EFragment; finish() forwards to MDCache::_fragment_committed()
+ MDCache *mdcache;
+ dirfrag_t basedirfrag; // base dirfrag passed through to _fragment_committed()
+ list<CDir*> resultfrags; // result frags, taken over from the constructor argument
+public:
+ C_MDC_FragmentCommit(MDCache *m, dirfrag_t f, list<CDir*>& l) :
+ mdcache(m), basedirfrag(f) {
+ resultfrags.swap(l); // swap, not copy: the caller's list `l` is left holding this member's previous (empty) contents
+ }
+ virtual void finish(int r) { // r is not used
+ mdcache->_fragment_committed(basedirfrag, resultfrags);
+ }
+};
+
void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
{
dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
@@ -11106,6 +11120,10 @@ void MDCache::dispatch_fragment_dir(MDRequest *mdr)
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
+ list<frag_t> old_frags;
+ for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p)
+ old_frags.push_back((*p)->get_frag());
+
// refragment
list<Context*> waiters;
adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
@@ -11149,10 +11167,12 @@ void MDCache::dispatch_fragment_dir(MDRequest *mdr)
le->metablob.add_dir(dir, false);
// freeze and store them too
+ dir->auth_pin(this);
dir->state_set(CDir::STATE_FRAGMENTING);
dir->commit(0, gather.new_sub(), true); // ignore authpinnability
}
+ add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, old_frags);
mds->mdlog->submit_entry(le, gather.new_sub());
mds->mdlog->flush();
gather.activate();
@@ -11166,11 +11186,6 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr)
dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag
<< " bits " << info.bits << " on " << *diri << dendl;
-
- // journal commit
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(),
- info.basefrag, info.bits);
- mds->mdlog->start_submit_entry(le);
// tell peers
CDir *first = *info.resultfrags.begin();
@@ -11199,9 +11214,6 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr)
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
-
- // unmark, unfreeze
- dir->state_clear(CDir::STATE_FRAGMENTING);
for (CDir::map_t::iterator p = dir->items.begin();
p != dir->items.end();
@@ -11212,13 +11224,65 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr)
dn->put(CDentry::PIN_FRAGMENTING);
}
+ // unfreeze
dir->unfreeze_dir();
}
+ // journal commit
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT,
+ diri->ino(), info.basefrag, info.bits);
+ dirfrag_t basedirfrag(diri->ino(), info.basefrag);
+ mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, basedirfrag, info.resultfrags));
+
fragment_requests.erase(mdr->reqid);
request_finish(mdr);
}
+void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags) // called from C_MDC_FragmentCommit::finish(); issues one object operation per old frag recorded for basedirfrag
+{
+ dout(10) << "fragment_committed " << basedirfrag << dendl;
+ assert(uncommitted_fragments.count(basedirfrag)); // an entry must already exist; add_uncommitted_fragment() is what inserts one
+ ufragment &uf = uncommitted_fragments[basedirfrag];
+
+ // remove old frags
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags)); // NOTE(review): C_MDC_FragmentFinish is not defined in the visible hunks; presumably it invokes _fragment_finish() once all subs complete - confirm
+
+ SnapContext nullsnapc;
+ object_locator_t oloc(mds->mdsmap->get_metadata_pool()); // locator built from the metadata pool
+ for (list<frag_t>::iterator p = uf.old_frags.begin();
+ p != uf.old_frags.end();
+ ++p) {
+ object_t oid = CInode::get_object_name(basedirfrag.ino, *p, ""); // object name derived from the inode number and the old frag
+ ObjectOperation op;
+ if (*p == frag_t()) {
+ // default-constructed frag: per the original note this is the backtrace object, so it is truncated to 0 instead of removed
+ dout(10) << " truncate orphan dirfrag " << oid << dendl;
+ op.truncate(0);
+ } else {
+ dout(10) << " removing orphan dirfrag " << oid << dendl;
+ op.remove();
+ }
+ mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub()); // one gather sub per old frag
+ }
+
+ assert(gather.has_subs()); // the loop above is the only source of subs, so this requires uf.old_frags to be non-empty; NOTE(review): EFragment::replay registers entries with an empty old_frags list - confirm this path is never reached for those
+ gather.activate();
+}
+
+void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags) // final step: releases the result frags and drops the uncommitted_fragments entry for basedirfrag
+{
+ dout(10) << "fragment_finish " << basedirfrag << dendl;
+ assert(uncommitted_fragments.count(basedirfrag)); // the entry is expected to still be present at this point
+
+ // unmark & auth_unpin (mirrors the auth_pin(this) / state_set(STATE_FRAGMENTING) done in dispatch_fragment_dir)
+ for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) {
+ (*p)->state_clear(CDir::STATE_FRAGMENTING);
+ (*p)->auth_unpin(this);
+ }
+
+ finish_uncommitted_fragment(basedirfrag); // erases the map entry
+}
/* This function DOES put the passed message before returning */
void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
@@ -11264,23 +11328,40 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
notify->put();
}
+void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags) // registers a refragmentation of basedirfrag in uncommitted_fragments
+{
+ dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl;
+ assert(!uncommitted_fragments.count(basedirfrag)); // at most one entry per base dirfrag
+ ufragment& uf = uncommitted_fragments[basedirfrag]; // operator[] creates the entry
+ uf.old_frags.swap(old_frags); // takes the caller's list; the argument is left empty because the new entry's list starts empty
+ uf.bits = bits;
+}
+
+void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag) // drops the uncommitted_fragments entry for basedirfrag; does nothing if there is none
+{
+ dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag << dendl;
+ if (uncommitted_fragments.count(basedirfrag)) { // tolerate a missing entry (unlike _fragment_finish, no assert here)
+ uncommitted_fragments.erase(basedirfrag);
+ }
+}
void MDCache::rollback_uncommitted_fragments()
{
dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl;
- for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin();
+ for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin();
p != uncommitted_fragments.end();
++p) {
+ ufragment &uf = p->second;
CInode *diri = get_inode(p->first.ino);
assert(diri);
- dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl;
+ dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl;
list<CDir*> resultfrags;
list<Context*> waiters;
- adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true);
+ adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true);
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second);
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits);
mds->mdlog->start_submit_entry(le);
}
uncommitted_fragments.clear();
diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h
index cb219360ccf..8560ce481fb 100644
--- a/src/mds/MDCache.h
+++ b/src/mds/MDCache.h
@@ -942,10 +942,14 @@ protected:
// -- fragmenting --
-public:
- set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations
-
private:
+ struct ufragment { // record stored per base dirfrag in uncommitted_fragments
+ int bits; // bits value given to add_uncommitted_fragment(); rollback_uncommitted_fragments() passes its negation to adjust_dir_fragments()
+ list<frag_t> old_frags; // frags whose objects _fragment_committed() removes or truncates
+ ufragment() : bits(0) {}
+ };
+ map<dirfrag_t, ufragment> uncommitted_fragments; // keyed by base dirfrag; replaces the former set< pair<dirfrag_t,int> >
+
struct fragment_info_t {
frag_t basefrag;
int bits;
@@ -981,6 +985,9 @@ private:
void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
void dispatch_fragment_dir(MDRequest *mdr);
void fragment_logged_and_stored(MDRequest *mdr);
+ void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags);
+ void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags);
+
public:
void rollback_uncommitted_fragments();
private:
@@ -988,9 +995,12 @@ private:
friend class C_MDC_FragmentFrozen;
friend class C_MDC_FragmentMarking;
friend class C_MDC_FragmentLoggedAndStored;
+ friend class C_MDC_FragmentCommit;
void handle_fragment_notify(MMDSFragmentNotify *m);
+ void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag);
+ void finish_uncommitted_fragment(dirfrag_t basedirfrag);
// -- updates --
//int send_inode_updates(CInode *in);
diff --git a/src/mds/journal.cc b/src/mds/journal.cc
index aeff07eb905..49fdb9ce5da 100644
--- a/src/mds/journal.cc
+++ b/src/mds/journal.cc
@@ -2381,6 +2381,7 @@ void EFragment::replay(MDS *mds)
list<CDir*> resultfrags;
list<Context*> waiters;
+ list<frag_t> old_frags;
pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits);
// in may be NULL if it wasn't in our cache yet. if it's a prepare
@@ -2390,26 +2391,23 @@ void EFragment::replay(MDS *mds)
switch (op) {
case OP_PREPARE:
- mds->mdcache->uncommitted_fragments.insert(desc);
+ mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, old_frags);
// fall-thru
case OP_ONESHOT:
if (in)
mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
break;
- case OP_COMMIT:
- mds->mdcache->uncommitted_fragments.erase(desc);
- break;
-
case OP_ROLLBACK:
- if (mds->mdcache->uncommitted_fragments.count(desc)) {
- mds->mdcache->uncommitted_fragments.erase(desc);
- assert(in);
+ if (in)
mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
- } else {
- dout(10) << " no record of prepare for " << desc << dendl;
- }
+ // fall-thru
+ case OP_COMMIT:
+ mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag));
break;
+
+ default:
+ assert(0);
}
metablob.replay(mds, _segment);
if (in && g_conf->mds_debug_frag)