summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Farnum <greg@inktank.com>2013-10-17 08:35:38 -0700
committerGregory Farnum <greg@inktank.com>2013-10-17 08:35:38 -0700
commitff8e6a763ab0b7a6df9c430a46b7dda4e80772c1 (patch)
treeb6bb427f9ad51ad9357fb764e53fd3677b5478b7
parente1e6408403cc628ca09487d152653d09e1218805 (diff)
parent3c6710b9065e6ec9455123d3941cf48254bbb97b (diff)
downloadceph-ff8e6a763ab0b7a6df9c430a46b7dda4e80772c1.tar.gz
Merge pull request #691 from ceph/wip-dirfrag
Reviewed-by: Greg Farnum <greg@inktank.com> Partly-Reviewed-by: Sage Weil <sage@inktank.com>
-rwxr-xr-xqa/workunits/misc/dirfrag.sh48
-rw-r--r--src/client/Client.cc46
-rw-r--r--src/client/Client.h6
-rw-r--r--src/client/MetaRequest.h1
-rw-r--r--src/common/ceph_strings.cc1
-rw-r--r--src/include/ceph_fs.h3
-rw-r--r--src/include/frag.h2
-rw-r--r--src/mds/CDir.cc100
-rw-r--r--src/mds/CDir.h1
-rw-r--r--src/mds/CInode.cc16
-rw-r--r--src/mds/LogSegment.h1
-rw-r--r--src/mds/MDBalancer.cc12
-rw-r--r--src/mds/MDCache.cc445
-rw-r--r--src/mds/MDCache.h59
-rw-r--r--src/mds/Server.cc21
-rw-r--r--src/mds/events/EFragment.h22
-rw-r--r--src/mds/journal.cc63
17 files changed, 611 insertions, 236 deletions
diff --git a/qa/workunits/misc/dirfrag.sh b/qa/workunits/misc/dirfrag.sh
new file mode 100755
index 00000000000..393667427fd
--- /dev/null
+++ b/qa/workunits/misc/dirfrag.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+set -e
+
+DEPTH=5
+COUNT=10000
+
+kill_jobs() {
+ jobs -p | xargs kill
+}
+trap kill_jobs INT
+
+create_files() {
+ for i in `seq 1 $COUNT`
+ do
+ touch file$i
+ done
+}
+
+delete_files() {
+ for i in `ls -f`
+ do
+ if [[ ${i}a = file*a ]]
+ then
+ rm -f $i
+ fi
+ done
+}
+
+rm -rf testdir
+mkdir testdir
+cd testdir
+
+for i in `seq 1 $DEPTH`; do
+ mkdir dir$i
+ cd dir$i
+ create_files &
+done
+wait
+
+for i in `seq 1 $DEPTH`; do
+ delete_files &
+ cd ..
+done
+wait
+
+cd ..
+rm -rf testdir
diff --git a/src/client/Client.cc b/src/client/Client.cc
index 60a5e4550b8..20651892c0c 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -818,16 +818,28 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
::decode(end, p);
::decode(complete, p);
+ frag_t fg = request->readdir_frag;
+ uint64_t readdir_offset = request->readdir_offset;
+ string readdir_start = request->readdir_start;
+ if (fg != dst.frag) {
+ ldout(cct, 10) << "insert_trace got new frag " << fg << " -> " << dst.frag << dendl;
+ fg = dst.frag;
+ if (fg.is_leftmost())
+ readdir_offset = 2;
+ else
+ readdir_offset = 0;
+ readdir_start.clear();
+ }
+
ldout(cct, 10) << "insert_trace " << numdn << " readdir items, end=" << (int)end
- << ", offset " << request->readdir_offset
- << ", readdir_start " << request->readdir_start << dendl;
+ << ", offset " << readdir_offset
+ << ", readdir_start " << readdir_start << dendl;
+ request->readdir_reply_frag = fg;
request->readdir_end = end;
request->readdir_num = numdn;
- map<string,Dentry*>::iterator pd = dir->dentry_map.upper_bound(request->readdir_start);
-
- frag_t fg = request->readdir_frag;
+ map<string,Dentry*>::iterator pd = dir->dentry_map.upper_bound(readdir_start);
string dname;
LeaseStat dlease;
@@ -878,7 +890,7 @@ void Client::insert_readdir_results(MetaRequest *request, MetaSession *session,
dn = link(dir, dname, in, NULL);
}
update_dentry_lease(dn, &dlease, request->sent_stamp, session);
- dn->offset = dir_result_t::make_fpos(request->readdir_frag, i + request->readdir_offset);
+ dn->offset = dir_result_t::make_fpos(fg, i + readdir_offset);
// add to cached result list
in->get();
@@ -5016,8 +5028,16 @@ int Client::_readdir_get_frag(dir_result_t *dirp)
dirp->buffer = new vector<pair<string,Inode*> >;
dirp->buffer->swap(req->readdir_result);
- dirp->buffer_frag = fg;
+ if (fg != req->readdir_reply_frag) {
+ fg = req->readdir_reply_frag;
+ if (fg.is_leftmost())
+ dirp->next_offset = 2;
+ else
+ dirp->next_offset = 0;
+ dirp->offset = dir_result_t::make_fpos(fg, dirp->next_offset);
+ }
+ dirp->buffer_frag = fg;
dirp->this_offset = dirp->next_offset;
ldout(cct, 10) << "_readdir_get_frag " << dirp << " got frag " << dirp->buffer_frag
<< " this_offset " << dirp->this_offset
@@ -5196,14 +5216,18 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
int r = _readdir_get_frag(dirp);
if (r)
return r;
+ // _readdir_get_frag () may updates dirp->offset if the replied dirfrag is
+ // different than the requested one. (our dirfragtree was outdated)
fg = dirp->buffer_frag;
+ off = dirp->fragpos();
}
ldout(cct, 10) << "off " << off << " this_offset " << hex << dirp->this_offset << dec << " size " << dirp->buffer->size()
<< " frag " << fg << dendl;
+
+ dirp->offset = dir_result_t::make_fpos(fg, off);
while (off >= dirp->this_offset &&
off - dirp->this_offset < dirp->buffer->size()) {
- uint64_t pos = dir_result_t::make_fpos(fg, off);
pair<string,Inode*>& ent = (*dirp->buffer)[off - dirp->this_offset];
int stmask = fill_stat(ent.second, &st);
@@ -5219,7 +5243,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
return r;
off++;
- dirp->offset = pos + 1;
+ dirp->offset++;
}
if (dirp->last_name.length()) {
@@ -5230,10 +5254,10 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p)
if (!fg.is_rightmost()) {
// next frag!
- dirp->next_frag();
- off = 0;
+ _readdir_next_frag(dirp);
ldout(cct, 10) << " advancing to next frag: " << fg << " -> " << dirp->frag() << dendl;
fg = dirp->frag();
+ off = 0;
continue;
}
diff --git a/src/client/Client.h b/src/client/Client.h
index df59f235de4..649bacc5ba6 100644
--- a/src/client/Client.h
+++ b/src/client/Client.h
@@ -137,7 +137,7 @@ struct dir_result_t {
return ((uint64_t)frag << SHIFT) | (uint64_t)off;
}
static unsigned fpos_frag(uint64_t p) {
- return p >> SHIFT;
+ return (p & ~END) >> SHIFT;
}
static unsigned fpos_off(uint64_t p) {
return p & MASK;
@@ -176,8 +176,8 @@ struct dir_result_t {
offset = (uint64_t)f << SHIFT;
assert(sizeof(offset) == 8);
}
- void set_end() { offset = END; }
- bool at_end() { return (offset == END); }
+ void set_end() { offset |= END; }
+ bool at_end() { return (offset & END); }
void reset() {
last_name.clear();
diff --git a/src/client/MetaRequest.h b/src/client/MetaRequest.h
index 036b4154e0c..5583cd16281 100644
--- a/src/client/MetaRequest.h
+++ b/src/client/MetaRequest.h
@@ -57,6 +57,7 @@ public:
string readdir_start; // starting _after_ this name
uint64_t readdir_offset;
+ frag_t readdir_reply_frag;
vector<pair<string,Inode*> > readdir_result;
bool readdir_end;
int readdir_num;
diff --git a/src/common/ceph_strings.cc b/src/common/ceph_strings.cc
index 47648ce19b3..221fb059740 100644
--- a/src/common/ceph_strings.cc
+++ b/src/common/ceph_strings.cc
@@ -183,6 +183,7 @@ const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_RMSNAP: return "rmsnap";
case CEPH_MDS_OP_SETFILELOCK: return "setfilelock";
case CEPH_MDS_OP_GETFILELOCK: return "getfilelock";
+ case CEPH_MDS_OP_FRAGMENTDIR: return "fragmentdir";
}
return "???";
}
diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
index ba0b5eb0f19..47ec1f14f6e 100644
--- a/src/include/ceph_fs.h
+++ b/src/include/ceph_fs.h
@@ -333,6 +333,9 @@ enum {
CEPH_MDS_OP_MKSNAP = 0x01400,
CEPH_MDS_OP_RMSNAP = 0x01401,
CEPH_MDS_OP_LSSNAP = 0x00402,
+
+ // internal op
+ CEPH_MDS_OP_FRAGMENTDIR= 0x01500,
};
extern const char *ceph_mds_op_name(int op);
diff --git a/src/include/frag.h b/src/include/frag.h
index 715eb098283..fbe5b43f8cb 100644
--- a/src/include/frag.h
+++ b/src/include/frag.h
@@ -285,7 +285,7 @@ public:
*/
void get_leaves_under(frag_t x, std::list<frag_t>& ls) const {
std::list<frag_t> q;
- q.push_back(get_branch(x));
+ q.push_back(get_branch_or_leaf(x));
while (!q.empty()) {
frag_t t = q.front();
q.pop_front();
diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc
index 4a5e636d9a6..2c985e4775d 100644
--- a/src/mds/CDir.cc
+++ b/src/mds/CDir.cc
@@ -786,8 +786,10 @@ void CDir::prepare_old_fragment(bool replay)
void CDir::prepare_new_fragment(bool replay)
{
- if (!replay && is_auth())
+ if (!replay && is_auth()) {
_freeze_dir();
+ mark_complete();
+ }
}
void CDir::finish_old_fragment(list<Context*>& waiters, bool replay)
@@ -856,11 +858,16 @@ void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool repl
double fac = 1.0 / (double)(1 << bits); // for scaling load vecs
- nest_info_t olddiff; // old += f - af;
- dout(10) << " rstat " << fnode.rstat << dendl;
- dout(10) << " accounted_rstat " << fnode.accounted_rstat << dendl;
- olddiff.add_delta(fnode.rstat, fnode.accounted_rstat);
- dout(10) << " olddiff " << olddiff << dendl;
+ dout(15) << " rstat " << fnode.rstat << dendl;
+ dout(15) << " accounted_rstat " << fnode.accounted_rstat << dendl;
+ nest_info_t rstatdiff;
+ rstatdiff.add_delta(fnode.accounted_rstat, fnode.rstat);
+ dout(15) << " fragstat " << fnode.fragstat << dendl;
+ dout(15) << " accounted_fragstat " << fnode.accounted_fragstat << dendl;
+ frag_info_t fragstatdiff;
+ bool touched_mtime;
+ fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat, touched_mtime);
+ dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
prepare_old_fragment(replay);
@@ -905,27 +912,24 @@ void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool repl
f->steal_dentry(dn);
}
+ // FIXME: handle dirty old rstat
+
// fix up new frag fragstats
- bool stale_fragstat = fnode.fragstat.version != fnode.accounted_fragstat.version;
- bool stale_rstat = fnode.rstat.version != fnode.accounted_rstat.version;
for (int i=0; i<n; i++) {
- subfrags[i]->fnode.fragstat.version = fnode.fragstat.version;
- subfrags[i]->fnode.accounted_fragstat = subfrags[i]->fnode.fragstat;
- if (i == 0) {
- if (stale_fragstat)
- subfrags[0]->fnode.accounted_fragstat.version--;
- if (stale_rstat)
- subfrags[0]->fnode.accounted_rstat.version--;
- }
- dout(10) << " fragstat " << subfrags[i]->fnode.fragstat << " on " << *subfrags[i] << dendl;
+ CDir *f = subfrags[i];
+ f->fnode.rstat.version = fnode.rstat.version;
+ f->fnode.accounted_rstat = f->fnode.rstat;
+ f->fnode.fragstat.version = fnode.fragstat.version;
+ f->fnode.accounted_fragstat = f->fnode.fragstat;
+ dout(10) << " rstat " << f->fnode.rstat << " fragstat " << f->fnode.fragstat
+ << " on " << *f << dendl;
}
// give any outstanding frag stat differential to first frag
- // af[0] -= olddiff
- dout(10) << "giving olddiff " << olddiff << " to " << *subfrags[0] << dendl;
- nest_info_t zero;
- subfrags[0]->fnode.accounted_rstat.add_delta(zero, olddiff);
- dout(10) << " " << subfrags[0]->fnode.accounted_fragstat << dendl;
+ dout(10) << " giving rstatdiff " << rstatdiff << " fragstatdiff" << fragstatdiff
+ << " to " << *subfrags[0] << dendl;
+ subfrags[0]->fnode.accounted_rstat.add(rstatdiff);
+ subfrags[0]->fnode.accounted_fragstat.add(fragstatdiff);
finish_old_fragment(waiters, replay);
}
@@ -936,15 +940,23 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
prepare_new_fragment(replay);
- // see if _any_ of the source frags have stale fragstat or rstat
- int stale_rstat = 0;
- int stale_fragstat = 0;
+ nest_info_t rstatdiff;
+ frag_info_t fragstatdiff;
+ bool touched_mtime;
+ version_t rstat_version = inode->get_projected_inode()->rstat.version;
+ version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
for (list<CDir*>::iterator p = subs.begin(); p != subs.end(); ++p) {
CDir *dir = *p;
dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
assert(!dir->is_auth() || dir->is_complete() || replay);
-
+
+ if (dir->fnode.accounted_rstat.version == rstat_version)
+ rstatdiff.add_delta(dir->fnode.accounted_rstat, dir->fnode.rstat);
+ if (dir->fnode.accounted_fragstat.version == dirstat_version)
+ fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
+ touched_mtime);
+
dir->prepare_old_fragment(replay);
// steal dentries
@@ -964,21 +976,6 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
if (dir->get_version() > get_version())
set_version(dir->get_version());
- // *stat versions
- if (fnode.fragstat.version < dir->fnode.fragstat.version)
- fnode.fragstat.version = dir->fnode.fragstat.version;
- if (fnode.rstat.version < dir->fnode.rstat.version)
- fnode.rstat.version = dir->fnode.rstat.version;
-
- if (dir->fnode.accounted_fragstat.version != dir->fnode.fragstat.version)
- stale_fragstat = 1;
- if (dir->fnode.accounted_rstat.version != dir->fnode.rstat.version)
- stale_rstat = 1;
-
- // sum accounted_*
- fnode.accounted_fragstat.add(dir->fnode.accounted_fragstat);
- fnode.accounted_rstat.add(dir->fnode.accounted_rstat, 1);
-
// merge state
state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
dir_auth = dir->dir_auth;
@@ -987,9 +984,14 @@ void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
inode->close_dirfrag(dir->get_frag());
}
- // offset accounted_* version by -1 if any source frag was stale
- fnode.accounted_fragstat.version = fnode.fragstat.version - stale_fragstat;
- fnode.accounted_rstat.version = fnode.rstat.version - stale_rstat;
+ // FIXME: merge dirty old rstat
+ fnode.rstat.version = rstat_version;
+ fnode.accounted_rstat = fnode.rstat;
+ fnode.accounted_rstat.add(rstatdiff);
+
+ fnode.fragstat.version = dirstat_version;
+ fnode.accounted_fragstat = fnode.fragstat;
+ fnode.accounted_fragstat.add(fragstatdiff);
init_fragment_pins();
}
@@ -1412,7 +1414,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
log_mark_dirty();
// mark complete, !fetching
- state_set(STATE_COMPLETE);
+ mark_complete();
state_clear(STATE_FETCHING);
auth_unpin(this);
@@ -1687,7 +1689,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
log_mark_dirty();
// mark complete, !fetching
- state_set(STATE_COMPLETE);
+ mark_complete();
state_clear(STATE_FETCHING);
auth_unpin(this);
@@ -1851,7 +1853,8 @@ CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m,
try_trim_snap_dentry(dn, *snaps))
continue;
- if (!dn->is_dirty())
+ if (!dn->is_dirty() &&
+ (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
continue; // skip clean dentries
if (dn->get_linkage()->is_null()) {
@@ -1995,7 +1998,8 @@ void CDir::_commit(version_t want)
unsigned max_write_size = cache->max_dir_commit_size;
if (is_complete() &&
- (num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio))) {
+ ((num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio)) ||
+ state_test(CDir::STATE_FRAGMENTING))) {
fnode.snap_purged_thru = realm->get_last_destroyed();
committed_dn = _commit_full(m, snaps, max_write_size);
} else {
diff --git a/src/mds/CDir.h b/src/mds/CDir.h
index 86da4e5dfd3..f131d834ca0 100644
--- a/src/mds/CDir.h
+++ b/src/mds/CDir.h
@@ -286,6 +286,7 @@ protected:
public:
CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth);
~CDir() {
+ remove_bloom();
g_num_dir--;
g_num_dirs++;
}
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index 7accc5a4dba..1fc57feea4d 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -458,13 +458,6 @@ frag_t CInode::pick_dirfrag(const string& dn)
bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
{
bool all = true;
- for (map<frag_t,CDir*>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) {
- if (fg.contains(p->first))
- ls.push_back(p->second);
- else
- all = false;
- }
- /*
list<frag_t> fglist;
dirfragtree.get_leaves_under(fg, fglist);
for (list<frag_t>::iterator p = fglist.begin();
@@ -474,7 +467,6 @@ bool CInode::get_dirfrags_under(frag_t fg, list<CDir*>& ls)
ls.push_back(dirfrags[*p]);
else
all = false;
- */
return all;
}
@@ -1776,7 +1768,7 @@ void CInode::finish_scatter_gather_update(int type)
CDir *dir = p->second;
dout(20) << fg << " " << *dir << dendl;
- bool update = dir->is_auth() && !dir->is_frozen();
+ bool update = dir->is_auth() && dir->get_version() != 0 && !dir->is_frozen();
fnode_t *pf = dir->get_projected_fnode();
if (update)
@@ -1857,7 +1849,7 @@ void CInode::finish_scatter_gather_update(int type)
CDir *dir = p->second;
dout(20) << fg << " " << *dir << dendl;
- bool update = dir->is_auth() && !dir->is_frozen();
+ bool update = dir->is_auth() && dir->get_version() != 0 && !dir->is_frozen();
fnode_t *pf = dir->get_projected_fnode();
if (update)
@@ -1944,7 +1936,7 @@ void CInode::finish_scatter_gather_update_accounted(int type, Mutation *mut, EMe
p != dirfrags.end();
++p) {
CDir *dir = p->second;
- if (!dir->is_auth() || dir->is_frozen())
+ if (!dir->is_auth() || dir->get_version() == 0 || dir->is_frozen())
continue;
if (type == CEPH_LOCK_IDFT)
@@ -2080,7 +2072,7 @@ void CInode::clear_ambiguous_auth()
// auth_pins
bool CInode::can_auth_pin() {
- if (is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
+ if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
return false;
if (parent)
return parent->can_auth_pin();
diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h
index 723267da116..624c3bc2395 100644
--- a/src/mds/LogSegment.h
+++ b/src/mds/LogSegment.h
@@ -56,6 +56,7 @@ class LogSegment {
map<int, hash_set<version_t> > pending_commit_tids; // mdstable
set<metareqid_t> uncommitted_masters;
+ set<dirfrag_t> uncommitted_fragments;
// client request ids
map<int, tid_t> last_client_tids;
diff --git a/src/mds/MDBalancer.cc b/src/mds/MDBalancer.cc
index 8d7f91d24a4..6a404c46974 100644
--- a/src/mds/MDBalancer.cc
+++ b/src/mds/MDBalancer.cc
@@ -351,7 +351,7 @@ void MDBalancer::do_fragmenting()
}
if (!split_queue.empty()) {
- dout(0) << "do_fragmenting " << split_queue.size() << " dirs marked for possible splitting" << dendl;
+ dout(10) << "do_fragmenting " << split_queue.size() << " dirs marked for possible splitting" << dendl;
set<dirfrag_t> q;
q.swap(split_queue);
@@ -364,13 +364,13 @@ void MDBalancer::do_fragmenting()
!dir->is_auth())
continue;
- dout(0) << "do_fragmenting splitting " << *dir << dendl;
+ dout(10) << "do_fragmenting splitting " << *dir << dendl;
mds->mdcache->split_dir(dir, g_conf->mds_bal_split_bits);
}
}
if (!merge_queue.empty()) {
- dout(0) << "do_fragmenting " << merge_queue.size() << " dirs marked for possible merging" << dendl;
+ dout(10) << "do_fragmenting " << merge_queue.size() << " dirs marked for possible merging" << dendl;
set<dirfrag_t> q;
q.swap(merge_queue);
@@ -384,7 +384,7 @@ void MDBalancer::do_fragmenting()
dir->get_frag() == frag_t()) // ok who's the joker?
continue;
- dout(0) << "do_fragmenting merging " << *dir << dendl;
+ dout(10) << "do_fragmenting merging " << *dir << dendl;
CInode *diri = dir->get_inode();
@@ -1007,7 +1007,7 @@ void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amoun
(v > g_conf->mds_bal_split_rd && type == META_POP_IRD) ||
(v > g_conf->mds_bal_split_wr && type == META_POP_IWR)) &&
split_queue.count(dir->dirfrag()) == 0) {
- dout(1) << "hit_dir " << type << " pop is " << v << ", putting in split_queue: " << *dir << dendl;
+ dout(10) << "hit_dir " << type << " pop is " << v << ", putting in split_queue: " << *dir << dendl;
split_queue.insert(dir->dirfrag());
}
@@ -1015,7 +1015,7 @@ void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amoun
if (dir->get_frag() != frag_t() &&
(dir->get_num_head_items() < (unsigned)g_conf->mds_bal_merge_size) &&
merge_queue.count(dir->dirfrag()) == 0) {
- dout(1) << "hit_dir " << type << " pop is " << v << ", putting in merge_queue: " << *dir << dendl;
+ dout(10) << "hit_dir " << type << " pop is " << v << ", putting in merge_queue: " << *dir << dendl;
merge_queue.insert(dir->dirfrag());
}
}
diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index b1f1f0ad4c9..ae59c26ee13 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -1984,8 +1984,8 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob,
}
bool stop = false;
- if (!pin->is_auth() || pin->is_ambiguous_auth()) {
- dout(10) << "predirty_journal_parents !auth or ambig on " << *pin << dendl;
+ if (!pin->can_auth_pin() || pin->is_ambiguous_auth()) {
+ dout(10) << "predirty_journal_parents can't auth pin or ambig on " << *pin << dendl;
stop = true;
}
@@ -2010,8 +2010,7 @@ void MDCache::predirty_journal_parents(Mutation *mut, EMetaBlob *blob,
if (!stop &&
mut->wrlocks.count(&pin->nestlock) == 0 &&
- (!pin->can_auth_pin() ||
- !pin->versionlock.can_wrlock() || // make sure we can take versionlock, too
+ (!pin->versionlock.can_wrlock() || // make sure we can take versionlock, too
//true
!mds->locker->wrlock_start(&pin->nestlock, static_cast<MDRequest*>(mut), true) // can cast only because i'm passing nowait=true
)) { // ** do not initiate.. see above comment **
@@ -8663,9 +8662,9 @@ void MDCache::dispatch_request(MDRequest *mdr)
mds->server->dispatch_slave_request(mdr);
} else {
switch (mdr->internal_op) {
-
- // ...
-
+ case CEPH_MDS_OP_FRAGMENTDIR:
+ dispatch_fragment_dir(mdr);
+ break;
default:
assert(0);
}
@@ -10878,17 +10877,6 @@ public:
}
};
-
-bool MDCache::can_fragment_lock(CInode *diri)
-{
- if (!diri->dirfragtreelock.can_wrlock(-1)) {
- dout(7) << "can_fragment: can't wrlock dftlock" << dendl;
- mds->locker->scatter_nudge(&diri->dirfragtreelock, NULL);
- return false;
- }
- return true;
-}
-
bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs)
{
if (mds->mdsmap->is_degraded()) {
@@ -10900,8 +10888,8 @@ bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs)
dout(7) << "can_fragment: i won't merge|split anything in stray" << dendl;
return false;
}
- if (diri->is_mdsdir() || diri->ino() == MDS_INO_CEPH) {
- dout(7) << "can_fragment: i won't fragment the mdsdir or .ceph" << dendl;
+ if (diri->is_mdsdir() || diri->is_stray() || diri->ino() == MDS_INO_CEPH) {
+ dout(7) << "can_fragment: i won't fragment the mdsdir or straydir or .ceph" << dendl;
return false;
}
@@ -10936,11 +10924,6 @@ void MDCache::split_dir(CDir *dir, int bits)
if (!can_fragment(diri, dirs))
return;
- if (!can_fragment_lock(diri)) {
- dout(10) << " requeuing dir " << dir->dirfrag() << dendl;
- mds->balancer->queue_split(dir);
- return;
- }
C_GatherBuilder gather(g_ceph_context,
new C_MDC_FragmentFrozen(this, dirs, dir->get_frag(), bits));
@@ -10968,18 +10951,13 @@ void MDCache::merge_dir(CInode *diri, frag_t frag)
if (!can_fragment(diri, dirs))
return;
- if (!can_fragment_lock(diri)) {
- //dout(10) << " requeuing dir " << dir->dirfrag() << dendl;
- //mds->mdbalancer->split_queue.insert(dir->dirfrag());
- return;
- }
CDir *first = dirs.front();
int bits = first->get_frag().bits() - frag.bits();
dout(10) << " we are merginb by " << bits << " bits" << dendl;
C_GatherBuilder gather(g_ceph_context,
- new C_MDC_FragmentFrozen(this, dirs, frag, bits));
+ new C_MDC_FragmentFrozen(this, dirs, frag, -bits));
fragment_freeze_dirs(dirs, gather);
gather.activate();
@@ -11078,66 +11056,144 @@ void MDCache::fragment_unmark_unfreeze_dirs(list<CDir*>& dirs)
}
}
-class C_MDC_FragmentLoggedAndStored : public Context {
+class C_MDC_FragmentPrep : public Context {
MDCache *mdcache;
- Mutation *mut;
+ MDRequest *mdr;
+public:
+ C_MDC_FragmentPrep(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
+ virtual void finish(int r) {
+ mdcache->_fragment_logged(mdr);
+ }
+};
+
+class C_MDC_FragmentStore : public Context {
+ MDCache *mdcache;
+ MDRequest *mdr;
+public:
+ C_MDC_FragmentStore(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
+ virtual void finish(int r) {
+ mdcache->_fragment_stored(mdr);
+ }
+};
+
+class C_MDC_FragmentCommit : public Context {
+ MDCache *mdcache;
+ dirfrag_t basedirfrag;
+ list<CDir*> resultfrags;
+public:
+ C_MDC_FragmentCommit(MDCache *m, inodeno_t ino, frag_t f, list<CDir*>& l) :
+ mdcache(m), basedirfrag(ino, f) {
+ resultfrags.swap(l);
+ }
+ virtual void finish(int r) {
+ mdcache->_fragment_committed(basedirfrag, resultfrags);
+ }
+};
+
+class C_MDC_FragmentFinish : public Context {
+ MDCache *mdcache;
+ dirfrag_t basedirfrag;
list<CDir*> resultfrags;
- frag_t basefrag;
- int bits;
public:
- C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) :
- mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
+ C_MDC_FragmentFinish(MDCache *m, dirfrag_t f, list<CDir*>& l) :
+ mdcache(m), basedirfrag(f) {
+ resultfrags.swap(l);
+ }
virtual void finish(int r) {
- mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits);
+ mdcache->_fragment_finish(basedirfrag, resultfrags);
}
};
void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
{
- CInode *diri = dirs.front()->get_inode();
+ dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
+ << " on " << dirs.front()->get_inode() << dendl;
- if (bits > 0) {
+ if (bits > 0)
assert(dirs.size() == 1);
- } else {
- assert(bits < 0);
- }
+ else if (bits < 0)
+ assert(dirs.size() > 1);
+ else
+ assert(0);
- dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
- << " on " << *diri << dendl;
+ MDRequest *mdr = request_start_internal(CEPH_MDS_OP_FRAGMENTDIR);
+ fragment_info_t &info = fragment_requests[mdr->reqid];
+ info.basefrag = basefrag;
+ info.bits = bits;
+ info.dirs = dirs;
- // wrlock dirfragtreelock
- if (!diri->dirfragtreelock.can_wrlock(-1)) {
- dout(10) << " can't wrlock " << diri->dirfragtreelock << " on " << *diri << dendl;
- fragment_unmark_unfreeze_dirs(dirs);
- return;
+ dispatch_fragment_dir(mdr);
+}
+
+void MDCache::dispatch_fragment_dir(MDRequest *mdr)
+{
+ map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid);
+ assert(it != fragment_requests.end());
+ fragment_info_t &info = it->second;
+ CInode *diri = info.dirs.front()->get_inode();
+
+ dout(10) << "dispatch_fragment_dir " << info.resultfrags << " "
+ << info.basefrag << " bits " << info.bits << " on " << *diri << dendl;
+
+ // avoid freeze dir deadlock
+ if (!mdr->is_auth_pinned(diri)) {
+ if (!diri->can_auth_pin()) {
+ dout(10) << " can't auth_pin " << *diri << ", requeuing dir "
+ << info.dirs.front()->dirfrag() << dendl;
+ if (info.bits > 0)
+ mds->balancer->queue_split(info.dirs.front());
+ else
+ mds->balancer->queue_merge(info.dirs.front());
+ fragment_unmark_unfreeze_dirs(info.dirs);
+ fragment_requests.erase(mdr->reqid);
+ request_finish(mdr);
+ return;
+ }
+ mdr->auth_pin(diri);
}
- diri->dirfragtreelock.get_wrlock(true);
+ set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ wrlocks.insert(&diri->dirfragtreelock);
// prevent a racing gather on any other scatterlocks too
- diri->nestlock.get_wrlock(true);
- diri->filelock.get_wrlock(true);
+ wrlocks.insert(&diri->nestlock);
+ wrlocks.insert(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ return;
+
+ mdr->ls = mds->mdlog->get_current_segment();
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(),
+ info.basefrag, info.bits);
+ mds->mdlog->start_entry(le);
+
+ for (list<CDir*>::iterator p = info.dirs.begin(); p != info.dirs.end(); ++p) {
+ CDir *dir = *p;
+ dirfrag_rollback rollback;
+ rollback.fnode = dir->fnode;
+ le->add_orig_frag(dir->get_frag(), &rollback);
+ }
// refragment
- list<CDir*> resultfrags;
list<Context*> waiters;
- adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false);
+ adjust_dir_fragments(diri, info.dirs, info.basefrag, info.bits,
+ info.resultfrags, waiters, false);
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
mds->queue_waiters(waiters);
- // journal
- Mutation *mut = new Mutation;
+ for (list<frag_t>::iterator p = le->orig_frags.begin(); p != le->orig_frags.end(); ++p)
+ assert(!diri->dirfragtree.is_leaf(*p));
- mut->ls = mds->mdlog->get_current_segment();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits);
- mds->mdlog->start_entry(le);
-
- le->metablob.add_dir_context(*resultfrags.begin());
+ le->metablob.add_dir_context(*info.resultfrags.begin());
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
+ ++p) {
+ le->metablob.add_dir(*p, false);
+ }
// dft lock
mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock);
- mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
- mut->add_updated_lock(&diri->dirfragtreelock);
+ mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
+ mdr->add_updated_lock(&diri->dirfragtreelock);
/*
// filelock
@@ -11151,48 +11207,57 @@ void MDCache::fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits)
mut->add_updated_lock(&diri->nestlock);
*/
- // freeze, journal, and store resulting frags
- C_GatherBuilder gather(g_ceph_context,
- new C_MDC_FragmentLoggedAndStored(this, mut,
- resultfrags, basefrag, bits));
+ add_uncommitted_fragment(dirfrag_t(diri->ino(), info.basefrag), info.bits, le->orig_frags, mdr->ls);
+ mds->mdlog->submit_entry(le, new C_MDC_FragmentPrep(this, mdr));
+ mds->mdlog->flush();
+}
+
+void MDCache::_fragment_logged(MDRequest *mdr)
+{
+ map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid);
+ assert(it != fragment_requests.end());
+ fragment_info_t &info = it->second;
+ CInode *diri = info.resultfrags.front()->get_inode();
+
+ dout(10) << "fragment_logged " << info.resultfrags << " " << info.basefrag
+ << " bits " << info.bits << " on " << *diri << dendl;
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
+ // store resulting frags
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentStore(this, mdr));
+
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
++p) {
CDir *dir = *p;
- dout(10) << " result frag " << *dir << dendl;
- le->metablob.add_dir(dir, false);
+ dout(10) << " storing result frag " << *dir << dendl;
// freeze and store them too
+ dir->auth_pin(this);
dir->state_set(CDir::STATE_FRAGMENTING);
dir->commit(0, gather.new_sub(), true); // ignore authpinnability
}
- mds->mdlog->submit_entry(le, gather.new_sub());
- mds->mdlog->flush();
gather.activate();
}
-void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
+void MDCache::_fragment_stored(MDRequest *mdr)
{
- CInode *diri = resultfrags.front()->get_inode();
+ map<metareqid_t, fragment_info_t>::iterator it = fragment_requests.find(mdr->reqid);
+ assert(it != fragment_requests.end());
+ fragment_info_t &info = it->second;
+ CInode *diri = info.resultfrags.front()->get_inode();
- dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits
- << " on " << *diri << dendl;
-
- // journal commit
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits);
- mds->mdlog->start_entry(le);
- mds->mdlog->submit_entry(le);
+ dout(10) << "fragment_stored " << info.resultfrags << " " << info.basefrag
+ << " bits " << info.bits << " on " << *diri << dendl;
// tell peers
- CDir *first = *resultfrags.begin();
+ CDir *first = *info.resultfrags.begin();
for (map<int,int>::iterator p = first->replica_map.begin();
p != first->replica_map.end();
++p) {
if (mds->mdsmap->get_state(p->first) <= MDSMap::STATE_REJOIN)
continue;
- MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), basefrag, bits);
+ MMDSFragmentNotify *notify = new MMDSFragmentNotify(diri->ino(), info.basefrag, info.bits);
/*
// freshly replicate new dirs to peers
@@ -11203,26 +11268,15 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags
mds->send_message_mds(notify, p->first);
}
- mut->apply(); // mark scatterlock
- mds->locker->drop_locks(mut);
- mut->cleanup();
- delete mut;
-
- // drop dft wrlock
- bool need_issue = false;
- mds->locker->wrlock_finish(&diri->dirfragtreelock, NULL, &need_issue);
- mds->locker->wrlock_finish(&diri->nestlock, NULL, &need_issue);
- mds->locker->wrlock_finish(&diri->filelock, NULL, &need_issue);
+ mdr->apply(); // mark scatterlock
+ mds->locker->drop_locks(mdr);
// unfreeze resulting frags
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
+ for (list<CDir*>::iterator p = info.resultfrags.begin();
+ p != info.resultfrags.end();
++p) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
-
- // unmark, unfreeze
- dir->state_clear(CDir::STATE_FRAGMENTING);
for (CDir::map_t::iterator p = dir->items.begin();
p != dir->items.end();
@@ -11233,13 +11287,72 @@ void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags
dn->put(CDentry::PIN_FRAGMENTING);
}
+ // unfreeze
dir->unfreeze_dir();
}
- if (need_issue)
- mds->locker->issue_caps(diri);
+ // journal commit
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT,
+ diri->ino(), info.basefrag, info.bits);
+ mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, diri->ino(), info.basefrag,
+ info.resultfrags));
+
+ fragment_requests.erase(it);
+ request_finish(mdr);
+}
+
+void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+{
+ dout(10) << "fragment_committed " << basedirfrag << dendl;
+ map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
+ assert(it != uncommitted_fragments.end());
+ ufragment &uf = it->second;
+
+ // remove old frags
+ C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentFinish(this, basedirfrag, resultfrags));
+
+ SnapContext nullsnapc;
+ object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+ for (list<frag_t>::iterator p = uf.old_frags.begin();
+ p != uf.old_frags.end();
+ ++p) {
+ object_t oid = CInode::get_object_name(basedirfrag.ino, *p, "");
+ ObjectOperation op;
+ if (*p == frag_t()) {
+ // backtrace object
+ dout(10) << " truncate orphan dirfrag " << oid << dendl;
+ op.truncate(0);
+ } else {
+ dout(10) << " removing orphan dirfrag " << oid << dendl;
+ op.remove();
+ }
+ mds->objecter->mutate(oid, oloc, op, nullsnapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub());
+ }
+
+ assert(gather.has_subs());
+ gather.activate();
}
+void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+{
+ dout(10) << "fragment_finish " << basedirfrag << dendl;
+ map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
+ assert(it != uncommitted_fragments.end());
+ ufragment &uf = it->second;
+
+ // unmark & auth_unpin
+ for (list<CDir*>::iterator p = resultfrags.begin(); p != resultfrags.end(); ++p) {
+ (*p)->state_clear(CDir::STATE_FRAGMENTING);
+ (*p)->auth_unpin(this);
+ }
+
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH,
+ basedirfrag.ino, basedirfrag.frag, uf.bits);
+ mds->mdlog->start_submit_entry(le);
+
+ finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH);
+}
/* This function DOES put the passed message before returning */
void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
@@ -11285,26 +11398,140 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify)
notify->put();
}
+void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags,
+ LogSegment *ls, bufferlist *rollback)
+{
+ dout(10) << "add_uncommitted_fragment: base dirfrag " << basedirfrag << " bits " << bits << dendl;
+ assert(!uncommitted_fragments.count(basedirfrag));
+ ufragment& uf = uncommitted_fragments[basedirfrag];
+ uf.old_frags = old_frags;
+ uf.bits = bits;
+ uf.ls = ls;
+ ls->uncommitted_fragments.insert(basedirfrag);
+ if (rollback)
+ uf.rollback.swap(*rollback);
+}
+
+void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag, int op)
+{
+ dout(10) << "finish_uncommitted_fragments: base dirfrag " << basedirfrag
+ << " op " << EFragment::op_name(op) << dendl;
+ map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
+ if (it != uncommitted_fragments.end()) {
+ ufragment& uf = it->second;
+ if (op != EFragment::OP_FINISH && !uf.old_frags.empty()) {
+ uf.committed = true;
+ } else {
+ uf.ls->uncommitted_fragments.erase(basedirfrag);
+ mds->queue_waiters(uf.waiters);
+ uncommitted_fragments.erase(it);
+ }
+ }
+}
+
+void MDCache::rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags)
+{
+ dout(10) << "rollback_uncommitted_fragment: base dirfrag " << basedirfrag
+ << " old_frags (" << old_frags << ")" << dendl;
+ map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
+ if (it != uncommitted_fragments.end()) {
+ ufragment& uf = it->second;
+ if (!uf.old_frags.empty()) {
+ uf.old_frags.swap(old_frags);
+ uf.committed = true;
+ } else {
+ uf.ls->uncommitted_fragments.erase(basedirfrag);
+ uncommitted_fragments.erase(it);
+ }
+ }
+}
void MDCache::rollback_uncommitted_fragments()
{
dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl;
- for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin();
+ for (map<dirfrag_t, ufragment>::iterator p = uncommitted_fragments.begin();
p != uncommitted_fragments.end();
++p) {
+ ufragment &uf = p->second;
CInode *diri = get_inode(p->first.ino);
assert(diri);
- dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl;
+
+ if (uf.committed) {
+ list<CDir*> frags;
+ diri->get_dirfrags_under(p->first.frag, frags);
+ for (list<CDir*>::iterator q = frags.begin(); q != frags.end(); ++q) {
+ CDir *dir = *q;
+ dir->auth_pin(this);
+ dir->state_set(CDir::STATE_FRAGMENTING);
+ }
+ _fragment_committed(p->first, frags);
+ continue;
+ }
+
+ dout(10) << " rolling back " << p->first << " refragment by " << uf.bits << " bits" << dendl;
+
+ LogSegment *ls = mds->mdlog->get_current_segment();
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, uf.bits);
+ mds->mdlog->start_entry(le);
+
+ list<frag_t> old_frags;
+ diri->dirfragtree.get_leaves_under(p->first.frag, old_frags);
+
list<CDir*> resultfrags;
- list<Context*> waiters;
- adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true);
+ if (uf.old_frags.empty()) {
+ // created by old format EFragment
+ list<Context*> waiters;
+ adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true);
+ } else {
+ bufferlist::iterator bp = uf.rollback.begin();
+ for (list<frag_t>::iterator q = uf.old_frags.begin(); q != uf.old_frags.end(); ++q) {
+ CDir *dir = force_dir_fragment(diri, *q);
+ resultfrags.push_back(dir);
+
+ dirfrag_rollback rollback;
+ ::decode(rollback, bp);
+
+ dir->set_version(rollback.fnode.version);
+ dir->fnode = rollback.fnode;
+
+ dir->_mark_dirty(ls);
+
+ if (!(dir->fnode.rstat == dir->fnode.accounted_rstat)) {
+ dout(10) << " dirty nestinfo on " << *dir << dendl;
+ mds->locker->mark_updated_scatterlock(&dir->inode->nestlock);
+ ls->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest);
+ dir->get_inode()->nestlock.mark_dirty();
+ }
+ if (!(dir->fnode.fragstat == dir->fnode.accounted_fragstat)) {
+ dout(10) << " dirty fragstat on " << *dir << dendl;
+ mds->locker->mark_updated_scatterlock(&dir->inode->filelock);
+ ls->dirty_dirfrag_dir.push_back(&dir->inode->item_dirty_dirfrag_dir);
+ dir->get_inode()->filelock.mark_dirty();
+ }
+
+ le->add_orig_frag(dir->get_frag());
+ le->metablob.add_dir_context(dir);
+ le->metablob.add_dir(dir, true);
+ }
+ }
+
if (g_conf->mds_debug_frag)
diri->verify_dirfrags();
- EFragment *le = new EFragment(mds->mdlog, EFragment::OP_ROLLBACK, diri->ino(), p->first.frag, p->second);
- mds->mdlog->start_submit_entry(le);
+ for (list<frag_t>::iterator q = old_frags.begin(); q != old_frags.end(); ++q)
+ assert(!diri->dirfragtree.is_leaf(*q));
+
+ for (list<CDir*>::iterator q = resultfrags.begin(); q != resultfrags.end(); ++q) {
+ CDir *dir = *q;
+ dir->auth_pin(this);
+ dir->state_set(CDir::STATE_FRAGMENTING);
+ }
+
+ mds->mdlog->submit_entry(le);
+
+ uf.old_frags.swap(old_frags);
+ _fragment_committed(p->first, resultfrags);
}
- uncommitted_fragments.clear();
}
diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h
index 416c6454292..87b1098bb52 100644
--- a/src/mds/MDCache.h
+++ b/src/mds/MDCache.h
@@ -943,10 +943,26 @@ protected:
// -- fragmenting --
-public:
- set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations
-
private:
+ struct ufragment {
+ int bits;
+ bool committed;
+ LogSegment *ls;
+ list<Context*> waiters;
+ list<frag_t> old_frags;
+ bufferlist rollback;
+ ufragment() : bits(0), committed(false), ls(NULL) {}
+ };
+ map<dirfrag_t, ufragment> uncommitted_fragments;
+
+ struct fragment_info_t {
+ frag_t basefrag;
+ int bits;
+ list<CDir*> dirs;
+ list<CDir*> resultfrags;
+ };
+ map<metareqid_t, fragment_info_t> fragment_requests;
+
void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
list<CDir*>& frags, list<Context*>& waiters, bool replay);
void adjust_dir_fragments(CInode *diri,
@@ -958,32 +974,39 @@ private:
CDir *force_dir_fragment(CInode *diri, frag_t fg);
void get_force_dirfrag_bound_set(vector<dirfrag_t>& dfs, set<CDir*>& bounds);
-
- friend class EFragment;
-
- bool can_fragment_lock(CInode *diri);
bool can_fragment(CInode *diri, list<CDir*>& dirs);
-
-public:
- void split_dir(CDir *dir, int byn);
- void merge_dir(CInode *diri, frag_t fg);
-
-private:
void fragment_freeze_dirs(list<CDir*>& dirs, C_GatherBuilder &gather);
void fragment_mark_and_complete(list<CDir*>& dirs);
void fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits);
void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
- void fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits);
-public:
- void rollback_uncommitted_fragments();
-private:
+ void dispatch_fragment_dir(MDRequest *mdr);
+ void _fragment_logged(MDRequest *mdr);
+ void _fragment_stored(MDRequest *mdr);
+ void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags);
+ void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags);
+ friend class EFragment;
friend class C_MDC_FragmentFrozen;
friend class C_MDC_FragmentMarking;
- friend class C_MDC_FragmentLoggedAndStored;
+ friend class C_MDC_FragmentPrep;
+ friend class C_MDC_FragmentStore;
+ friend class C_MDC_FragmentCommit;
+ friend class C_MDC_FragmentFinish;
void handle_fragment_notify(MMDSFragmentNotify *m);
+ void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag,
+ LogSegment *ls, bufferlist *rollback=NULL);
+ void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op);
+ void rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags);
+public:
+ void wait_for_uncommitted_fragment(dirfrag_t dirfrag, Context *c) {
+ assert(uncommitted_fragments.count(dirfrag));
+ uncommitted_fragments[dirfrag].waiters.push_back(c);
+ }
+ void split_dir(CDir *dir, int byn);
+ void merge_dir(CInode *diri, frag_t fg);
+ void rollback_uncommitted_fragments();
// -- updates --
//int send_inode_updates(CInode *in);
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index 41862847e27..0c500cdfe63 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -2735,13 +2735,15 @@ void Server::handle_client_readdir(MDRequest *mdr)
// which frag?
frag_t fg = (__u32)req->head.args.readdir.frag;
- dout(10) << " frag " << fg << dendl;
+ string offset_str = req->get_path2();
+ dout(10) << " frag " << fg << " offset '" << offset_str << "'" << dendl;
// does the frag exist?
if (diri->dirfragtree[fg.value()] != fg) {
- dout(10) << "frag " << fg << " doesn't appear in fragtree " << diri->dirfragtree << dendl;
- reply_request(mdr, -EAGAIN);
- return;
+ frag_t newfg = diri->dirfragtree[fg.value()];
+ dout(10) << " adjust frag " << fg << " -> " << newfg << " " << diri->dirfragtree << dendl;
+ fg = newfg;
+ offset_str.clear();
}
CDir *dir = try_open_auth_dirfrag(diri, fg, mdr);
@@ -2770,12 +2772,7 @@ void Server::handle_client_readdir(MDRequest *mdr)
mdr->now = ceph_clock_now(g_ceph_context);
snapid_t snapid = mdr->snapid;
-
- string offset_str = req->get_path2();
- const char *offset = offset_str.length() ? offset_str.c_str() : 0;
-
- dout(10) << "snapid " << snapid << " offset '" << offset_str << "'" << dendl;
-
+ dout(10) << "snapid " << snapid << dendl;
// purge stale snap data?
const set<snapid_t> *snaps = 0;
@@ -2831,7 +2828,7 @@ void Server::handle_client_readdir(MDRequest *mdr)
continue;
}
- if (offset && strcmp(dn->get_name().c_str(), offset) <= 0)
+ if (!offset_str.empty() && dn->get_name().compare(offset_str) <= 0)
continue;
CInode *in = dnl->get_inode();
@@ -2901,7 +2898,7 @@ void Server::handle_client_readdir(MDRequest *mdr)
}
__u8 end = (it == dir->end());
- __u8 complete = (end && !offset); // FIXME: what purpose does this serve
+ __u8 complete = (end && offset_str.empty()); // FIXME: what purpose does this serve
// finish final blob
::encode(numfiles, dirbl);
diff --git a/src/mds/events/EFragment.h b/src/mds/events/EFragment.h
index bdbbd335e29..a9ddd548502 100644
--- a/src/mds/events/EFragment.h
+++ b/src/mds/events/EFragment.h
@@ -18,6 +18,14 @@
#include "../LogEvent.h"
#include "EMetaBlob.h"
+struct dirfrag_rollback {
+ fnode_t fnode;
+ dirfrag_rollback() { }
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& bl);
+};
+WRITE_CLASS_ENCODER(dirfrag_rollback)
+
class EFragment : public LogEvent {
public:
EMetaBlob metablob;
@@ -25,6 +33,8 @@ public:
inodeno_t ino;
frag_t basefrag;
__s32 bits; // positive for split (from basefrag), negative for merge (to basefrag)
+ list<frag_t> orig_frags;
+ bufferlist rollback;
EFragment() : LogEvent(EVENT_FRAGMENT) { }
EFragment(MDLog *mdlog, int o, inodeno_t i, frag_t bf, int b) :
@@ -39,17 +49,25 @@ public:
OP_PREPARE = 1,
OP_COMMIT = 2,
OP_ROLLBACK = 3,
- OP_ONESHOT = 4, // (legacy) PREPARE+COMMIT
+ OP_FINISH = 4, // finish deleting orphan dirfrags
+ OP_ONESHOT = 5, // (legacy) PREPARE+COMMIT
};
- const char *op_name(int o) const {
+ static const char *op_name(int o) {
switch (o) {
case OP_PREPARE: return "prepare";
case OP_COMMIT: return "commit";
case OP_ROLLBACK: return "rollback";
+ case OP_FINISH: return "finish";
default: return "???";
}
}
+ void add_orig_frag(frag_t df, dirfrag_rollback *drb=NULL) {
+ orig_frags.push_back(df);
+ if (drb)
+ ::encode(*drb, rollback);
+ }
+
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &bl);
void dump(Formatter *f) const;
diff --git a/src/mds/journal.cc b/src/mds/journal.cc
index aeff07eb905..41a79f9fb38 100644
--- a/src/mds/journal.cc
+++ b/src/mds/journal.cc
@@ -119,6 +119,14 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
}
+ // uncommitted fragments
+ for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
+ p != uncommitted_fragments.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for uncommitted fragment " << *p << dendl;
+ mds->mdcache->wait_for_uncommitted_fragment(*p, gather_bld.new_sub());
+ }
+
// nudge scatterlocks
for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
CInode *in = *p;
@@ -2381,7 +2389,7 @@ void EFragment::replay(MDS *mds)
list<CDir*> resultfrags;
list<Context*> waiters;
- pair<dirfrag_t,int> desc(dirfrag_t(ino,basefrag), bits);
+ list<frag_t> old_frags;
// in may be NULL if it wasn't in our cache yet. if it's a prepare
// it will be once we replay the metablob , but first we need to
@@ -2390,45 +2398,56 @@ void EFragment::replay(MDS *mds)
switch (op) {
case OP_PREPARE:
- mds->mdcache->uncommitted_fragments.insert(desc);
+ mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, _segment, &rollback);
// fall-thru
case OP_ONESHOT:
if (in)
mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
break;
- case OP_COMMIT:
- mds->mdcache->uncommitted_fragments.erase(desc);
- break;
-
case OP_ROLLBACK:
- if (mds->mdcache->uncommitted_fragments.count(desc)) {
- mds->mdcache->uncommitted_fragments.erase(desc);
- assert(in);
- mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
- } else {
- dout(10) << " no record of prepare for " << desc << dendl;
+ if (in) {
+ in->dirfragtree.get_leaves_under(basefrag, old_frags);
+ if (orig_frags.empty()) {
+ // old format EFragment
+ mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
+ } else {
+ for (list<frag_t>::iterator p = orig_frags.begin(); p != orig_frags.end(); ++p)
+ mds->mdcache->force_dir_fragment(in, *p);
+ }
}
+ mds->mdcache->rollback_uncommitted_fragment(dirfrag_t(ino, basefrag), old_frags);
+ break;
+
+ case OP_COMMIT:
+ case OP_FINISH:
+ mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag), op);
break;
+
+ default:
+ assert(0);
}
+
metablob.replay(mds, _segment);
if (in && g_conf->mds_debug_frag)
in->verify_dirfrags();
}
void EFragment::encode(bufferlist &bl) const {
- ENCODE_START(4, 4, bl);
+ ENCODE_START(5, 4, bl);
::encode(stamp, bl);
::encode(op, bl);
::encode(ino, bl);
::encode(basefrag, bl);
::encode(bits, bl);
::encode(metablob, bl);
+ ::encode(orig_frags, bl);
+ ::encode(rollback, bl);
ENCODE_FINISH(bl);
}
void EFragment::decode(bufferlist::iterator &bl) {
- DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
if (struct_v >= 2)
::decode(stamp, bl);
if (struct_v >= 3)
@@ -2439,6 +2458,10 @@ void EFragment::decode(bufferlist::iterator &bl) {
::decode(basefrag, bl);
::decode(bits, bl);
::decode(metablob, bl);
+ if (struct_v >= 5) {
+ ::decode(orig_frags, bl);
+ ::decode(rollback, bl);
+ }
DECODE_FINISH(bl);
}
@@ -2462,7 +2485,19 @@ void EFragment::generate_test_instances(list<EFragment*>& ls)
ls.back()->bits = 5;
}
+void dirfrag_rollback::encode(bufferlist &bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ::encode(fnode, bl);
+ ENCODE_FINISH(bl);
+}
+void dirfrag_rollback::decode(bufferlist::iterator &bl)
+{
+ DECODE_START(1, bl);
+ ::decode(fnode, bl);
+ DECODE_FINISH(bl);
+}