From cd4cd3f27ce6957901fdddc71f68f50d529546ea Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 19 Sep 2013 09:55:31 +0800 Subject: mds: start internal MDS request for fragmentating directory Start an internal MDS request for the directory fragmenting operation. With an MDS request, we can easily acquire the locks required by the directory fragmenting operation. (The old way of getting the locks was 'try lock' style, which is not reliable.) Signed-off-by: Yan, Zheng --- src/common/ceph_strings.cc | 1 + src/include/ceph_fs.h | 3 + src/mds/MDCache.cc | 167 ++++++++++++++++++++++----------------------- src/mds/MDCache.h | 12 +++- 4 files changed, 95 insertions(+), 88 deletions(-) diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc index 47648ce19b3..221fb059740 100644 --- a/src/common/ceph_strings.cc +++ b/src/common/ceph_strings.cc @@ -183,6 +183,7 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_RMSNAP: return "rmsnap"; case CEPH_MDS_OP_SETFILELOCK: return "setfilelock"; case CEPH_MDS_OP_GETFILELOCK: return "getfilelock"; + case CEPH_MDS_OP_FRAGMENTDIR: return "fragmentdir"; } return "???"; } diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index ba0b5eb0f19..47ec1f14f6e 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -333,6 +333,9 @@ enum { CEPH_MDS_OP_MKSNAP = 0x01400, CEPH_MDS_OP_RMSNAP = 0x01401, CEPH_MDS_OP_LSSNAP = 0x00402, + + // internal op + CEPH_MDS_OP_FRAGMENTDIR= 0x01500, }; extern const char *ceph_mds_op_name(int op); diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 9dc1229fbb9..91fadb73df9 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -8666,9 +8666,9 @@ void MDCache::dispatch_request(MDRequest *mdr) mds->server->dispatch_slave_request(mdr); } else { switch (mdr->internal_op) { - - // ... 
- + case CEPH_MDS_OP_FRAGMENTDIR: + dispatch_fragment_dir(mdr); + break; default: assert(0); } @@ -10862,17 +10862,6 @@ public: } }; - -bool MDCache::can_fragment_lock(CInode *diri) -{ - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(7) << "can_fragment: can't wrlock dftlock" << dendl; - mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL); - return false; - } - return true; -} - bool MDCache::can_fragment(CInode *diri, list& dirs) { if (mds->mdsmap->is_degraded()) { @@ -10920,11 +10909,6 @@ void MDCache::split_dir(CDir *dir, int bits) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - mds->balancer->queue_split(dir); - return; - } C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits)); @@ -10952,11 +10936,6 @@ void MDCache::merge_dir(CInode *diri, frag_t frag) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - //dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - //mds->mdbalancer->split_queue.insert(dir->dirfrag()); - return; - } CDir *first = dirs.front(); int bits = first->get_frag().bits() - frag.bits(); @@ -11064,64 +11043,88 @@ void MDCache::fragment_unmark_unfreeze_dirs(list& dirs) class C_MDC_FragmentLoggedAndStored : public Context { MDCache *mdcache; - Mutation *mut; - list resultfrags; - frag_t basefrag; - int bits; + MDRequest *mdr; public: - C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list& r, frag_t bf, int bi) : - mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {} + C_MDC_FragmentLoggedAndStored(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {} virtual void finish(int r) { - mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits); + mdcache->fragment_logged_and_stored(mdr); } }; void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) { - CInode *diri = dirs.front()->get_inode(); + dout(10) << "fragment_frozen " << dirs << " " << 
basefrag << " by " << bits + << " on " << dirs.front()->get_inode() << dendl; - if (bits > 0) { + if (bits > 0) assert(dirs.size() == 1); - } else { - assert(bits < 0); - } + else if (bits < 0) + assert(dirs.size() > 1); + else + assert(0); - dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits - << " on " << *diri << dendl; + MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR); + fragment_info_t &info = fragment_requests[mdr->reqid]; + info.basefrag = basefrag; + info.bits = bits; + info.dirs = dirs; - // wrlock dirfragtreelock - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl; - fragment_unmark_unfreeze_dirs(dirs); - return; + dispatch_fragment_dir(mdr); +} + +void MDCache::dispatch_fragment_dir(MDRequest *mdr) +{ + assert(fragment_requests.count(mdr->reqid)); + fragment_info_t &info = fragment_requests[mdr->reqid]; + CInode *diri = info.dirs.front()->get_inode(); + + dout(10) << "dispatch_fragment_dir " << info.resultfrags << " " + << info.basefrag << " bits " << info.bits << " on " << *diri << dendl; + + // avoid freeze dir deadlock + if (!mdr->is_auth_pinned(diri)) { + if (!diri->can_auth_pin()) { + dout(10) << " can't auth_pin " << *diri << ", requeuing dir " + << info.dirs.front()->dirfrag() << dendl; + if (info.bits > 0) + mds->balancer->queue_split(info.dirs.front()); + else + mds->balancer->queue_merge(info.dirs.front()); + fragment_unmark_unfreeze_dirs(info.dirs); + fragment_requests.erase(mdr->reqid); + request_finish(mdr); + return; + } + mdr->auth_pin(diri); } - diri->dirfragtreelock.get_wrlock(true); + set rdlocks, wrlocks, xlocks; + wrlocks.insert(&diri->dirfragtreelock); // prevent a racing gather on any other scatterlocks too - diri->nestlock.get_wrlock(true); - diri->filelock.get_wrlock(true); + wrlocks.insert(&diri->nestlock); + wrlocks.insert(&diri->filelock); + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + 
return; + + mdr->ls = mds->mdlog->get_current_segment(); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), + info.basefrag, info.bits); + mds->mdlog->start_entry(le); // refragment - list resultfrags; list waiters; - adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false); + adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits, + info.resultfrags, waiters, false); if (g_conf->mds_debug_frag) diri->verify_dirfrags(); mds->queue_waiters(waiters); - // journal - Mutation *mut = new Mutation; - - mut->ls = mds->mdlog->get_current_segment(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - - le->metablob.add_dir_context(*resultfrags.begin()); + le->metablob.add_dir_context(*info.resultfrags.begin()); // dft lock mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock); - mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); - mut->add_updated_lock(&diri->dirfragtreelock); + mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); + mdr->add_updated_lock(&diri->dirfragtreelock); /* // filelock @@ -11136,12 +11139,10 @@ void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) */ // freeze, journal, and store resulting frags - C_GatherBuilder gather(g_ceph_context, - new C_MDC_FragmentLoggedAndStored(this, mut, - resultfrags, basefrag, bits)); + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentLoggedAndStored(this, mdr)); - for (list::iterator p = resultfrags.begin(); - p != resultfrags.end(); + for (list::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; @@ -11157,26 +11158,28 @@ void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) gather.activate(); } -void MDCache::fragment_logged_and_stored(Mutation *mut, list& resultfrags, frag_t basefrag, 
int bits) +void MDCache::fragment_logged_and_stored(MDRequest *mdr) { - CInode *diri = resultfrags.front()->get_inode(); + assert(fragment_requests.count(mdr->reqid)); + fragment_info_t &info = fragment_requests[mdr->reqid]; + CInode *diri = info.resultfrags.front()->get_inode(); - dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits - << " on " << *diri << dendl; + dout(10) << "fragment_logged_and_stored " << info.resultfrags << " " << info.basefrag + << " bits " << info.bits << " on " << *diri << dendl; // journal commit - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - mds->mdlog->submit_entry(le); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), + info.basefrag, info.bits); + mds->mdlog->start_submit_entry(le); // tell peers - CDir *first = *resultfrags.begin(); + CDir *first = *info.resultfrags.begin(); for (map::iterator p = first->replica_map.begin(); p != first->replica_map.end(); ++p) { if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN) continue; - MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits); + MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits); /* // freshly replicate new dirs to peers @@ -11187,20 +11190,12 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list& resultfrags mds->send_message_mds(notify, p->first); } - mut->apply(); // mark scatterlock - mds->locker->drop_locks(mut); - mut->cleanup(); - delete mut; - - // drop dft wrlock - bool need_issue = false; - mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue); + mdr->apply(); // mark scatterlock + mds->locker->drop_locks(mdr); // unfreeze resulting frags - for (list::iterator p = resultfrags.begin(); - p 
!= resultfrags.end(); + for (list::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; @@ -11220,8 +11215,8 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list& resultfrags dir->unfreeze_dir(); } - if (need_issue) - mds->locker->issue_caps(diri); + fragment_requests.erase(mdr->reqid); + request_finish(mdr); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index d8f2a9486fb..cb219360ccf 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -946,6 +946,14 @@ public: set< pair > uncommitted_fragments; // prepared but uncommitted refragmentations private: + struct fragment_info_t { + frag_t basefrag; + int bits; + list dirs; + list resultfrags; + }; + map fragment_requests; + void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits, list& frags, list& waiters, bool replay); void adjust_dir_fragments(CInode *diri, @@ -960,7 +968,6 @@ private: friend class EFragment; - bool can_fragment_lock(CInode *diri); bool can_fragment(CInode *diri, list& dirs); public: @@ -972,7 +979,8 @@ private: void fragment_mark_and_complete(list& dirs); void fragment_frozen(list& dirs, frag_t basefrag, int bits); void fragment_unmark_unfreeze_dirs(list& dirs); - void fragment_logged_and_stored(Mutation *mut, list& resultfrags, frag_t basefrag, int bits); + void dispatch_fragment_dir(MDRequest *mdr); + void fragment_logged_and_stored(MDRequest *mdr); public: void rollback_uncommitted_fragments(); private: -- cgit v1.2.1