diff options
author | Yan, Zheng <zheng.z.yan@intel.com> | 2013-09-19 11:07:17 +0800 |
---|---|---|
committer | Yan, Zheng <zheng.z.yan@intel.com> | 2013-10-04 17:29:20 +0800 |
commit | 4634d3ce2d14a31d7e0a27985e5b0eac51c84d22 (patch) | |
tree | d77c98ad2b50cbd58badb86c3f59e9512d5dea1b | |
parent | cd4cd3f27ce6957901fdddc71f68f50d529546ea (diff) | |
download | ceph-4634d3ce2d14a31d7e0a27985e5b0eac51c84d22.tar.gz |
mds: delete orphan dirfrags after fragmentating directory
delete old dirfrags after the EFragment::OP_COMMIT event is logged.
Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
-rw-r--r-- | src/mds/MDCache.cc | 105 | ||||
-rw-r--r-- | src/mds/MDCache.h | 16 | ||||
-rw-r--r-- | src/mds/journal.cc | 20 |
3 files changed, 115 insertions, 26 deletions
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 91fadb73df9..8e9dd89db71 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -11051,6 +11051,20 @@ public: } }; +class C_MDC_FragmentCommit : public Context { + MDCache *mdcache; + dirfrag_t basedirfrag; + list<CDir*> resultfrags; +public: + C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) : + mdcache(m), basedirfrag(ino, f) { + resultfrags.swap(l); + } + virtual void finish(int r) { + mdcache->_fragment_committed(basedirfrag, resultfrags); + } +}; + void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) { dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits @@ -11111,6 +11125,10 @@ void MDCache::dispatch_fragment_dir(MDRequest *mdr) info.basefrag, info.bits); mds->mdlog->start_entry(le); + list<frag_t> old_frags; + for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p) + old_frags.push_back((*p)->get_frag()); + // refragment list<Context*> waiters; adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits, @@ -11149,10 +11167,12 @@ void MDCache::dispatch_fragment_dir(MDRequest *mdr) le->metablob.add_dir(dir, false); // freeze and store them too + dir->auth_pin(this); dir->state_set(CDir::STATE_FRAGMENTING); dir->commit(0, gather.new_sub(), true); // ignore authpinnability } + add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, old_frags); mds->mdlog->submit_entry(le, gather.new_sub()); mds->mdlog->flush(); gather.activate(); @@ -11166,11 +11186,6 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr) dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag << " bits " << info.bits << " on " << *diri << dendl; - - // journal commit - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), - info.basefrag, info.bits); - mds->mdlog->start_submit_entry(le); // tell peers CDir *first = *info.resultfrags.begin(); @@ -11199,9 +11214,6 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr) ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; - - // unmark, unfreeze - dir->state_clear(CDir::STATE_FRAGMENTING); for (CDir::map_t::iterator p = dir->items.begin(); p != dir->items.end(); @@ -11212,13 +11224,65 @@ void MDCache::fragment_logged_and_stored(MDRequest *mdr) dn->put(CDentry::PIN_FRAGMENTING); } + // unfreeze dir->unfreeze_dir(); } + // journal commit + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, + diri->ino(), info.basefrag, info.bits); + mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag, + info.resultfrags)); + fragment_requests.erase(mdr->reqid); request_finish(mdr); } +void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags) +{ + dout(10) << "fragment_committed " << basedirfrag << dendl; + assert(uncommitted_fragments.count(basedirfrag)); + ufragment &uf = uncommitted_fragments[basedirfrag]; + + // remove old frags + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags)); + + SnapContext nullsnapc; + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + for (list<frag_t>::iterator p = uf.old_frags.begin(); + p != uf.old_frags.end(); + ++p) { + object_t oid = CInode::get_object_name(basedirfrag.ino, *p, ""); + ObjectOperation op; + if (*p == frag_t()) { + // backtrace object + dout(10) << " truncate orphan dirfrag " << oid << dendl; + op.truncate(0); + } else { + dout(10) << " removing orphan dirfrag " << oid << dendl; + op.remove(); + } + mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context), + 0, NULL, gather.new_sub()); + } + + assert(gather.has_subs()); + gather.activate(); +} + +void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags) +{ + dout(10) << "fragment_finish " << basedirfrag << dendl; + assert(uncommitted_fragments.count(basedirfrag)); + + // unmark & auth_unpin + for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) { + (*p)->state_clear(CDir::STATE_FRAGMENTING); + (*p)->auth_unpin(this); + } + + finish_uncommitted_fragment(basedirfrag); +} /* This function DOES put the passed message before returning */ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) @@ -11264,23 +11328,40 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) notify->put(); } +void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags) +{ + dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl; + assert(!uncommitted_fragments.count(basedirfrag)); + ufragment& uf = uncommitted_fragments[basedirfrag]; + uf.old_frags = old_frags; + uf.bits = bits; +} + +void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag) +{ + dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag << dendl; + if (uncommitted_fragments.count(basedirfrag)) { + uncommitted_fragments.erase(basedirfrag); + } +} void MDCache::rollback_uncommitted_fragments() { dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl; - for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin(); + for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin(); p != uncommitted_fragments.end(); ++p) { + ufragment &uf = p->second; CInode *diri = get_inode(p->first.ino); assert(diri); - dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl; + dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl; list<CDir*> resultfrags; list<Context*> waiters; - adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true); + adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true); if (g_conf->mds_debug_frag) diri->verify_dirfrags(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits); mds->mdlog->start_submit_entry(le); } uncommitted_fragments.clear(); diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index cb219360ccf..8560ce481fb 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -942,10 +942,14 @@ protected: // -- fragmenting -- -public: - set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations - private: + struct ufragment { + int bits; + list<frag_t> old_frags; + ufragment() : bits(0) {} + }; + map<dirfrag_t, ufragment> uncommitted_fragments; + struct fragment_info_t { frag_t basefrag; int bits; @@ -981,6 +985,9 @@ private: void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs); void dispatch_fragment_dir(MDRequest *mdr); void fragment_logged_and_stored(MDRequest *mdr); + void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags); + void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags); + public: void rollback_uncommitted_fragments(); private: @@ -988,9 +995,12 @@ private: friend class C_MDC_FragmentFrozen; friend class C_MDC_FragmentMarking; friend class C_MDC_FragmentLoggedAndStored; + friend class C_MDC_FragmentCommit; void handle_fragment_notify(MMDSFragmentNotify *m); + void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag); + void finish_uncommitted_fragment(dirfrag_t basedirfrag); // -- updates -- //int send_inode_updates(CInode *in); diff --git a/src/mds/journal.cc b/src/mds/journal.cc index aeff07eb905..49fdb9ce5da 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -2381,6 +2381,7 @@ void EFragment::replay(MDS *mds) list<CDir*> resultfrags; list<Context*> waiters; + list<frag_t> old_frags; pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits); // in may be NULL if it wasn't in our cache yet. if it's a prepare @@ -2390,26 +2391,23 @@ void EFragment::replay(MDS *mds) switch (op) { case OP_PREPARE: - mds->mdcache->uncommitted_fragments.insert(desc); + mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, old_frags); // fall-thru case OP_ONESHOT: if (in) mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true); break; - case OP_COMMIT: - mds->mdcache->uncommitted_fragments.erase(desc); - break; - case OP_ROLLBACK: - if (mds->mdcache->uncommitted_fragments.count(desc)) { - mds->mdcache->uncommitted_fragments.erase(desc); - assert(in); + if (in) mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true); - } else { - dout(10) << " no record of prepare for " << desc << dendl; - } + // fall-thru + case OP_COMMIT: + mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag)); break; + + default: + assert(0); } metablob.replay(mds, _segment); if (in && g_conf->mds_debug_frag) |