diff options
-rwxr-xr-x | qa/workunits/misc/dirfrag.sh | 48 | ||||
-rw-r--r-- | src/client/Client.cc | 46 | ||||
-rw-r--r-- | src/client/Client.h | 6 | ||||
-rw-r--r-- | src/client/MetaRequest.h | 1 | ||||
-rw-r--r-- | src/common/ceph_strings.cc | 1 | ||||
-rw-r--r-- | src/include/ceph_fs.h | 3 | ||||
-rw-r--r-- | src/include/frag.h | 2 | ||||
-rw-r--r-- | src/mds/CDir.cc | 100 | ||||
-rw-r--r-- | src/mds/CDir.h | 1 | ||||
-rw-r--r-- | src/mds/CInode.cc | 16 | ||||
-rw-r--r-- | src/mds/LogSegment.h | 1 | ||||
-rw-r--r-- | src/mds/MDBalancer.cc | 12 | ||||
-rw-r--r-- | src/mds/MDCache.cc | 445 | ||||
-rw-r--r-- | src/mds/MDCache.h | 59 | ||||
-rw-r--r-- | src/mds/Server.cc | 21 | ||||
-rw-r--r-- | src/mds/events/EFragment.h | 22 | ||||
-rw-r--r-- | src/mds/journal.cc | 63 |
17 files changed, 611 insertions, 236 deletions
diff --git a/qa/workunits/misc/dirfrag.sh b/qa/workunits/misc/dirfrag.sh new file mode 100755 index 00000000000..393667427fd --- /dev/null +++ b/qa/workunits/misc/dirfrag.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +set -e + +DEPTH=5 +COUNT=10000 + +kill_jobs() { + jobs -p | xargs kill +} +trap kill_jobs INT + +create_files() { + for i in `seq 1 $COUNT` + do + touch file$i + done +} + +delete_files() { + for i in `ls -f` + do + if [[ ${i}a = file*a ]] + then + rm -f $i + fi + done +} + +rm -rf testdir +mkdir testdir +cd testdir + +for i in `seq 1 $DEPTH`; do + mkdir dir$i + cd dir$i + create_files & +done +wait + +for i in `seq 1 $DEPTH`; do + delete_files & + cd .. +done +wait + +cd .. +rm -rf testdir diff --git a/src/client/Client.cc b/src/client/Client.cc index 60a5e4550b8..20651892c0c 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -818,16 +818,28 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, ::decode(end, p); ::decode(complete, p); + frag_t fg = request->readdir_frag; + uint64_t readdir_offset = request->readdir_offset; + string readdir_start = request->readdir_start; + if (fg != dst.frag) { + ldout(cct, 10) << "insert_trace got new frag " << fg << " -> " << dst.frag << dendl; + fg = dst.frag; + if (fg.is_leftmost()) + readdir_offset = 2; + else + readdir_offset = 0; + readdir_start.clear(); + } + ldout(cct, 10) << "insert_trace " << numdn << " readdir items, end=" << (int)end - << ", offset " << request->readdir_offset - << ", readdir_start " << request->readdir_start << dendl; + << ", offset " << readdir_offset + << ", readdir_start " << readdir_start << dendl; + request->readdir_reply_frag = fg; request->readdir_end = end; request->readdir_num = numdn; - map<string,Dentry*>::iterator pd = dir->dentry_map.upper_bound(request->readdir_start); - - frag_t fg = request->readdir_frag; + map<string,Dentry*>::iterator pd = dir->dentry_map.upper_bound(readdir_start); string dname; LeaseStat dlease; @@ -878,7 +890,7 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, dn = link(dir, dname, in, NULL); } update_dentry_lease(dn, &dlease, request->sent_stamp, session); - dn->offset = dir_result_t::make_fpos(request->readdir_frag, i + request->readdir_offset); + dn->offset = dir_result_t::make_fpos(fg, i + readdir_offset); // add to cached result list in->get(); @@ -5016,8 +5028,16 @@ int Client::_readdir_get_frag(dir_result_t *dirp) dirp->buffer = new vector<pair<string,Inode*> >; dirp->buffer->swap(req->readdir_result); - dirp->buffer_frag = fg; + if (fg != req->readdir_reply_frag) { + fg = req->readdir_reply_frag; + if (fg.is_leftmost()) + dirp->next_offset = 2; + else + dirp->next_offset = 0; + dirp->offset = dir_result_t::make_fpos(fg, dirp->next_offset); + } + dirp->buffer_frag = fg; dirp->this_offset = dirp->next_offset; ldout(cct, 10) << "_readdir_get_frag " << dirp << " got frag " << dirp->buffer_frag << " this_offset " << dirp->this_offset @@ -5196,14 +5216,18 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p) int r = _readdir_get_frag(dirp); if (r) return r; + // _readdir_get_frag () may updates dirp->offset if the replied dirfrag is + // different than the requested one. (our dirfragtree was outdated) fg = dirp->buffer_frag; + off = dirp->fragpos(); } ldout(cct, 10) << "off " << off << " this_offset " << hex << dirp->this_offset << dec << " size " << dirp->buffer->size() << " frag " << fg << dendl; + + dirp->offset = dir_result_t::make_fpos(fg, off); while (off >= dirp->this_offset && off - dirp->this_offset < dirp->buffer->size()) { - uint64_t pos = dir_result_t::make_fpos(fg, off); pair<string,Inode*>& ent = (*dirp->buffer)[off - dirp->this_offset]; int stmask = fill_stat(ent.second, &st); @@ -5219,7 +5243,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p) return r; off++; - dirp->offset = pos + 1; + dirp->offset++; } if (dirp->last_name.length()) { @@ -5230,10 +5254,10 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p) if (!fg.is_rightmost()) { // next frag! - dirp->next_frag(); - off = 0; + _readdir_next_frag(dirp); ldout(cct, 10) << " advancing to next frag: " << fg << " -> " << dirp->frag() << dendl; fg = dirp->frag(); + off = 0; continue; } diff --git a/src/client/Client.h b/src/client/Client.h index df59f235de4..649bacc5ba6 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -137,7 +137,7 @@ struct dir_result_t { return ((uint64_t)frag << SHIFT) | (uint64_t)off; } static unsigned fpos_frag(uint64_t p) { - return p >> SHIFT; + return (p & ~END) >> SHIFT; } static unsigned fpos_off(uint64_t p) { return p & MASK; @@ -176,8 +176,8 @@ struct dir_result_t { offset = (uint64_t)f << SHIFT; assert(sizeof(offset) == 8); } - void set_end() { offset = END; } - bool at_end() { return (offset == END); } + void set_end() { offset |= END; } + bool at_end() { return (offset & END); } void reset() { last_name.clear(); diff --git a/src/client/MetaRequest.h b/src/client/MetaRequest.h index 036b4154e0c..5583cd16281 100644 --- a/src/client/MetaRequest.h +++ b/src/client/MetaRequest.h @@ -57,6 +57,7 @@ public: string readdir_start; // starting _after_ this name uint64_t readdir_offset; + frag_t readdir_reply_frag; vector<pair<string,Inode*> > readdir_result; bool readdir_end; int readdir_num; diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc index 47648ce19b3..221fb059740 100644 --- a/src/common/ceph_strings.cc +++ b/src/common/ceph_strings.cc @@ -183,6 +183,7 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_RMSNAP: return "rmsnap"; case CEPH_MDS_OP_SETFILELOCK: return "setfilelock"; case CEPH_MDS_OP_GETFILELOCK: return "getfilelock"; + case CEPH_MDS_OP_FRAGMENTDIR: return "fragmentdir"; } return "???"; } diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index ba0b5eb0f19..47ec1f14f6e 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -333,6 +333,9 @@ enum { CEPH_MDS_OP_MKSNAP = 0x01400, CEPH_MDS_OP_RMSNAP = 0x01401, CEPH_MDS_OP_LSSNAP = 0x00402, + + // internal op + CEPH_MDS_OP_FRAGMENTDIR= 0x01500, }; extern const char *ceph_mds_op_name(int op); diff --git a/src/include/frag.h b/src/include/frag.h index 715eb098283..fbe5b43f8cb 100644 --- a/src/include/frag.h +++ b/src/include/frag.h @@ -285,7 +285,7 @@ public: */ void get_leaves_under(frag_t x, std::list<frag_t>& ls) const { std::list<frag_t> q; - q.push_back(get_branch(x)); + q.push_back(get_branch_or_leaf(x)); while (!q.empty()) { frag_t t = q.front(); q.pop_front(); diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index 4a5e636d9a6..2c985e4775d 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -786,8 +786,10 @@ void CDir::prepare_old_fragment(bool replay) void CDir::prepare_new_fragment(bool replay) { - if (!replay && is_auth()) + if (!replay && is_auth()) { _freeze_dir(); + mark_complete(); + } } void CDir::finish_old_fragment(list<Context*>& waiters, bool replay) @@ -856,11 +858,16 @@ void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool repl double fac = 1.0 / (double)(1 << bits); // for scaling load vecs - nest_info_t olddiff; // old += f - af; - dout(10) << " rstat " << fnode.rstat << dendl; - dout(10) << " accounted_rstat " << fnode.accounted_rstat << dendl; - olddiff.add_delta(fnode.rstat, fnode.accounted_rstat); - dout(10) << " olddiff " << olddiff << dendl; + dout(15) << " rstat " << fnode.rstat << dendl; + dout(15) << " accounted_rstat " << fnode.accounted_rstat << dendl; + nest_info_t rstatdiff; + rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat); + dout(15) << " fragstat " << fnode.fragstat << dendl; + dout(15) << " accounted_fragstat " << fnode.accounted_fragstat << dendl; + frag_info_t fragstatdiff; + bool touched_mtime; + fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat, touched_mtime); + dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl; prepare_old_fragment(replay); @@ -905,27 +912,24 @@ void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool repl f->steal_dentry(dn); } + // FIXME: handle dirty old rstat + // fix up new frag fragstats - bool stale_fragstat = fnode.fragstat.version != fnode.accounted_fragstat.version; - bool stale_rstat = fnode.rstat.version != fnode.accounted_rstat.version; for (int i=0; i<n; i++) { - subfrags[i]->fnode.fragstat.version = fnode.fragstat.version; - subfrags[i]->fnode.accounted_fragstat = subfrags[i]->fnode.fragstat; - if (i == 0) { - if (stale_fragstat) - subfrags[0]->fnode.accounted_fragstat.version--; - if (stale_rstat) - subfrags[0]->fnode.accounted_rstat.version--; - } - dout(10) << " fragstat " << subfrags[i]->fnode.fragstat << " on " << *subfrags[i] << dendl; + CDir *f = subfrags[i]; + f->fnode.rstat.version = fnode.rstat.version; + f->fnode.accounted_rstat = f->fnode.rstat; + f->fnode.fragstat.version = fnode.fragstat.version; + f->fnode.accounted_fragstat = f->fnode.fragstat; + dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat + << " on " << *f << dendl; } // give any outstanding frag stat differential to first frag - // af[0] -= olddiff - dout(10) << "giving olddiff " << olddiff << " to " << *subfrags[0] << dendl; - nest_info_t zero; - subfrags[0]->fnode.accounted_rstat.add_delta(zero, olddiff); - dout(10) << " " << subfrags[0]->fnode.accounted_fragstat << dendl; + dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff + << " to " << *subfrags[0] << dendl; + subfrags[0]->fnode.accounted_rstat.add(rstatdiff); + subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff); finish_old_fragment(waiters, replay); } @@ -936,15 +940,23 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay) prepare_new_fragment(replay); - // see if _any_ of the source frags have stale fragstat or rstat - int stale_rstat = 0; - int stale_fragstat = 0; + nest_info_t rstatdiff; + frag_info_t fragstatdiff; + bool touched_mtime; + version_t rstat_version = inode->get_projected_inode()->rstat.version; + version_t dirstat_version = inode->get_projected_inode()->dirstat.version; for (list<CDir*>::iterator p = subs.begin(); p != subs.end(); ++p) { CDir *dir = *p; dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl; assert(!dir->is_auth() || dir->is_complete() || replay); - + + if (dir->fnode.accounted_rstat.version == rstat_version) + rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat); + if (dir->fnode.accounted_fragstat.version == dirstat_version) + fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat, + touched_mtime); + dir->prepare_old_fragment(replay); // steal dentries @@ -964,21 +976,6 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay) if (dir->get_version() > get_version()) set_version(dir->get_version()); - // *stat versions - if (fnode.fragstat.version < dir->fnode.fragstat.version) - fnode.fragstat.version = dir->fnode.fragstat.version; - if (fnode.rstat.version < dir->fnode.rstat.version) - fnode.rstat.version = dir->fnode.rstat.version; - - if (dir->fnode.accounted_fragstat.version != dir->fnode.fragstat.version) - stale_fragstat = 1; - if (dir->fnode.accounted_rstat.version != dir->fnode.rstat.version) - stale_rstat = 1; - - // sum accounted_* - fnode.accounted_fragstat.add(dir->fnode.accounted_fragstat); - fnode.accounted_rstat.add(dir->fnode.accounted_rstat, 1); - // merge state state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT); dir_auth = dir->dir_auth; @@ -987,9 +984,14 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay) inode->close_dirfrag(dir->get_frag()); } - // offset accounted_* version by -1 if any source frag was stale - fnode.accounted_fragstat.version = fnode.fragstat.version - stale_fragstat; - fnode.accounted_rstat.version = fnode.rstat.version - stale_rstat; + // FIXME: merge dirty old rstat + fnode.rstat.version = rstat_version; + fnode.accounted_rstat = fnode.rstat; + fnode.accounted_rstat.add(rstatdiff); + + fnode.fragstat.version = dirstat_version; + fnode.accounted_fragstat = fnode.fragstat; + fnode.accounted_fragstat.add(fragstatdiff); init_fragment_pins(); } @@ -1412,7 +1414,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn) log_mark_dirty(); // mark complete, !fetching - state_set(STATE_COMPLETE); + mark_complete(); state_clear(STATE_FETCHING); auth_unpin(this); @@ -1687,7 +1689,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn) log_mark_dirty(); // mark complete, !fetching - state_set(STATE_COMPLETE); + mark_complete(); state_clear(STATE_FETCHING); auth_unpin(this); @@ -1851,7 +1853,8 @@ CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m, try_trim_snap_dentry(dn, *snaps)) continue; - if (!dn->is_dirty()) + if (!dn->is_dirty() && + (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null())) continue; // skip clean dentries if (dn->get_linkage()->is_null()) { @@ -1995,7 +1998,8 @@ void CDir::_commit(version_t want) unsigned max_write_size = cache->max_dir_commit_size; if (is_complete() && - (num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio))) { + ((num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio)) || + state_test(CDir::STATE_FRAGMENTING))) { fnode.snap_purged_thru = realm->get_last_destroyed(); committed_dn = _commit_full(m, snaps, max_write_size); } else { diff --git a/src/mds/CDir.h b/src/mds/CDir.h index 86da4e5dfd3..f131d834ca0 100644 --- a/src/mds/CDir.h +++ b/src/mds/CDir.h @@ -286,6 +286,7 @@ protected: public: CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth); ~CDir() { + remove_bloom(); g_num_dir--; g_num_dirs++; } diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 7accc5a4dba..1fc57feea4d 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -458,13 +458,6 @@ frag_t CInode::pick_dirfrag(const string& dn) bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls) { bool all = true; - for (map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) { - if (fg.contains(p->first)) - ls.push_back(p->second); - else - all = false; - } - /* list<frag_t> fglist; dirfragtree.get_leaves_under(fg, fglist); for (list<frag_t>::iterator p = fglist.begin(); @@ -474,7 +467,6 @@ bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls) ls.push_back(dirfrags[*p]); else all = false; - */ return all; } @@ -1776,7 +1768,7 @@ void CInode::finish_scatter_gather_update(int type) CDir *dir = p->second; dout(20) << fg << " " << *dir << dendl; - bool update = dir->is_auth() && !dir->is_frozen(); + bool update = dir->is_auth() && dir->get_version() != 0 && !dir->is_frozen(); fnode_t *pf = dir->get_projected_fnode(); if (update) @@ -1857,7 +1849,7 @@ void CInode::finish_scatter_gather_update(int type) CDir *dir = p->second; dout(20) << fg << " " << *dir << dendl; - bool update = dir->is_auth() && !dir->is_frozen(); + bool update = dir->is_auth() && dir->get_version() != 0 && !dir->is_frozen(); fnode_t *pf = dir->get_projected_fnode(); if (update) @@ -1944,7 +1936,7 @@ void CInode::finish_scatter_gather_update_accounted(int type, Mutation *mut, EMe p != dirfrags.end(); ++p) { CDir *dir = p->second; - if (!dir->is_auth() || dir->is_frozen()) + if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen()) continue; if (type == CEPH_LOCK_IDFT) @@ -2080,7 +2072,7 @@ void CInode::clear_ambiguous_auth() // auth_pins bool CInode::can_auth_pin() { - if (is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin()) + if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin()) return false; if (parent) return parent->can_auth_pin(); diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h index 723267da116..624c3bc2395 100644 --- a/src/mds/LogSegment.h +++ b/src/mds/LogSegment.h @@ -56,6 +56,7 @@ class LogSegment { map<int, hash_set<version_t> > pending_commit_tids; // mdstable set<metareqid_t> uncommitted_masters; + set<dirfrag_t> uncommitted_fragments; // client request ids map<int, tid_t> last_client_tids; diff --git a/src/mds/MDBalancer.cc b/src/mds/MDBalancer.cc index 8d7f91d24a4..6a404c46974 100644 --- a/src/mds/MDBalancer.cc +++ b/src/mds/MDBalancer.cc @@ -351,7 +351,7 @@ void MDBalancer::do_fragmenting() } if (!split_queue.empty()) { - dout(0) << "do_fragmenting " << split_queue.size() << " dirs marked for possible splitting" << dendl; + dout(10) << "do_fragmenting " << split_queue.size() << " dirs marked for possible splitting" << dendl; set<dirfrag_t> q; q.swap(split_queue); @@ -364,13 +364,13 @@ void MDBalancer::do_fragmenting() !dir->is_auth()) continue; - dout(0) << "do_fragmenting splitting " << *dir << dendl; + dout(10) << "do_fragmenting splitting " << *dir << dendl; mds->mdcache->split_dir(dir, g_conf->mds_bal_split_bits); } } if (!merge_queue.empty()) { - dout(0) << "do_fragmenting " << merge_queue.size() << " dirs marked for possible merging" << dendl; + dout(10) << "do_fragmenting " << merge_queue.size() << " dirs marked for possible merging" << dendl; set<dirfrag_t> q; q.swap(merge_queue); @@ -384,7 +384,7 @@ void MDBalancer::do_fragmenting() dir->get_frag() == frag_t()) // ok who's the joker? continue; - dout(0) << "do_fragmenting merging " << *dir << dendl; + dout(10) << "do_fragmenting merging " << *dir << dendl; CInode *diri = dir->get_inode(); @@ -1007,7 +1007,7 @@ void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amoun (v > g_conf->mds_bal_split_rd && type == META_POP_IRD) || (v > g_conf->mds_bal_split_wr && type == META_POP_IWR)) && split_queue.count(dir->dirfrag()) == 0) { - dout(1) << "hit_dir " << type << " pop is " << v << ", putting in split_queue: " << *dir << dendl; + dout(10) << "hit_dir " << type << " pop is " << v << ", putting in split_queue: " << *dir << dendl; split_queue.insert(dir->dirfrag()); } @@ -1015,7 +1015,7 @@ void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amoun if (dir->get_frag() != frag_t() && (dir->get_num_head_items() < (unsigned)g_conf->mds_bal_merge_size) && merge_queue.count(dir->dirfrag()) == 0) { - dout(1) << "hit_dir " << type << " pop is " << v << ", putting in merge_queue: " << *dir << dendl; + dout(10) << "hit_dir " << type << " pop is " << v << ", putting in merge_queue: " << *dir << dendl; merge_queue.insert(dir->dirfrag()); } } diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index b1f1f0ad4c9..ae59c26ee13 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -1984,8 +1984,8 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob, } bool stop = false; - if (!pin->is_auth() || pin->is_ambiguous_auth()) { - dout(10) << "predirty_journal_parents !auth or ambig on " << *pin << dendl; + if (!pin->can_auth_pin() || pin->is_ambiguous_auth()) { + dout(10) << "predirty_journal_parents can't auth pin or ambig on " << *pin << dendl; stop = true; } @@ -2010,8 +2010,7 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob, if (!stop && mut->wrlocks.count(&pin->nestlock) == 0 && - (!pin->can_auth_pin() || - !pin->versionlock.can_wrlock() || // make sure we can take versionlock, too + (!pin->versionlock.can_wrlock() || // make sure we can take versionlock, too //true !mds->locker->wrlock_start(&pin->nestlock, static_cast<MDRequest*>(mut), true) // can cast only because i'm passing nowait=true )) { // ** do not initiate.. see above comment ** @@ -8663,9 +8662,9 @@ void MDCache::dispatch_request(MDRequest *mdr) mds->server->dispatch_slave_request(mdr); } else { switch (mdr->internal_op) { - - // ... - + case CEPH_MDS_OP_FRAGMENTDIR: + dispatch_fragment_dir(mdr); + break; default: assert(0); } @@ -10878,17 +10877,6 @@ public: } }; - -bool MDCache::can_fragment_lock(CInode *diri) -{ - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(7) << "can_fragment: can't wrlock dftlock" << dendl; - mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL); - return false; - } - return true; -} - bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs) { if (mds->mdsmap->is_degraded()) { @@ -10900,8 +10888,8 @@ bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs) dout(7) << "can_fragment: i won't merge|split anything in stray" << dendl; return false; } - if (diri->is_mdsdir() || diri->ino() == MDS_INO_CEPH) { - dout(7) << "can_fragment: i won't fragment the mdsdir or .ceph" << dendl; + if (diri->is_mdsdir() || diri->is_stray() || diri->ino() == MDS_INO_CEPH) { + dout(7) << "can_fragment: i won't fragment the mdsdir or straydir or .ceph" << dendl; return false; } @@ -10936,11 +10924,6 @@ void MDCache::split_dir(CDir *dir, int bits) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - mds->balancer->queue_split(dir); - return; - } C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits)); @@ -10968,18 +10951,13 @@ void MDCache::merge_dir(CInode *diri, frag_t frag) if (!can_fragment(diri, dirs)) return; - if (!can_fragment_lock(diri)) { - //dout(10) << " requeuing dir " << dir->dirfrag() << dendl; - //mds->mdbalancer->split_queue.insert(dir->dirfrag()); - return; - } CDir *first = dirs.front(); int bits = first->get_frag().bits() - frag.bits(); dout(10) << " we are merginb by " << bits << " bits" << dendl; C_GatherBuilder gather(g_ceph_context, - new C_MDC_FragmentFrozen(this, dirs, frag, bits)); + new C_MDC_FragmentFrozen(this, dirs, frag, -bits)); fragment_freeze_dirs(dirs, gather); gather.activate(); @@ -11078,66 +11056,144 @@ void MDCache::fragment_unmark_unfreeze_dirs(list<CDir*>& dirs) } } -class C_MDC_FragmentLoggedAndStored : public Context { +class C_MDC_FragmentPrep : public Context { MDCache *mdcache; - Mutation *mut; + MDRequest *mdr; +public: + C_MDC_FragmentPrep(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {} + virtual void finish(int r) { + mdcache->_fragment_logged(mdr); + } +}; + +class C_MDC_FragmentStore : public Context { + MDCache *mdcache; + MDRequest *mdr; +public: + C_MDC_FragmentStore(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {} + virtual void finish(int r) { + mdcache->_fragment_stored(mdr); + } +}; + +class C_MDC_FragmentCommit : public Context { + MDCache *mdcache; + dirfrag_t basedirfrag; + list<CDir*> resultfrags; +public: + C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) : + mdcache(m), basedirfrag(ino, f) { + resultfrags.swap(l); + } + virtual void finish(int r) { + mdcache->_fragment_committed(basedirfrag, resultfrags); + } +}; + +class C_MDC_FragmentFinish : public Context { + MDCache *mdcache; + dirfrag_t basedirfrag; list<CDir*> resultfrags; - frag_t basefrag; - int bits; public: - C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) : - mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {} + C_MDC_FragmentFinish(MDCache *m, dirfrag_t f, list<CDir*>& l) : + mdcache(m), basedirfrag(f) { + resultfrags.swap(l); + } virtual void finish(int r) { - mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits); + mdcache->_fragment_finish(basedirfrag, resultfrags); } }; void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) { - CInode *diri = dirs.front()->get_inode(); + dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits + << " on " << dirs.front()->get_inode() << dendl; - if (bits > 0) { + if (bits > 0) assert(dirs.size() == 1); - } else { - assert(bits < 0); - } + else if (bits < 0) + assert(dirs.size() > 1); + else + assert(0); - dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits - << " on " << *diri << dendl; + MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR); + fragment_info_t &info = fragment_requests[mdr->reqid]; + info.basefrag = basefrag; + info.bits = bits; + info.dirs = dirs; - // wrlock dirfragtreelock - if (!diri->dirfragtreelock.can_wrlock(-1)) { - dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl; - fragment_unmark_unfreeze_dirs(dirs); - return; + dispatch_fragment_dir(mdr); +} + +void MDCache::dispatch_fragment_dir(MDRequest *mdr) +{ + map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid); + assert(it != fragment_requests.end()); + fragment_info_t &info = it->second; + CInode *diri = info.dirs.front()->get_inode(); + + dout(10) << "dispatch_fragment_dir " << info.resultfrags << " " + << info.basefrag << " bits " << info.bits << " on " << *diri << dendl; + + // avoid freeze dir deadlock + if (!mdr->is_auth_pinned(diri)) { + if (!diri->can_auth_pin()) { + dout(10) << " can't auth_pin " << *diri << ", requeuing dir " + << info.dirs.front()->dirfrag() << dendl; + if (info.bits > 0) + mds->balancer->queue_split(info.dirs.front()); + else + mds->balancer->queue_merge(info.dirs.front()); + fragment_unmark_unfreeze_dirs(info.dirs); + fragment_requests.erase(mdr->reqid); + request_finish(mdr); + return; + } + mdr->auth_pin(diri); } - diri->dirfragtreelock.get_wrlock(true); + set<SimpleLock*> rdlocks, wrlocks, xlocks; + wrlocks.insert(&diri->dirfragtreelock); // prevent a racing gather on any other scatterlocks too - diri->nestlock.get_wrlock(true); - diri->filelock.get_wrlock(true); + wrlocks.insert(&diri->nestlock); + wrlocks.insert(&diri->filelock); + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; + + mdr->ls = mds->mdlog->get_current_segment(); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), + info.basefrag, info.bits); + mds->mdlog->start_entry(le); + + for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p) { + CDir *dir = *p; + dirfrag_rollback rollback; + rollback.fnode = dir->fnode; + le->add_orig_frag(dir->get_frag(), &rollback); + } // refragment - list<CDir*> resultfrags; list<Context*> waiters; - adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false); + adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits, + info.resultfrags, waiters, false); if (g_conf->mds_debug_frag) diri->verify_dirfrags(); mds->queue_waiters(waiters); - // journal - Mutation *mut = new Mutation; + for (list<frag_t>::iterator p = le->orig_frags.begin(); p != le->orig_frags.end(); ++p) + assert(!diri->dirfragtree.is_leaf(*p)); - mut->ls = mds->mdlog->get_current_segment(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - - le->metablob.add_dir_context(*resultfrags.begin()); + le->metablob.add_dir_context(*info.resultfrags.begin()); + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); + ++p) { + le->metablob.add_dir(*p, false); + } // dft lock mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock); - mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); - mut->add_updated_lock(&diri->dirfragtreelock); + mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree); + mdr->add_updated_lock(&diri->dirfragtreelock); /* // filelock @@ -11151,48 +11207,57 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits) mut->add_updated_lock(&diri->nestlock); */ - // freeze, journal, and store resulting frags - C_GatherBuilder gather(g_ceph_context, - new C_MDC_FragmentLoggedAndStored(this, mut, - resultfrags, basefrag, bits)); + add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, le->orig_frags, mdr->ls); + mds->mdlog->submit_entry(le, new C_MDC_FragmentPrep(this, mdr)); + mds->mdlog->flush(); +} + +void MDCache::_fragment_logged(MDRequest *mdr) +{ + map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid); + assert(it != fragment_requests.end()); + fragment_info_t &info = it->second; + CInode *diri = info.resultfrags.front()->get_inode(); + + dout(10) << "fragment_logged " << info.resultfrags << " " << info.basefrag + << " bits " << info.bits << " on " << *diri << dendl; - for (list<CDir*>::iterator p = resultfrags.begin(); - p != resultfrags.end(); + // store resulting frags + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentStore(this, mdr)); + + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; - dout(10) << " result frag " << *dir << dendl; - le->metablob.add_dir(dir, false); + dout(10) << " storing result frag " << *dir << dendl; // freeze and store them too + dir->auth_pin(this); dir->state_set(CDir::STATE_FRAGMENTING); dir->commit(0, gather.new_sub(), true); // ignore authpinnability } - mds->mdlog->submit_entry(le, gather.new_sub()); - mds->mdlog->flush(); gather.activate(); } -void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits) +void MDCache::_fragment_stored(MDRequest *mdr) { - CInode *diri = resultfrags.front()->get_inode(); + map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid); + assert(it != fragment_requests.end()); + fragment_info_t &info = it->second; + CInode *diri = info.resultfrags.front()->get_inode(); - dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits - << " on " << *diri << dendl; - - // journal commit - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits); - mds->mdlog->start_entry(le); - mds->mdlog->submit_entry(le); + dout(10) << "fragment_stored " << info.resultfrags << " " << info.basefrag + << " bits " << info.bits << " on " << *diri << dendl; // tell peers - CDir *first = *resultfrags.begin(); + CDir *first = *info.resultfrags.begin(); for (map<int,int>::iterator p = first->replica_map.begin(); p != first->replica_map.end(); ++p) { if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN) continue; - MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits); + MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits); /* // freshly replicate new dirs to peers @@ -11203,26 +11268,15 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags mds->send_message_mds(notify, p->first); } - mut->apply(); // mark scatterlock - mds->locker->drop_locks(mut); - mut->cleanup(); - delete mut; - - // drop dft wrlock - bool need_issue = false; - mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue); - mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue); + mdr->apply(); // mark scatterlock + mds->locker->drop_locks(mdr); // unfreeze resulting frags - for (list<CDir*>::iterator p = resultfrags.begin(); - p != resultfrags.end(); + for (list<CDir*>::iterator p = info.resultfrags.begin(); + p != info.resultfrags.end(); ++p) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; - - // unmark, unfreeze - dir->state_clear(CDir::STATE_FRAGMENTING); for (CDir::map_t::iterator p = dir->items.begin(); p != dir->items.end(); @@ -11233,13 +11287,72 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags dn->put(CDentry::PIN_FRAGMENTING); } + // unfreeze dir->unfreeze_dir(); } - if (need_issue) - mds->locker->issue_caps(diri); + // journal commit + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, + diri->ino(), info.basefrag, info.bits); + mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag, + info.resultfrags)); + + fragment_requests.erase(it); + request_finish(mdr); +} + +void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags) +{ + dout(10) << "fragment_committed " << basedirfrag << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + assert(it != uncommitted_fragments.end()); + ufragment &uf = it->second; + + // remove old frags + C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags)); + + SnapContext nullsnapc; + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + for (list<frag_t>::iterator p = uf.old_frags.begin(); + p != uf.old_frags.end(); + ++p) { + object_t oid = CInode::get_object_name(basedirfrag.ino, *p, ""); + ObjectOperation op; + if (*p == frag_t()) { + // backtrace object + dout(10) << " truncate orphan dirfrag " << oid << dendl; + op.truncate(0); + } else { + dout(10) << " removing orphan dirfrag " << oid << dendl; + op.remove(); + } + mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context), + 0, NULL, gather.new_sub()); + } + + assert(gather.has_subs()); + gather.activate(); } +void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags) +{ + dout(10) << "fragment_finish " << basedirfrag << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + assert(it != uncommitted_fragments.end()); + ufragment &uf = it->second; + + // unmark & auth_unpin + for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) { + (*p)->state_clear(CDir::STATE_FRAGMENTING); + (*p)->auth_unpin(this); + } + + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH, + basedirfrag.ino, basedirfrag.frag, uf.bits); + mds->mdlog->start_submit_entry(le); + + finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH); +} /* This function DOES put the passed message before returning */ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) @@ -11285,26 +11398,140 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) notify->put(); } +void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags, + LogSegment *ls, bufferlist *rollback) +{ + dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl; + assert(!uncommitted_fragments.count(basedirfrag)); + ufragment& uf = uncommitted_fragments[basedirfrag]; + uf.old_frags = old_frags; + uf.bits = bits; + uf.ls = ls; + ls->uncommitted_fragments.insert(basedirfrag); + if (rollback) + uf.rollback.swap(*rollback); +} + +void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag, int op) +{ + dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag + << " op " << EFragment::op_name(op) << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + if (it != uncommitted_fragments.end()) { + ufragment& uf = it->second; + if (op != EFragment::OP_FINISH && !uf.old_frags.empty()) { + uf.committed = true; + } else { + uf.ls->uncommitted_fragments.erase(basedirfrag); + mds->queue_waiters(uf.waiters); + uncommitted_fragments.erase(it); + } + } +} + +void MDCache::rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags) +{ + dout(10) << "rollback_uncommitted_fragment: base dirfrag " << basedirfrag + << " old_frags (" << old_frags << ")" << dendl; + map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag); + if (it != uncommitted_fragments.end()) { + ufragment& uf = it->second; + if (!uf.old_frags.empty()) { + uf.old_frags.swap(old_frags); + uf.committed = true; + } else { + uf.ls->uncommitted_fragments.erase(basedirfrag); + uncommitted_fragments.erase(it); + } + } +} void MDCache::rollback_uncommitted_fragments() { dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl; - for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin(); + for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin(); p != uncommitted_fragments.end(); ++p) { + ufragment &uf = p->second; CInode *diri = get_inode(p->first.ino); assert(diri); - dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl; + + if (uf.committed) { + list<CDir*> frags; + diri->get_dirfrags_under(p->first.frag, frags); + for (list<CDir*>::iterator q = frags.begin(); q != frags.end(); ++q) { + CDir *dir = *q; + dir->auth_pin(this); + dir->state_set(CDir::STATE_FRAGMENTING); + } + _fragment_committed(p->first, frags); + continue; + } + + dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl; + + LogSegment *ls = mds->mdlog->get_current_segment(); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits); + mds->mdlog->start_entry(le); + + list<frag_t> old_frags; + diri->dirfragtree.get_leaves_under(p->first.frag, old_frags); + list<CDir*> resultfrags; - list<Context*> waiters; - adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true); + if (uf.old_frags.empty()) { + // created by old format EFragment + list<Context*> waiters; + adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true); + } else { + bufferlist::iterator bp = uf.rollback.begin(); + for (list<frag_t>::iterator q = uf.old_frags.begin(); q != uf.old_frags.end(); ++q) { + CDir *dir = force_dir_fragment(diri, *q); + resultfrags.push_back(dir); + + dirfrag_rollback rollback; + ::decode(rollback, bp); + + dir->set_version(rollback.fnode.version); + dir->fnode = rollback.fnode; + + dir->_mark_dirty(ls); + + if (!(dir->fnode.rstat == dir->fnode.accounted_rstat)) { + dout(10) << " dirty nestinfo on " << *dir << dendl; + mds->locker->mark_updated_scatterlock(&dir->inode->nestlock); + ls->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest); + dir->get_inode()->nestlock.mark_dirty(); + } + if (!(dir->fnode.fragstat == dir->fnode.accounted_fragstat)) { + dout(10) << " dirty fragstat on " << *dir << dendl; + mds->locker->mark_updated_scatterlock(&dir->inode->filelock); + ls->dirty_dirfrag_dir.push_back(&dir->inode->item_dirty_dirfrag_dir); + dir->get_inode()->filelock.mark_dirty(); + } + + le->add_orig_frag(dir->get_frag()); + le->metablob.add_dir_context(dir); + le->metablob.add_dir(dir, true); + } + } + if (g_conf->mds_debug_frag) diri->verify_dirfrags(); - EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second); - mds->mdlog->start_submit_entry(le); + for (list<frag_t>::iterator q = old_frags.begin(); q != old_frags.end(); ++q) + assert(!diri->dirfragtree.is_leaf(*q)); + + for (list<CDir*>::iterator q = resultfrags.begin(); q != resultfrags.end(); ++q) { + CDir *dir = *q; + dir->auth_pin(this); + dir->state_set(CDir::STATE_FRAGMENTING); + } + + mds->mdlog->submit_entry(le); + + uf.old_frags.swap(old_frags); + _fragment_committed(p->first, resultfrags); } - uncommitted_fragments.clear(); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 416c6454292..87b1098bb52 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -943,10 +943,26 @@ protected: // -- fragmenting -- -public: - set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations - private: + struct ufragment { + int bits; + bool committed; + LogSegment *ls; + list<Context*> waiters; + list<frag_t> old_frags; + bufferlist rollback; + ufragment() : bits(0), committed(false), ls(NULL) {} + }; + map<dirfrag_t, ufragment> uncommitted_fragments; + + struct fragment_info_t { + frag_t basefrag; + int bits; + list<CDir*> dirs; + list<CDir*> resultfrags; + }; + map<metareqid_t, fragment_info_t> fragment_requests; + void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits, list<CDir*>& frags, list<Context*>& waiters, bool replay); void adjust_dir_fragments(CInode *diri, @@ -958,32 +974,39 @@ private: CDir *force_dir_fragment(CInode *diri, frag_t fg); void get_force_dirfrag_bound_set(vector<dirfrag_t>& dfs, set<CDir*>& bounds); - - friend class EFragment; - - bool can_fragment_lock(CInode *diri); bool can_fragment(CInode *diri, list<CDir*>& dirs); - -public: - void split_dir(CDir *dir, int byn); - void merge_dir(CInode *diri, frag_t fg); - -private: void fragment_freeze_dirs(list<CDir*>& dirs, C_GatherBuilder &gather); void fragment_mark_and_complete(list<CDir*>& dirs); void fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits); void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs); - void fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits); -public: - void rollback_uncommitted_fragments(); -private: + void dispatch_fragment_dir(MDRequest *mdr); + void _fragment_logged(MDRequest *mdr); + void _fragment_stored(MDRequest *mdr); + void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags); + void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags); + friend class EFragment; friend class C_MDC_FragmentFrozen; friend class C_MDC_FragmentMarking; - friend class C_MDC_FragmentLoggedAndStored; + friend class C_MDC_FragmentPrep; + friend class C_MDC_FragmentStore; + friend class C_MDC_FragmentCommit; + friend class C_MDC_FragmentFinish; void handle_fragment_notify(MMDSFragmentNotify *m); + void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag, + LogSegment *ls, bufferlist *rollback=NULL); + void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op); + void rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags); +public: + void wait_for_uncommitted_fragment(dirfrag_t dirfrag, Context *c) { + assert(uncommitted_fragments.count(dirfrag)); + uncommitted_fragments[dirfrag].waiters.push_back(c); + } + void split_dir(CDir *dir, int byn); + void merge_dir(CInode *diri, frag_t fg); + void rollback_uncommitted_fragments(); // -- updates -- //int send_inode_updates(CInode *in); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 41862847e27..0c500cdfe63 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -2735,13 +2735,15 @@ void Server::handle_client_readdir(MDRequest *mdr) // which frag? frag_t fg = (__u32)req->head.args.readdir.frag; - dout(10) << " frag " << fg << dendl; + string offset_str = req->get_path2(); + dout(10) << " frag " << fg << " offset '" << offset_str << "'" << dendl; // does the frag exist? if (diri->dirfragtree[fg.value()] != fg) { - dout(10) << "frag " << fg << " doesn't appear in fragtree " << diri->dirfragtree << dendl; - reply_request(mdr, -EAGAIN); - return; + frag_t newfg = diri->dirfragtree[fg.value()]; + dout(10) << " adjust frag " << fg << " -> " << newfg << " " << diri->dirfragtree << dendl; + fg = newfg; + offset_str.clear(); } CDir *dir = try_open_auth_dirfrag(diri, fg, mdr); @@ -2770,12 +2772,7 @@ void Server::handle_client_readdir(MDRequest *mdr) mdr->now = ceph_clock_now(g_ceph_context); snapid_t snapid = mdr->snapid; - - string offset_str = req->get_path2(); - const char *offset = offset_str.length() ? offset_str.c_str() : 0; - - dout(10) << "snapid " << snapid << " offset '" << offset_str << "'" << dendl; - + dout(10) << "snapid " << snapid << dendl; // purge stale snap data? const set<snapid_t> *snaps = 0; @@ -2831,7 +2828,7 @@ void Server::handle_client_readdir(MDRequest *mdr) continue; } - if (offset && strcmp(dn->get_name().c_str(), offset) <= 0) + if (!offset_str.empty() && dn->get_name().compare(offset_str) <= 0) continue; CInode *in = dnl->get_inode(); @@ -2901,7 +2898,7 @@ void Server::handle_client_readdir(MDRequest *mdr) } __u8 end = (it == dir->end()); - __u8 complete = (end && !offset); // FIXME: what purpose does this serve + __u8 complete = (end && offset_str.empty()); // FIXME: what purpose does this serve // finish final blob ::encode(numfiles, dirbl); diff --git a/src/mds/events/EFragment.h b/src/mds/events/EFragment.h index bdbbd335e29..a9ddd548502 100644 --- a/src/mds/events/EFragment.h +++ b/src/mds/events/EFragment.h @@ -18,6 +18,14 @@ #include "../LogEvent.h" #include "EMetaBlob.h" +struct dirfrag_rollback { + fnode_t fnode; + dirfrag_rollback() { } + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& bl); +}; +WRITE_CLASS_ENCODER(dirfrag_rollback) + class EFragment : public LogEvent { public: EMetaBlob metablob; @@ -25,6 +33,8 @@ public: inodeno_t ino; frag_t basefrag; __s32 bits; // positive for split (from basefrag), negative for merge (to basefrag) + list<frag_t> orig_frags; + bufferlist rollback; EFragment() : LogEvent(EVENT_FRAGMENT) { } EFragment(MDLog *mdlog, int o, inodeno_t i, frag_t bf, int b) : @@ -39,17 +49,25 @@ public: OP_PREPARE = 1, OP_COMMIT = 2, OP_ROLLBACK = 3, - OP_ONESHOT = 4, // (legacy) PREPARE+COMMIT + OP_FINISH = 4, // finish deleting orphan dirfrags + OP_ONESHOT = 5, // (legacy) PREPARE+COMMIT }; - const char *op_name(int o) const { + static const char *op_name(int o) { switch (o) { case OP_PREPARE: return "prepare"; case OP_COMMIT: return "commit"; case OP_ROLLBACK: return "rollback"; + case OP_FINISH: return "finish"; default: return "???"; } } + void add_orig_frag(frag_t df, dirfrag_rollback *drb=NULL) { + orig_frags.push_back(df); + if (drb) + ::encode(*drb, rollback); + } + void encode(bufferlist &bl) const; void decode(bufferlist::iterator &bl); void dump(Formatter *f) const; diff --git a/src/mds/journal.cc b/src/mds/journal.cc index aeff07eb905..41a79f9fb38 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -119,6 +119,14 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld) mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub()); } + // uncommitted fragments + for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin(); + p != uncommitted_fragments.end(); + ++p) { + dout(10) << "try_to_expire waiting for uncommitted fragment " << *p << dendl; + mds->mdcache->wait_for_uncommitted_fragment(*p, gather_bld.new_sub()); + } + // nudge scatterlocks for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) { CInode *in = *p; @@ -2381,7 +2389,7 @@ void EFragment::replay(MDS *mds) list<CDir*> resultfrags; list<Context*> waiters; - pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits); + list<frag_t> old_frags; // in may be NULL if it wasn't in our cache yet. if it's a prepare // it will be once we replay the metablob , but first we need to @@ -2390,45 +2398,56 @@ void EFragment::replay(MDS *mds) switch (op) { case OP_PREPARE: - mds->mdcache->uncommitted_fragments.insert(desc); + mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, _segment, &rollback); // fall-thru case OP_ONESHOT: if (in) mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true); break; - case OP_COMMIT: - mds->mdcache->uncommitted_fragments.erase(desc); - break; - case OP_ROLLBACK: - if (mds->mdcache->uncommitted_fragments.count(desc)) { - mds->mdcache->uncommitted_fragments.erase(desc); - assert(in); - mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true); - } else { - dout(10) << " no record of prepare for " << desc << dendl; + if (in) { + in->dirfragtree.get_leaves_under(basefrag, old_frags); + if (orig_frags.empty()) { + // old format EFragment + mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true); + } else { + for (list<frag_t>::iterator p = orig_frags.begin(); p != orig_frags.end(); ++p) + mds->mdcache->force_dir_fragment(in, *p); + } } + mds->mdcache->rollback_uncommitted_fragment(dirfrag_t(ino, basefrag), old_frags); + break; + + case OP_COMMIT: + case OP_FINISH: + mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag), op); break; + + default: + assert(0); } + metablob.replay(mds, _segment); if (in && g_conf->mds_debug_frag) in->verify_dirfrags(); } void EFragment::encode(bufferlist &bl) const { - ENCODE_START(4, 4, bl); + ENCODE_START(5, 4, bl); ::encode(stamp, bl); ::encode(op, bl); ::encode(ino, bl); ::encode(basefrag, bl); ::encode(bits, bl); ::encode(metablob, bl); + ::encode(orig_frags, bl); + ::encode(rollback, bl); ENCODE_FINISH(bl); } void EFragment::decode(bufferlist::iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, bl); + DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl); if (struct_v >= 2) ::decode(stamp, bl); if (struct_v >= 3) @@ -2439,6 +2458,10 @@ void EFragment::decode(bufferlist::iterator &bl) { ::decode(basefrag, bl); ::decode(bits, bl); ::decode(metablob, bl); + if (struct_v >= 5) { + ::decode(orig_frags, bl); + ::decode(rollback, bl); + } DECODE_FINISH(bl); } @@ -2462,7 +2485,19 @@ void EFragment::generate_test_instances(list<EFragment*>& ls) ls.back()->bits = 5; } +void dirfrag_rollback::encode(bufferlist &bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(fnode, bl); + ENCODE_FINISH(bl); +} +void dirfrag_rollback::decode(bufferlist::iterator &bl) +{ + DECODE_START(1, bl); + ::decode(fnode, bl); + DECODE_FINISH(bl); +} |