summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYan, Zheng <zheng.z.yan@intel.com>2013-09-19 09:55:31 +0800
committerYan, Zheng <zheng.z.yan@intel.com>2013-10-04 17:29:20 +0800
commitcd4cd3f27ce6957901fdddc71f68f50d529546ea (patch)
tree5e45ba2c9f0be9859e51a44c5720e1d0c740df21
parentd3ba8da597384516c85a9af928971a585843aeb4 (diff)
downloadceph-cd4cd3f27ce6957901fdddc71f68f50d529546ea.tar.gz
mds: start internal MDS request for fragmentating directory
Start internal MDS request for fragmentating directory operation. With MDS request, we can easily acquire locks required by the fragmentating directory operation. (The old way to get locks is 'try lock' style, which is not reliable) Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
-rw-r--r--src/common/ceph_strings.cc1
-rw-r--r--src/include/ceph_fs.h3
-rw-r--r--src/mds/MDCache.cc167
-rw-r--r--src/mds/MDCache.h12
4 files changed, 95 insertions, 88 deletions
diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc
index 47648ce19b3..221fb059740 100644
--- a/src/common/ceph_strings.cc
+++ b/src/common/ceph_strings.cc
@@ -183,6 +183,7 @@ const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_RMSNAP: return "rmsnap";
case CEPH_MDS_OP_SETFILELOCK: return "setfilelock";
case CEPH_MDS_OP_GETFILELOCK: return "getfilelock";
+ case CEPH_MDS_OP_FRAGMENTDIR: return "fragmentdir";
}
return "???";
}
diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
index ba0b5eb0f19..47ec1f14f6e 100644
--- a/src/include/ceph_fs.h
+++ b/src/include/ceph_fs.h
@@ -333,6 +333,9 @@ enum {
CEPH_MDS_OP_MKSNAP = 0x01400,
CEPH_MDS_OP_RMSNAP = 0x01401,
CEPH_MDS_OP_LSSNAP = 0x00402,
+
+ // internal op
+ CEPH_MDS_OP_FRAGMENTDIR= 0x01500,
};
extern const char *ceph_mds_op_name(int op);
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index 9dc1229fbb9..91fadb73df9 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -8666,9 +8666,9 @@ void MDCache::dispatch_request(MDRequest *mdr)
mds->server->dispatch_slave_request(mdr);
} else {
switch (mdr->internal_op) {
-
- // ...
-
+ case CEPH_MDS_OP_FRAGMENTDIR:
+ dispatch_fragment_dir(mdr);
+ break;
default:
assert(0);
}
@@ -10862,17 +10862,6 @@ public:
}
};
-
-bool MDCache::can_fragment_lock(CInode *diri)
-{
- if (!diri->dirfragtreelock.can_wrlock(-1)) {
- dout(7) << "can_fragment: can't wrlock dftlock" << dendl;
- mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL);
- return false;
- }
- return true;
-}
-
bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs)
{
if (mds->mdsmap->is_degraded()) {
@@ -10920,11 +10909,6 @@ void MDCache::split_dir(CDir *dir, int bits)
if (!can_fragment(diri, dirs))
return;
- if (!can_fragment_lock(diri)) {
- dout(10) << " requeuing dir " << dir->dirfrag() << dendl;
- mds->balancer->queue_split(dir);
- return;
- }
C_GatherBuilder gather(g_ceph_context,
new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits));
@@ -10952,11 +10936,6 @@ void MDCache::merge_dir(CInode *diri, frag_t frag)
if (!can_fragment(diri, dirs))
return;
- if (!can_fragment_lock(diri)) {
- //dout(10) << " requeuing dir " << dir->dirfrag() << dendl;
- //mds->mdbalancer->split_queue.insert(dir->dirfrag());
- return;
- }
CDir *first = dirs.front();
int bits = first->get_frag().bits() - frag.bits();
@@ -11064,64 +11043,88 @@ void MDCache::fragment_unmark_unfreeze_dirs(list<CDir*>& dirs)
class C_MDC_FragmentLoggedAndStored : public Context {
MDCache *mdcache;
- Mutation *mut;
- list<CDir*> resultfrags;
- frag_t basefrag;
- int bits;
+ MDRequest *mdr;
public:
- C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) :
- mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
+ C_MDC_FragmentLoggedAndStored(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
virtual void finish(int r) {
- mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits);
+ mdcache->fragment_logged_and_stored(mdr);
}
};
void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
{
- CInode *diri = dirs.front()->get_inode();
+ dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
+ << " on " << dirs.front()->get_inode() << dendl;
- if (bits > 0) {
+ if (bits > 0)
assert(dirs.size() == 1);
- } else {
- assert(bits < 0);
- }
+ else if (bits < 0)
+ assert(dirs.size() > 1);
+ else
+ assert(0);
- dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
- << " on " << *diri << dendl;
+ MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR);
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ info.basefrag = basefrag;
+ info.bits = bits;
+ info.dirs = dirs;
- // wrlock dirfragtreelock
- if (!diri->dirfragtreelock.can_wrlock(-1)) {
- dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl;
- fragment_unmark_unfreeze_dirs(dirs);
- return;
+ dispatch_fragment_dir(mdr);
+}
+
+void MDCache::dispatch_fragment_dir(MDRequest *mdr)
+{
+ assert(fragment_requests.count(mdr->reqid));
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ CInode *diri = info.dirs.front()->get_inode();
+
+ dout(10) << "dispatch_fragment_dir " << info.resultfrags << " "
+ << info.basefrag << " bits " << info.bits << " on " << *diri << dendl;
+
+ // avoid freeze dir deadlock
+ if (!mdr->is_auth_pinned(diri)) {
+ if (!diri->can_auth_pin()) {
+ dout(10) << " can't auth_pin " << *diri << ", requeuing dir "
+ << info.dirs.front()->dirfrag() << dendl;
+ if (info.bits > 0)
+ mds->balancer->queue_split(info.dirs.front());
+ else
+ mds->balancer->queue_merge(info.dirs.front());
+ fragment_unmark_unfreeze_dirs(info.dirs);
+ fragment_requests.erase(mdr->reqid);
+ request_finish(mdr);
+ return;
+ }
+ mdr->auth_pin(diri);
}
- diri->dirfragtreelock.get_wrlock(true);
+ set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ wrlocks.insert(&diri->dirfragtreelock);
// prevent a racing gather on any other scatterlocks too
- diri->nestlock.get_wrlock(true);
- diri->filelock.get_wrlock(true);
+ wrlocks.insert(&diri->nestlock);
+ wrlocks.insert(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ return;
+
+ mdr->ls = mds->mdlog->get_current_segment();
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(),
+ info.basefrag, info.bits);
+ mds->mdlog->start_entry(le);
// refragment
- list<CDir*> resultfrags;
list<Context*> waiters;
- adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false);
+ adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
+ info.resultfrags, waiters, false);
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
mds->queue_waiters(waiters);
- // journal
- Mutation *mut = new Mutation;
-
- mut->ls = mds->mdlog->get_current_segment();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits);
- mds->mdlog->start_entry(le);
-
- le->metablob.add_dir_context(*resultfrags.begin());
+ le->metablob.add_dir_context(*info.resultfrags.begin());
// dft lock
mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock);
- mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
- mut->add_updated_lock(&diri->dirfragtreelock);
+ mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
+ mdr->add_updated_lock(&diri->dirfragtreelock);
/*
// filelock
@@ -11136,12 +11139,10 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
*/
// freeze, journal, and store resulting frags
- C_GatherBuilder gather(g_ceph_context,
- new C_MDC_FragmentLoggedAndStored(this, mut,
- resultfrags, basefrag, bits));
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentLoggedAndStored(this, mdr));
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
@@ -11157,26 +11158,28 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
gather.activate();
}
-void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
+void MDCache::fragment_logged_and_stored(MDRequest *mdr)
{
- CInode *diri = resultfrags.front()->get_inode();
+ assert(fragment_requests.count(mdr->reqid));
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ CInode *diri = info.resultfrags.front()->get_inode();
- dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits
- << " on " << *diri << dendl;
+ dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag
+ << " bits " << info.bits << " on " << *diri << dendl;
// journal commit
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits);
- mds->mdlog->start_entry(le);
- mds->mdlog->submit_entry(le);
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(),
+ info.basefrag, info.bits);
+ mds->mdlog->start_submit_entry(le);
// tell peers
- CDir *first = *resultfrags.begin();
+ CDir *first = *info.resultfrags.begin();
for (map<int,int>::iterator p = first->replica_map.begin();
p != first->replica_map.end();
++p) {
if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN)
continue;
- MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits);
+ MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits);
/*
// freshly replicate new dirs to peers
@@ -11187,20 +11190,12 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags
mds->send_message_mds(notify, p->first);
}
- mut->apply(); // mark scatterlock
- mds->locker->drop_locks(mut);
- mut->cleanup();
- delete mut;
-
- // drop dft wrlock
- bool need_issue = false;
- mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue);
- mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue);
- mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue);
+ mdr->apply(); // mark scatterlock
+ mds->locker->drop_locks(mdr);
// unfreeze resulting frags
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
@@ -11220,8 +11215,8 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags
dir->unfreeze_dir();
}
- if (need_issue)
- mds->locker->issue_caps(diri);
+ fragment_requests.erase(mdr->reqid);
+ request_finish(mdr);
}
diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h
index d8f2a9486fb..cb219360ccf 100644
--- a/src/mds/MDCache.h
+++ b/src/mds/MDCache.h
@@ -946,6 +946,14 @@ public:
set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations
private:
+ struct fragment_info_t {
+ frag_t basefrag;
+ int bits;
+ list<CDir*> dirs;
+ list<CDir*> resultfrags;
+ };
+ map<metareqid_t, fragment_info_t> fragment_requests;
+
void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
list<CDir*>& frags, list<Context*>& waiters, bool replay);
void adjust_dir_fragments(CInode *diri,
@@ -960,7 +968,6 @@ private:
friend class EFragment;
- bool can_fragment_lock(CInode *diri);
bool can_fragment(CInode *diri, list<CDir*>& dirs);
public:
@@ -972,7 +979,8 @@ private:
void fragment_mark_and_complete(list<CDir*>& dirs);
void fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits);
void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
- void fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits);
+ void dispatch_fragment_dir(MDRequest *mdr);
+ void fragment_logged_and_stored(MDRequest *mdr);
public:
void rollback_uncommitted_fragments();
private: