diff options
Diffstat (limited to 'src/mds/MDCache.cc')
-rw-r--r-- | src/mds/MDCache.cc | 445 |
1 files changed, 336 insertions, 109 deletions
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index b1f1f0ad4c9..ae59c26ee13 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -1984,8 +1984,8 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob, } bool stop = false; - if (!pin->is_auth() || pin->is_ambiguous_auth()) { - dout(10) << "predirty_journal_parents !auth or ambig on " << *pin << dendl; + if (!pin->can_auth_pin() || pin->is_ambiguous_auth()) { + dout(10) << "predirty_journal_parents can't auth pin or ambig on " << *pin << dendl; stop = true; } @@ -2010,8 +2010,7 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob, if (!stop && mut->wrlocks.count(&pin->nestlock) == 0 && - (!pin->can_auth_pin() || - !pin->versionlock.can_wrlock() || // make sure we can take versionlock, too + (!pin->versionlock.can_wrlock() || // make sure we can take versionlock, too //true !mds->locker->wrlock_start(&pin->nestlock, static_cast<MDRequest*>(mut), true) // can cast only because i'm passing nowait=true )) { // ** do not initiate.. see above comment ** @@ -8663,9 +8662,9 @@ void MDCache::dispatch_request(MDRequest *mdr) mds->server->dispatch_slave_request(mdr); } else { switch (mdr->internal_op) { - - // ... - + case CEPH_MDS_OP_FRAGMENTDIR: + dispatch_fragment_dir(mdr); + break; default: assert(0); } @@ -10878,17 +10877,6 @@ public: } }; - -bool MDCache::can_fragment_lock(CInode *diri) -{ - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(7) << "can_fragment: can't wrlock dftlock" << dendl; - mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL); - return false; - } - return true; -} - bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs) { if (mds->mdsmap->is_degraded()) { @@ -10900,8 +10888,8 @@ bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs) dout(7) << "can_fragment: i won't merge|split anything in stray" << dendl; return false; } - if (diri->is_mdsdir() || diri->ino() == MDS_INO_CEPH) { - dout(7) << "can_fragment: i won't fragment the mdsdir or .ceph" << dendl; + if (diri->is_mdsdir() || diri->is_stray() || diri->ino() == MDS_INO_CEPH) { + dout(7) << "can_fragment: i won't fragment the mdsdir or straydir or .ceph" << dendl; return false; } @@ -10936,11 +10924,6 @@ void MDCache::split_dir(CDir *dir, int bits) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - mds->balancer->queue_split(dir); - return; - } C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits)); @@ -10968,18 +10951,13 @@ void MDCache::merge_dir(CInode *diri, frag_t frag) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - //dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - //mds->mdbalancer->split_queue.insert(dir->dirfrag()); - return; - } CDir *first = dirs.front(); int bits = first->get_frag().bits() - frag.bits(); dout(10) << " we are merginb by " << bits << " bits" << dendl; C_GatherBuilder gather(g_ceph_context, - new C_MDC_FragmentFrozen(this, dirs, frag, bits)); + new C_MDC_FragmentFrozen(this, dirs, frag, -bits)); fragment_freeze_dirs(dirs, gather); gather.activate(); @@ -11078,66 +11056,144 @@ void MDCache::fragment_unmark_unfreeze_dirs(list<CDir*>& dirs) } } -class C_MDC_FragmentLoggedAndStored : public Context { +class C_MDC_FragmentPrep : public Context { MDCache *mdcache; - Mutation *mut; + MDRequest *mdr; +public: + C_MDC_FragmentPrep(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {} + virtual void finish(int r) { + mdcache->_fragment_logged(mdr); + } +}; + +class C_MDC_FragmentStore : public Context { + MDCache *mdcache; + MDRequest *mdr; +public: + C_MDC_FragmentStore(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {} + virtual void finish(int r) { + mdcache->_fragment_stored(mdr); + } +}; + +class C_MDC_FragmentCommit : public Context { + MDCache *mdcache; + dirfrag_t basedirfrag; + list<CDir*> resultfrags; +public: + C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) : + mdcache(m), basedirfrag(ino, f) { + resultfrags.swap(l); + } + virtual void finish(int r) { + mdcache->_fragment_committed(basedirfrag, resultfrags); + } +}; + +class C_MDC_FragmentFinish : public Context { + MDCache *mdcache; + dirfrag_t basedirfrag; list<CDir*> resultfrags; - frag_t basefrag; - int bits; public: - C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) : - mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {} + C_MDC_FragmentFinish(MDCache *m, dirfrag_t f, list<CDir*>& l) : + mdcache(m), basedirfrag(f) { + resultfrags.swap(l); + } virtual void finish(int r) { - mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits); + mdcache->_fragment_finish(basedirfrag, resultfrags); } }; void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) { - CInode *diri = dirs.front()->get_inode(); + dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits + << " on " << dirs.front()->get_inode() << dendl; - if (bits > 0) { + if (bits > 0) assert(dirs.size() == 1); - } else { - assert(bits < 0); - } + else if (bits < 0) + assert(dirs.size() > 1); + else + assert(0); - dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits - << " on " << *diri << dendl; + MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR); + fragment_info_t &info = fragment_requests[mdr->reqid]; + info.basefrag = basefrag; + info.bits = bits; + info.dirs = dirs; - // wrlock dirfragtreelock - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl; - fragment_unmark_unfreeze_dirs(dirs); - return; + dispatch_fragment_dir(mdr); +} + +void MDCache::dispatch_fragment_dir(MDRequest *mdr) +{ + map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid); + assert(it != fragment_requests.end()); + fragment_info_t &info = it->second; + CInode *diri = info.dirs.front()->get_inode(); + + dout(10) << "dispatch_fragment_dir " << info.resultfrags << " " + << info.basefrag << " bits " << info.bits << " on " << *diri << dendl; + + // avoid freeze dir deadlock + if (!mdr->is_auth_pinned(diri)) { + if (!diri->can_auth_pin()) { + dout(10) << " can't auth_pin " << *diri << ", requeuing dir " + << info.dirs.front()->dirfrag() << dendl; + if (info.bits > 0) + mds->balancer->queue_split(info.dirs.front()); + else + mds->balancer->queue_merge(info.dirs.front()); + fragment_unmark_unfreeze_dirs(info.dirs); + fragment_requests.erase(mdr->reqid); + request_finish(mdr); + return; + } + mdr->auth_pin(diri); } - diri->dirfragtreelock.get_wrlock(true); + set<SimpleLock*> rdlocks, wrlocks, xlocks; + wrlocks.insert(&diri->dirfragtreelock); // prevent a racing gather on any other scatterlocks too - diri->nestlock.get_wrlock(true); - diri->filelock.get_wrlock(true); + wrlocks.insert(&diri->nestlock); + wrlocks.insert(&diri->filelock); + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; + + mdr->ls = mds->mdlog->get_current_segment(); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), + info.basefrag, info.bits); + mds->mdlog->start_entry(le); + + for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p) { + CDir *dir = *p; + dirfrag_rollback rollback; + rollback.fnode = dir->fnode; + le->add_orig_frag(dir->get_frag(), &rollback); + } // refragment - list<CDir*> resultfrags; list<Context*> waiters; - adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false); + adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits, + info.resultfrags, waiters, false); if (g_conf->mds_debug_frag) diri->verify_dirfrags(); mds->queue_waiters(waiters); - // journal - Mutation *mut = new Mutation; + for (list<frag_t>::iterator p = le->orig_frags.begin(); p != le->orig_frags.end(); ++p) + assert(!diri->dirfragtree.is_leaf(*p)); - mut->ls = mds->mdlog->get_current_segment(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - - le->metablob.add_dir_context(*resultfrags.begin()); + le->metablob.add_dir_context(*info.resultfrags.begin()); + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); + ++p) { + le->metablob.add_dir(*p, false); + } // dft lock mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock); - mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); - mut->add_updated_lock(&diri->dirfragtreelock); + mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); + mdr->add_updated_lock(&diri->dirfragtreelock); /* // filelock @@ -11151,48 +11207,57 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) mut->add_updated_lock(&diri->nestlock); */ - // freeze, journal, and store resulting frags - C_GatherBuilder gather(g_ceph_context, - new C_MDC_FragmentLoggedAndStored(this, mut, - resultfrags, basefrag, bits)); + add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, le->orig_frags, mdr->ls); + mds->mdlog->submit_entry(le, new C_MDC_FragmentPrep(this, mdr)); + mds->mdlog->flush(); +} + +void MDCache::_fragment_logged(MDRequest *mdr) +{ + map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid); + assert(it != fragment_requests.end()); + fragment_info_t &info = it->second; + CInode *diri = info.resultfrags.front()->get_inode(); + + dout(10) << "fragment_logged " << info.resultfrags << " " << info.basefrag + << " bits " << info.bits << " on " << *diri << dendl; - for (list<CDir*>::iterator p = resultfrags.begin(); - p != resultfrags.end(); + // store resulting frags + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentStore(this, mdr)); + + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; - dout(10) << " result frag " << *dir << dendl; - le->metablob.add_dir(dir, false); + dout(10) << " storing result frag " << *dir << dendl; // freeze and store them too + dir->auth_pin(this); dir->state_set(CDir::STATE_FRAGMENTING); dir->commit(0, gather.new_sub(), true); // ignore authpinnability } - mds->mdlog->submit_entry(le, gather.new_sub()); - mds->mdlog->flush(); gather.activate(); } -void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits) +void MDCache::_fragment_stored(MDRequest *mdr) { - CInode *diri = resultfrags.front()->get_inode(); + map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid); + assert(it != fragment_requests.end()); + fragment_info_t &info = it->second; + CInode *diri = info.resultfrags.front()->get_inode(); - dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits - << " on " << *diri << dendl; - - // journal commit - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - mds->mdlog->submit_entry(le); + dout(10) << "fragment_stored " << info.resultfrags << " " << info.basefrag + << " bits " << info.bits << " on " << *diri << dendl; // tell peers - CDir *first = *resultfrags.begin(); + CDir *first = *info.resultfrags.begin(); for (map<int,int>::iterator p = first->replica_map.begin(); p != first->replica_map.end(); ++p) { if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN) continue; - MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits); + MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits); /* // freshly replicate new dirs to peers @@ -11203,26 +11268,15 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags mds->send_message_mds(notify, p->first); } - mut->apply(); // mark scatterlock - mds->locker->drop_locks(mut); - mut->cleanup(); - delete mut; - - // drop dft wrlock - bool need_issue = false; - mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue); + mdr->apply(); // mark scatterlock + mds->locker->drop_locks(mdr); // unfreeze resulting frags - for (list<CDir*>::iterator p = resultfrags.begin(); - p != resultfrags.end(); + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; - - // unmark, unfreeze - dir->state_clear(CDir::STATE_FRAGMENTING); for (CDir::map_t::iterator p = dir->items.begin(); p != dir->items.end(); @@ -11233,13 +11287,72 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags dn->put(CDentry::PIN_FRAGMENTING); } + // unfreeze dir->unfreeze_dir(); } - if (need_issue) - mds->locker->issue_caps(diri); + // journal commit + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, + diri->ino(), info.basefrag, info.bits); + mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag, + info.resultfrags)); + + fragment_requests.erase(it); + request_finish(mdr); +} + +void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags) +{ + dout(10) << "fragment_committed " << basedirfrag << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + assert(it != uncommitted_fragments.end()); + ufragment &uf = it->second; + + // remove old frags + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags)); + + SnapContext nullsnapc; + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + for (list<frag_t>::iterator p = uf.old_frags.begin(); + p != uf.old_frags.end(); + ++p) { + object_t oid = CInode::get_object_name(basedirfrag.ino, *p, ""); + ObjectOperation op; + if (*p == frag_t()) { + // backtrace object + dout(10) << " truncate orphan dirfrag " << oid << dendl; + op.truncate(0); + } else { + dout(10) << " removing orphan dirfrag " << oid << dendl; + op.remove(); + } + mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context), + 0, NULL, gather.new_sub()); + } + + assert(gather.has_subs()); + gather.activate(); } +void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags) +{ + dout(10) << "fragment_finish " << basedirfrag << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + assert(it != uncommitted_fragments.end()); + ufragment &uf = it->second; + + // unmark & auth_unpin + for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) { + (*p)->state_clear(CDir::STATE_FRAGMENTING); + (*p)->auth_unpin(this); + } + + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH, + basedirfrag.ino, basedirfrag.frag, uf.bits); + mds->mdlog->start_submit_entry(le); + + finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH); +} /* This function DOES put the passed message before returning */ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) @@ -11285,26 +11398,140 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) notify->put(); } +void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags, + LogSegment *ls, bufferlist *rollback) +{ + dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl; + assert(!uncommitted_fragments.count(basedirfrag)); + ufragment& uf = uncommitted_fragments[basedirfrag]; + uf.old_frags = old_frags; + uf.bits = bits; + uf.ls = ls; + ls->uncommitted_fragments.insert(basedirfrag); + if (rollback) + uf.rollback.swap(*rollback); +} + +void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag, int op) +{ + dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag + << " op " << EFragment::op_name(op) << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + if (it != uncommitted_fragments.end()) { + ufragment& uf = it->second; + if (op != EFragment::OP_FINISH && !uf.old_frags.empty()) { + uf.committed = true; + } else { + uf.ls->uncommitted_fragments.erase(basedirfrag); + mds->queue_waiters(uf.waiters); + uncommitted_fragments.erase(it); + } + } +} + +void MDCache::rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags) +{ + dout(10) << "rollback_uncommitted_fragment: base dirfrag " << basedirfrag + << " old_frags (" << old_frags << ")" << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + if (it != uncommitted_fragments.end()) { + ufragment& uf = it->second; + if (!uf.old_frags.empty()) { + uf.old_frags.swap(old_frags); + uf.committed = true; + } else { + uf.ls->uncommitted_fragments.erase(basedirfrag); + uncommitted_fragments.erase(it); + } + } +} void MDCache::rollback_uncommitted_fragments() { dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl; - for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin(); + for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin(); p != uncommitted_fragments.end(); ++p) { + ufragment &uf = p->second; CInode *diri = get_inode(p->first.ino); assert(diri); - dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl; + + if (uf.committed) { + list<CDir*> frags; + diri->get_dirfrags_under(p->first.frag, frags); + for (list<CDir*>::iterator q = frags.begin(); q != frags.end(); ++q) { + CDir *dir = *q; + dir->auth_pin(this); + dir->state_set(CDir::STATE_FRAGMENTING); + } + _fragment_committed(p->first, frags); + continue; + } + + dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl; + + LogSegment *ls = mds->mdlog->get_current_segment(); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits); + mds->mdlog->start_entry(le); + + list<frag_t> old_frags; + diri->dirfragtree.get_leaves_under(p->first.frag, old_frags); + list<CDir*> resultfrags; - list<Context*> waiters; - adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true); + if (uf.old_frags.empty()) { + // created by old format EFragment + list<Context*> waiters; + adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true); + } else { + bufferlist::iterator bp = uf.rollback.begin(); + for (list<frag_t>::iterator q = uf.old_frags.begin(); q != uf.old_frags.end(); ++q) { + CDir *dir = force_dir_fragment(diri, *q); + resultfrags.push_back(dir); + + dirfrag_rollback rollback; + ::decode(rollback, bp); + + dir->set_version(rollback.fnode.version); + dir->fnode = rollback.fnode; + + dir->_mark_dirty(ls); + + if (!(dir->fnode.rstat == dir->fnode.accounted_rstat)) { + dout(10) << " dirty nestinfo on " << *dir << dendl; + mds->locker->mark_updated_scatterlock(&dir->inode->nestlock); + ls->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest); + dir->get_inode()->nestlock.mark_dirty(); + } + if (!(dir->fnode.fragstat == dir->fnode.accounted_fragstat)) { + dout(10) << " dirty fragstat on " << *dir << dendl; + mds->locker->mark_updated_scatterlock(&dir->inode->filelock); + ls->dirty_dirfrag_dir.push_back(&dir->inode->item_dirty_dirfrag_dir); + dir->get_inode()->filelock.mark_dirty(); + } + + le->add_orig_frag(dir->get_frag()); + le->metablob.add_dir_context(dir); + le->metablob.add_dir(dir, true); + } + } + if (g_conf->mds_debug_frag) diri->verify_dirfrags(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second); - mds->mdlog->start_submit_entry(le); + for (list<frag_t>::iterator q = old_frags.begin(); q != old_frags.end(); ++q) + assert(!diri->dirfragtree.is_leaf(*q)); + + for (list<CDir*>::iterator q = resultfrags.begin(); q != resultfrags.end(); ++q) { + CDir *dir = *q; + dir->auth_pin(this); + dir->state_set(CDir::STATE_FRAGMENTING); + } + + mds->mdlog->submit_entry(le); + + uf.old_frags.swap(old_frags); + _fragment_committed(p->first, resultfrags); } - uncommitted_fragments.clear(); } |