diff options
author | Sage Weil <sage@newdream.net> | 2009-02-18 14:13:49 -0800 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2009-02-19 11:58:25 -0800 |
commit | 376ac88a3bd0c01b9ac90c8c908293f579e361fc (patch) | |
tree | 4ba344b87b2b59877a8ee3ea106d906640d60588 | |
parent | 77a5f24f21347d17011bf68a7db3b627e756ba75 (diff) | |
download | ceph-376ac88a3bd0c01b9ac90c8c908293f579e361fc.tar.gz |
kclient: async mds requests
Restructure the mds client to handle mds request asynchronously, so
that we can handle requests that are not managed by a blocking
thread.
-rw-r--r-- | src/TODO | 3 | ||||
-rw-r--r-- | src/kernel/caps.c | 6 | ||||
-rw-r--r-- | src/kernel/mds_client.c | 535 | ||||
-rw-r--r-- | src/kernel/mds_client.h | 9 | ||||
-rw-r--r-- | src/kernel/snap.c | 2 | ||||
-rw-r--r-- | src/kernel/super.c | 1 | ||||
-rw-r--r-- | src/kernel/super.h | 5 |
7 files changed, 294 insertions, 267 deletions
@@ -44,6 +44,9 @@ timer - each SafeTimer should just be its own thread. kernel client +- async mds requests +- pass dentries directly into mds requests; rebuild messages on message retry +- optional or no fill_trace? - flock, fnctl locks - async xattrs - avoid pinning inodes with expireable caps? diff --git a/src/kernel/caps.c b/src/kernel/caps.c index 99da64d104e..25340319b68 100644 --- a/src/kernel/caps.c +++ b/src/kernel/caps.c @@ -446,7 +446,7 @@ void ceph_remove_cap(struct ceph_cap *cap) static void __cap_delay_requeue(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { - ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5); + ci->i_hold_caps_until = round_jiffies(jiffies + CEPH_CAP_DELAY); dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode, ci->i_hold_caps_until); spin_lock(&mdsc->cap_delay_lock); @@ -691,7 +691,7 @@ retry: if (!session) { spin_unlock(&inode->i_lock); mutex_lock(&mdsc->mutex); - session = __ceph_get_mds_session(mdsc, mds); + session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); if (session) { dout(10, "inverting session/ino locks on %p\n", @@ -1625,7 +1625,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc, /* find session */ mutex_lock(&mdsc->mutex); - session = __ceph_get_mds_session(mdsc, mds); + session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); if (!session) { dout(10, "WTF, got cap but no session for mds%d\n", mds); diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 60624985a8f..6cb37ccdfc1 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -13,6 +13,8 @@ int ceph_debug_mdsc = -1; #include "messenger.h" #include "decode.h" +static void __wake_requests(struct ceph_mds_client *mdsc, + struct list_head *head); /* * address and send message to a given mds @@ -278,30 +280,39 @@ static const char *session_state_name(int s) } } +static struct ceph_mds_session *get_session(struct ceph_mds_session *s) +{ + dout(30, "get_session %p %d -> %d\n", s, + atomic_read(&s->s_ref), atomic_read(&s->s_ref)+1); + atomic_inc(&s->s_ref); + return s; +} + +void ceph_put_mds_session(struct ceph_mds_session *s) +{ + dout(30, "put_session %p %d -> %d\n", s, + atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); + if (atomic_dec_and_test(&s->s_ref)) + kfree(s); +} + /* * called under mdsc->mutex */ -struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *mdsc, - int mds) +struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, + int mds) { struct ceph_mds_session *session; if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) return NULL; session = mdsc->sessions[mds]; - dout(30, "get_mds_session %p %d -> %d\n", session, + dout(30, "lookup_mds_session %p %d -> %d\n", session, atomic_read(&session->s_ref), atomic_read(&session->s_ref)+1); - atomic_inc(&session->s_ref); + get_session(session); return session; } -void ceph_put_mds_session(struct ceph_mds_session *s) -{ - dout(30, "put_mds_session %p %d -> %d\n", s, - atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); - if (atomic_dec_and_test(&s->s_ref)) - kfree(s); -} /* * create+register a new session for given mds. @@ -326,7 +337,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, INIT_LIST_HEAD(&s->s_rdcaps); s->s_nr_caps = 0; atomic_set(&s->s_ref, 1); - init_completion(&s->s_completion); + INIT_LIST_HEAD(&s->s_waiting); INIT_LIST_HEAD(&s->s_unsafe); dout(10, "register_session mds%d\n", mds); @@ -610,43 +621,25 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) } /* - * Register request with mon_client for a new mds map. Wait until - * we get one (or time out). + * Register request with mon_client for a new mds map. * - * called under mdsc->mutex (dropped while we wait) + * called under mdsc->mutex. */ -static int wait_for_new_map(struct ceph_mds_client *mdsc, - unsigned long timeout) +static void request_new_map(struct ceph_mds_client *mdsc) { - u32 have; - int err = 0; - - dout(30, "wait_for_new_map enter\n"); - have = mdsc->mdsmap->m_epoch; + dout(30, "request_new_map enter\n"); mutex_unlock(&mdsc->mutex); - ceph_monc_request_mdsmap(&mdsc->client->monc, have+1); - if (timeout) { - err = wait_for_completion_timeout(&mdsc->map_waiters, timeout); - if (err > 0) - err = 0; - else if (err == 0) - err = -EIO; - } else { - wait_for_completion(&mdsc->map_waiters); - } + ceph_monc_request_mdsmap(&mdsc->client->monc, mdsc->mdsmap->m_epoch+1); mutex_lock(&mdsc->mutex); - dout(30, "wait_for_new_map err %d\n", err); - return err; } /* - * open a new session with the given mds, and wait for mds ack. the - * timeout is optional. + * send session open request. * * called under mdsc->mutex */ -static int open_session(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, unsigned long timeout) +static int __open_session(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { struct ceph_msg *msg; int mstate; @@ -656,45 +649,21 @@ static int open_session(struct ceph_mds_client *mdsc, /* wait for mds to go active? */ mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); dout(10, "open_session to mds%d, state %d\n", mds, mstate); - if (mstate < CEPH_MDS_STATE_ACTIVE) { - err = wait_for_new_map(mdsc, timeout); - if (err) - return err; - mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); - if (mstate < CEPH_MDS_STATE_ACTIVE) { - dout(30, "open_session mds%d now %d still not active\n", - mds, mstate); - return -EAGAIN; /* hrm, try again? */ - } - } - session->s_state = CEPH_MDS_SESSION_OPENING; session->s_renew_requested = jiffies; mutex_unlock(&mdsc->mutex); /* send connect message */ msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); - if (IS_ERR(msg)) - return PTR_ERR(msg); - ceph_send_msg_mds(mdsc, msg, mds); - - /* wait for session to open (or fail, or close) */ - dout(30, "open_session waiting on session %p\n", session); - if (timeout) { - err = wait_for_completion_timeout(&session->s_completion, - timeout); - if (err > 0) - err = 0; - else if (err == 0) - err = -EIO; - } else { - wait_for_completion(&session->s_completion); + if (IS_ERR(msg)) { + err = PTR_ERR(msg); + goto out; } - dout(30, "open_session done waiting on session %p, state %d\n", - session, session->s_state); + ceph_send_msg_mds(mdsc, msg, mds); +out: mutex_lock(&mdsc->mutex); - return err; + return 0; } /* @@ -752,38 +721,6 @@ static void wake_up_session_caps(struct ceph_mds_session *session) } /* - * Wake up threads with requests pending for @mds, so that they can - * resubmit their requests to a possibly different mds. If @all is set, - * wake up if their requests has been forwarded to @mds, too. - */ -static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all) -{ - struct ceph_mds_request *reqs[10]; - u64 nexttid = 0; - int i, got; - - dout(20, "kick_requests mds%d\n", mds); - while (nexttid < mdsc->last_tid) { - got = radix_tree_gang_lookup(&mdsc->request_tree, - (void **)&reqs, nexttid, 10); - if (got == 0) - break; - nexttid = reqs[got-1]->r_tid + 1; - for (i = 0; i < got; i++) { - if (!reqs[i]->r_got_unsafe && - ((reqs[i]->r_session && - reqs[i]->r_session->s_mds == mds) || - (all && reqs[i]->r_fwd_session && - reqs[i]->r_fwd_session->s_mds == mds))) { - dout(10, " kicking tid %llu\n", reqs[i]->r_tid); - put_request_sessions(reqs[i]); - complete(&reqs[i]->r_completion); - } - } - } -} - -/* * Send periodic message to MDS renewing all currently held caps. The * ack will reset the expiration for all caps from this session. * @@ -928,94 +865,6 @@ void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc, } -/* - * handle a mds session control message - */ -void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, - struct ceph_msg *msg) -{ - u32 op; - u64 seq; - struct ceph_mds_session *session = NULL; - int mds; - struct ceph_mds_session_head *h = msg->front.iov_base; - - if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS) - return; - mds = le32_to_cpu(msg->hdr.src.name.num); - - /* decode */ - if (msg->front.iov_len != sizeof(*h)) - goto bad; - op = le32_to_cpu(h->op); - seq = le64_to_cpu(h->seq); - - mutex_lock(&mdsc->mutex); - session = __ceph_get_mds_session(mdsc, mds); - if (session && mdsc->mdsmap) - /* FIXME: this ttl calculation is generous */ - session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; - mutex_unlock(&mdsc->mutex); - - if (!session) { - if (op != CEPH_SESSION_OPEN) { - dout(10, "handle_session no session for mds%d\n", mds); - return; - } - dout(10, "handle_session creating session for mds%d\n", mds); - session = register_session(mdsc, mds); - } - - mutex_lock(&session->s_mutex); - - dout(2, "handle_session mds%d %s %p state %s seq %llu\n", - mds, ceph_session_op_name(op), session, - session_state_name(session->s_state), seq); - switch (op) { - case CEPH_SESSION_OPEN: - session->s_state = CEPH_MDS_SESSION_OPEN; - renewed_caps(mdsc, session, 0); - complete(&session->s_completion); - if (mdsc->stopping) - __close_session(mdsc, session); - break; - - case CEPH_SESSION_RENEWCAPS: - renewed_caps(mdsc, session, 1); - break; - - case CEPH_SESSION_CLOSE: - unregister_session(mdsc, mds); - remove_session_caps(session); - complete(&session->s_completion); /* for good measure */ - complete(&mdsc->session_close_waiters); - kick_requests(mdsc, mds, 0); /* cur only */ - break; - - case CEPH_SESSION_STALE: - dout(1, "mds%d caps went stale, renewing\n", session->s_mds); - spin_lock(&session->s_cap_lock); - session->s_cap_gen++; - session->s_cap_ttl = 0; - spin_unlock(&session->s_cap_lock); - send_renew_caps(mdsc, session); - break; - - default: - derr(0, "bad session op %d from mds%d\n", op, mds); - WARN_ON(1); - } - - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - return; - -bad: - derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds, - (int)msg->front.iov_len, (int)sizeof(*h)); - return; -} - /* * create an mds request and message. @@ -1136,27 +985,20 @@ static void __prepare_send_request(struct ceph_mds_client *mdsc, } /* - * Synchrously perform an mds request. Take care of all of the - * session setup, forwarding, retry details. + * send request, or put it on the appropriate wait list. */ -int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, - struct inode *listener, - struct ceph_mds_request *req) +static int __do_request(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) { struct ceph_mds_session *session = NULL; - int err; int mds = -1; - int safe = 0; + int err = -EAGAIN; - dout(30, "do_request on %p\n", req); + if (req->r_reply) + goto out; - mutex_lock(&mdsc->mutex); - __register_request(mdsc, listener, req); -retry: if (req->r_timeout && time_after_eq(jiffies, req->r_started + req->r_timeout)) { - if (session && session->s_state == CEPH_MDS_SESSION_OPENING) - unregister_session(mdsc, mds); dout(10, "do_request timed out\n"); err = -EIO; goto finish; @@ -1166,82 +1008,146 @@ retry: if (mds < 0 || ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { dout(30, "do_request no mds or not active, waiting for map\n"); - err = wait_for_new_map(mdsc, req->r_timeout); - if (err) - goto finish; - goto retry; + list_add(&req->r_wait, &mdsc->waiting_for_map); + request_new_map(mdsc); + goto out; } - /* get session */ - session = __ceph_get_mds_session(mdsc, mds); + /* get, open session */ + session = __ceph_lookup_mds_session(mdsc, mds); if (!session) session = register_session(mdsc, mds); dout(30, "do_request mds%d session %p state %s\n", mds, session, session_state_name(session->s_state)); - - /* open? */ - err = 0; - if (session->s_state == CEPH_MDS_SESSION_NEW || - session->s_state == CEPH_MDS_SESSION_CLOSING) - err = open_session(mdsc, session, req->r_timeout); - if (session->s_state != CEPH_MDS_SESSION_OPEN || - err == -EAGAIN) { - dout(30, "do_request session %p not open, state=%s\n", - session, session_state_name(session->s_state)); - ceph_put_mds_session(session); - goto retry; + if (session->s_state != CEPH_MDS_SESSION_OPEN) { + if (session->s_state == CEPH_MDS_SESSION_NEW || + session->s_state == CEPH_MDS_SESSION_CLOSING) + __open_session(mdsc, session); + list_add(&req->r_wait, &session->s_waiting); + request_new_map(mdsc); + goto out_session; } - BUG_ON(req->r_session); - req->r_session = session; /* request now owns the session ref */ + /* send request */ + req->r_session = get_session(session); req->r_resend_mds = -1; /* forget any previous mds hint */ if (req->r_request_started == 0) /* note request start time */ req->r_request_started = jiffies; __prepare_send_request(mdsc, req); - mutex_unlock(&mdsc->mutex); + mutex_unlock(&mdsc->mutex); ceph_msg_get(req->r_request); ceph_send_msg_mds(mdsc, req->r_request, mds); + mutex_lock(&mdsc->mutex); - if (req->r_timeout) { - err = wait_for_completion_timeout(&req->r_completion, - req->r_timeout); - if (err > 0) - err = 0; - else if (err == 0) - err = -EIO; /* timed out */ - } else { - err = 0; - wait_for_completion(&req->r_completion); + err = 0; +out_session: + ceph_put_mds_session(session); +out: + return err; + +finish: + req->r_reply = ERR_PTR(err); + complete(&req->r_completion); + goto out; +} + +static void __wake_requests(struct ceph_mds_client *mdsc, + struct list_head *head) +{ + struct list_head *p, *n; + + list_for_each_safe(p, n, head) { + struct ceph_mds_request *req = + list_entry(p, struct ceph_mds_request, r_wait); + list_del_init(&req->r_wait); + __do_request(mdsc, req); } +} + +/* + * Wake up threads with requests pending for @mds, so that they can + * resubmit their requests to a possibly different mds. If @all is set, + * wake up if their requests has been forwarded to @mds, too. + */ +static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all) +{ + struct ceph_mds_request *reqs[10]; + u64 nexttid = 0; + int i, got; + + dout(20, "kick_requests mds%d\n", mds); + while (nexttid < mdsc->last_tid) { + got = radix_tree_gang_lookup(&mdsc->request_tree, + (void **)&reqs, nexttid, 10); + if (got == 0) + break; + nexttid = reqs[got-1]->r_tid + 1; + for (i = 0; i < got; i++) { + if (!reqs[i]->r_got_unsafe && + ((reqs[i]->r_session && + reqs[i]->r_session->s_mds == mds) || + (all && reqs[i]->r_fwd_session && + reqs[i]->r_fwd_session->s_mds == mds))) { + dout(10, " kicking tid %llu\n", reqs[i]->r_tid); + put_request_sessions(reqs[i]); + __do_request(mdsc, reqs[i]); + } + } + } +} + + +/* + * Synchrously perform an mds request. Take care of all of the + * session setup, forwarding, retry details. + */ +int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, + struct inode *listener, + struct ceph_mds_request *req) +{ + int err; + int safe = 0; + + dout(30, "do_request on %p\n", req); + mutex_lock(&mdsc->mutex); - if (req->r_reply == NULL && !err) { - put_request_sessions(req); - goto retry; + __register_request(mdsc, listener, req); + __do_request(mdsc, req); + + if (!req->r_reply) { + mutex_unlock(&mdsc->mutex); + if (req->r_timeout) { + err = wait_for_completion_timeout(&req->r_completion, + req->r_timeout); + if (err > 0) + err = 0; + else if (err == 0) + req->r_reply = ERR_PTR(-EIO); + } else { + wait_for_completion(&req->r_completion); + } + mutex_lock(&mdsc->mutex); } + if (IS_ERR(req->r_reply)) { err = PTR_ERR(req->r_reply); req->r_reply = NULL; - } - if (!err) - /* all is well, reply has been parsed. */ + safe = 1; + } else { err = le32_to_cpu(req->r_reply_info.head->result); - if (req) safe = req->r_reply_info.head->safe; -finish: - if (safe) { - complete(&req->r_safe_completion); - __unregister_request(mdsc, req); } - mutex_unlock(&mdsc->mutex); - if (safe) { + complete(&req->r_safe_completion); + __unregister_request(mdsc, req); ceph_msg_put(req->r_request); req->r_request = NULL; } + mutex_unlock(&mdsc->mutex); dout(30, "do_request %p done, result %d\n", req, err); return err; @@ -1313,7 +1219,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) if (req->r_session && req->r_session->s_mds != mds) { ceph_put_mds_session(req->r_session); - req->r_session = __ceph_get_mds_session(mdsc, mds); + req->r_session = __ceph_lookup_mds_session(mdsc, mds); } if (req->r_session == NULL) { derr(1, "got reply on %llu, but no session for mds%d\n", @@ -1433,8 +1339,8 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, from_mds); req->r_num_fwd = fwd_seq; put_request_sessions(req); - req->r_session = __ceph_get_mds_session(mdsc, next_mds); - req->r_fwd_session = __ceph_get_mds_session(mdsc, from_mds); + req->r_session = __ceph_lookup_mds_session(mdsc, next_mds); + req->r_fwd_session = __ceph_lookup_mds_session(mdsc, from_mds); } else { /* no, resend. */ /* forward race not possible; mds would drop */ @@ -1442,7 +1348,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; put_request_sessions(req); - complete(&req->r_completion); /* wake up do_request */ + __do_request(mdsc, req); } ceph_mdsc_put_request(req); out: @@ -1453,6 +1359,100 @@ bad: derr(0, "problem decoding message, err=%d\n", err); } +/* + * handle a mds session control message + */ +void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, + struct ceph_msg *msg) +{ + u32 op; + u64 seq; + struct ceph_mds_session *session = NULL; + int mds; + struct ceph_mds_session_head *h = msg->front.iov_base; + int wake = 0; + + if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS) + return; + mds = le32_to_cpu(msg->hdr.src.name.num); + + /* decode */ + if (msg->front.iov_len != sizeof(*h)) + goto bad; + op = le32_to_cpu(h->op); + seq = le64_to_cpu(h->seq); + + mutex_lock(&mdsc->mutex); + session = __ceph_lookup_mds_session(mdsc, mds); + if (session && mdsc->mdsmap) + /* FIXME: this ttl calculation is generous */ + session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; + mutex_unlock(&mdsc->mutex); + + if (!session) { + if (op != CEPH_SESSION_OPEN) { + dout(10, "handle_session no session for mds%d\n", mds); + return; + } + dout(10, "handle_session creating session for mds%d\n", mds); + session = register_session(mdsc, mds); + } + + mutex_lock(&session->s_mutex); + + dout(2, "handle_session mds%d %s %p state %s seq %llu\n", + mds, ceph_session_op_name(op), session, + session_state_name(session->s_state), seq); + switch (op) { + case CEPH_SESSION_OPEN: + session->s_state = CEPH_MDS_SESSION_OPEN; + renewed_caps(mdsc, session, 0); + wake = 1; + if (mdsc->stopping) + __close_session(mdsc, session); + break; + + case CEPH_SESSION_RENEWCAPS: + renewed_caps(mdsc, session, 1); + break; + + case CEPH_SESSION_CLOSE: + unregister_session(mdsc, mds); + remove_session_caps(session); + wake = 1; /* for good measure */ + complete(&mdsc->session_close_waiters); + kick_requests(mdsc, mds, 0); /* cur only */ + break; + + case CEPH_SESSION_STALE: + dout(1, "mds%d caps went stale, renewing\n", session->s_mds); + spin_lock(&session->s_cap_lock); + session->s_cap_gen++; + session->s_cap_ttl = 0; + spin_unlock(&session->s_cap_lock); + send_renew_caps(mdsc, session); + break; + + default: + derr(0, "bad session op %d from mds%d\n", op, mds); + WARN_ON(1); + } + + mutex_unlock(&session->s_mutex); + if (wake) { + mutex_lock(&mdsc->mutex); + __wake_requests(mdsc, &session->s_waiting); + mutex_unlock(&mdsc->mutex); + } + ceph_put_mds_session(session); + return; + +bad: + derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds, + (int)msg->front.iov_len, (int)sizeof(*h)); + return; +} + /* * called under session->mutex. @@ -1507,7 +1507,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) dout(1, "reconnect to recovering mds%d\n", mds); /* find session */ - session = __ceph_get_mds_session(mdsc, mds); + session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); /* drop lock for duration */ if (session) { @@ -1654,7 +1654,7 @@ send: if (session) { session->s_state = CEPH_MDS_SESSION_OPEN; - complete(&session->s_completion); + __wake_requests(mdsc, &session->s_waiting); } out: @@ -1730,7 +1730,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, if (s->s_state == CEPH_MDS_SESSION_OPENING) { /* the session never opened, just close it * out now */ - complete(&s->s_completion); + __wake_requests(mdsc, &s->s_waiting); unregister_session(mdsc, i); } @@ -1792,7 +1792,7 @@ void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg) /* find session */ mutex_lock(&mdsc->mutex); - session = __ceph_get_mds_session(mdsc, mds); + session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); if (!session) { derr(0, "WTF, got lease but no session for mds%d\n", mds); @@ -1946,13 +1946,14 @@ static void delayed_work(struct work_struct *work) mdsc->last_renew_caps = jiffies; for (i = 0; i < mdsc->max_sessions; i++) { - struct ceph_mds_session *s = __ceph_get_mds_session(mdsc, i); + struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); if (s == NULL) continue; if (s->s_state == CEPH_MDS_SESSION_CLOSING) { dout(10, "resending session close request for mds%d\n", s->s_mds); request_close_session(mdsc, s); + ceph_put_mds_session(s); continue; } if (s->s_ttl && time_after(jiffies, s->s_ttl)) { @@ -1990,8 +1991,8 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) mdsc->client = client; mutex_init(&mdsc->mutex); mdsc->mdsmap = NULL; /* none yet */ - init_completion(&mdsc->map_waiters); init_completion(&mdsc->session_close_waiters); + INIT_LIST_HEAD(&mdsc->waiting_for_map); mdsc->sessions = NULL; mdsc->max_sessions = 0; mdsc->stopping = 0; @@ -2019,7 +2020,7 @@ static void drop_leases(struct ceph_mds_client *mdsc) dout(10, "drop_leases\n"); mutex_lock(&mdsc->mutex); for (i = 0; i < mdsc->max_sessions; i++) { - struct ceph_mds_session *s = __ceph_get_mds_session(mdsc, i); + struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); if (!s) continue; mutex_unlock(&mdsc->mutex); @@ -2049,7 +2050,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) struct ceph_mds_session *session; int i; int n; - unsigned long started, timeout = 60 * HZ; + unsigned long started, timeout = CEPH_MOUNT_TIMEOUT; struct ceph_client *client = mdsc->client; dout(10, "close_sessions\n"); @@ -2085,7 +2086,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) dout(10, "closing sessions\n"); n = 0; for (i = 0; i < mdsc->max_sessions; i++) { - session = __ceph_get_mds_session(mdsc, i); + session = __ceph_lookup_mds_session(mdsc, i); if (!session) continue; mutex_unlock(&mdsc->mutex); @@ -2109,6 +2110,21 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) mutex_lock(&mdsc->mutex); } + /* tear down remaining sessions */ + for (i = 0; i < mdsc->max_sessions; i++) { + if (mdsc->sessions[i]) { + session = get_session(mdsc->sessions[i]); + unregister_session(mdsc, i); + mutex_unlock(&mdsc->mutex); + mutex_lock(&session->s_mutex); + remove_session_caps(session); + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); + mutex_lock(&mdsc->mutex); + } + } + + WARN_ON(!list_empty(&mdsc->cap_delay_list)); mutex_unlock(&mdsc->mutex); @@ -2197,9 +2213,10 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) mdsc->mdsmap = newmap; /* first mds map */ } + __wake_requests(mdsc, &mdsc->waiting_for_map); + mutex_unlock(&mdsc->mutex); schedule_delayed(mdsc); - complete(&mdsc->map_waiters); return; bad_unlock: diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index 7f93791fe73..b65bb7a8506 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -120,7 +120,7 @@ struct ceph_mds_session { struct list_head s_rdcaps; /* just the readonly caps */ int s_nr_caps; atomic_t s_ref; - struct completion s_completion; + struct list_head s_waiting; /* waiting requests */ struct list_head s_unsafe; /* unsafe requests */ }; @@ -149,6 +149,8 @@ struct ceph_mds_request { unsigned long r_request_started; /* start time for mds request only, used to measure lease durations */ + struct list_head r_wait; + /* for choosing which mds to send this request to */ struct dentry *r_direct_dentry; int r_direct_mode; @@ -189,7 +191,8 @@ struct ceph_mds_client { struct mutex mutex; /* all nested structures */ struct ceph_mdsmap *mdsmap; - struct completion map_waiters, session_close_waiters; + struct completion session_close_waiters; + struct list_head waiting_for_map; struct ceph_mds_session **sessions; /* NULL for mds if no session */ int max_sessions; /* len of s_mds_sessions */ @@ -219,7 +222,7 @@ struct ceph_mds_client { extern const char *ceph_mds_op_name(int op); -extern struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *, int mds); +extern struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *, int mds); inline static struct ceph_mds_session * ceph_get_mds_session(struct ceph_mds_session *s) diff --git a/src/kernel/snap.c b/src/kernel/snap.c index 89e94f8666b..888e5772d1b 100644 --- a/src/kernel/snap.c +++ b/src/kernel/snap.c @@ -718,7 +718,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, /* find session */ mutex_lock(&mdsc->mutex); - session = __ceph_get_mds_session(mdsc, mds); + session = __ceph_lookup_mds_session(mdsc, mds); mutex_unlock(&mdsc->mutex); if (!session) { dout(10, "WTF, got snap but no session for mds%d\n", mds); diff --git a/src/kernel/super.c b/src/kernel/super.c index 9560c004184..5e2d4415d3f 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -1050,6 +1050,7 @@ static int ceph_get_sb(struct file_system_type *fs_type, return 0; out_splat: + ceph_mdsc_close_sessions(&client->mdsc); up_write(&sb->s_umount); deactivate_super(sb); goto out_final; diff --git a/src/kernel/super.h b/src/kernel/super.h index 8251811dd7f..136ed6e183d 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -26,6 +26,9 @@ #define CEPH_BLOCK_SHIFT 20 /* 1 MB */ #define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT) +#define CEPH_MOUNT_TIMEOUT (60*HZ) +#define CEPH_CAP_DELAY (5*HZ) /* cap release delay */ + /* * subtract jiffies */ @@ -131,7 +134,7 @@ static inline struct ceph_client *ceph_client(struct super_block *sb) * capabilities. * * Each cap is referenced by the inode's i_caps tree and by a per-mds - * session capability list. + * session capability list(s). */ struct ceph_cap { struct ceph_inode_info *ci; |