summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-02-18 14:13:49 -0800
committerSage Weil <sage@newdream.net>2009-02-19 11:58:25 -0800
commit376ac88a3bd0c01b9ac90c8c908293f579e361fc (patch)
tree4ba344b87b2b59877a8ee3ea106d906640d60588
parent77a5f24f21347d17011bf68a7db3b627e756ba75 (diff)
downloadceph-376ac88a3bd0c01b9ac90c8c908293f579e361fc.tar.gz
kclient: async mds requests
Restructure the mds client to handle mds request asynchronously, so that we can handle requests that are not managed by a blocking thread.
-rw-r--r--src/TODO3
-rw-r--r--src/kernel/caps.c6
-rw-r--r--src/kernel/mds_client.c535
-rw-r--r--src/kernel/mds_client.h9
-rw-r--r--src/kernel/snap.c2
-rw-r--r--src/kernel/super.c1
-rw-r--r--src/kernel/super.h5
7 files changed, 294 insertions, 267 deletions
diff --git a/src/TODO b/src/TODO
index cbddec519c2..b916dcd76fa 100644
--- a/src/TODO
+++ b/src/TODO
@@ -44,6 +44,9 @@ timer
- each SafeTimer should just be its own thread.
kernel client
+- async mds requests
+- pass dentries directly into mds requests; rebuild messages on message retry
+- optional or no fill_trace?
- flock, fnctl locks
- async xattrs
- avoid pinning inodes with expireable caps?
diff --git a/src/kernel/caps.c b/src/kernel/caps.c
index 99da64d104e..25340319b68 100644
--- a/src/kernel/caps.c
+++ b/src/kernel/caps.c
@@ -446,7 +446,7 @@ void ceph_remove_cap(struct ceph_cap *cap)
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
{
- ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
+ ci->i_hold_caps_until = round_jiffies(jiffies + CEPH_CAP_DELAY);
dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
ci->i_hold_caps_until);
spin_lock(&mdsc->cap_delay_lock);
@@ -691,7 +691,7 @@ retry:
if (!session) {
spin_unlock(&inode->i_lock);
mutex_lock(&mdsc->mutex);
- session = __ceph_get_mds_session(mdsc, mds);
+ session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (session) {
dout(10, "inverting session/ino locks on %p\n",
@@ -1625,7 +1625,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
/* find session */
mutex_lock(&mdsc->mutex);
- session = __ceph_get_mds_session(mdsc, mds);
+ session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (!session) {
dout(10, "WTF, got cap but no session for mds%d\n", mds);
diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c
index 60624985a8f..6cb37ccdfc1 100644
--- a/src/kernel/mds_client.c
+++ b/src/kernel/mds_client.c
@@ -13,6 +13,8 @@ int ceph_debug_mdsc = -1;
#include "messenger.h"
#include "decode.h"
+static void __wake_requests(struct ceph_mds_client *mdsc,
+ struct list_head *head);
/*
* address and send message to a given mds
@@ -278,30 +280,39 @@ static const char *session_state_name(int s)
}
}
+static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+{
+ dout(30, "get_session %p %d -> %d\n", s,
+ atomic_read(&s->s_ref), atomic_read(&s->s_ref)+1);
+ atomic_inc(&s->s_ref);
+ return s;
+}
+
+void ceph_put_mds_session(struct ceph_mds_session *s)
+{
+ dout(30, "put_session %p %d -> %d\n", s,
+ atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
+ if (atomic_dec_and_test(&s->s_ref))
+ kfree(s);
+}
+
/*
* called under mdsc->mutex
*/
-struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *mdsc,
- int mds)
+struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
+ int mds)
{
struct ceph_mds_session *session;
if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
return NULL;
session = mdsc->sessions[mds];
- dout(30, "get_mds_session %p %d -> %d\n", session,
+ dout(30, "lookup_mds_session %p %d -> %d\n", session,
atomic_read(&session->s_ref), atomic_read(&session->s_ref)+1);
- atomic_inc(&session->s_ref);
+ get_session(session);
return session;
}
-void ceph_put_mds_session(struct ceph_mds_session *s)
-{
- dout(30, "put_mds_session %p %d -> %d\n", s,
- atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
- if (atomic_dec_and_test(&s->s_ref))
- kfree(s);
-}
/*
* create+register a new session for given mds.
@@ -326,7 +337,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_rdcaps);
s->s_nr_caps = 0;
atomic_set(&s->s_ref, 1);
- init_completion(&s->s_completion);
+ INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
dout(10, "register_session mds%d\n", mds);
@@ -610,43 +621,25 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
}
/*
- * Register request with mon_client for a new mds map. Wait until
- * we get one (or time out).
+ * Register request with mon_client for a new mds map.
*
- * called under mdsc->mutex (dropped while we wait)
+ * called under mdsc->mutex.
*/
-static int wait_for_new_map(struct ceph_mds_client *mdsc,
- unsigned long timeout)
+static void request_new_map(struct ceph_mds_client *mdsc)
{
- u32 have;
- int err = 0;
-
- dout(30, "wait_for_new_map enter\n");
- have = mdsc->mdsmap->m_epoch;
+ dout(30, "request_new_map enter\n");
mutex_unlock(&mdsc->mutex);
- ceph_monc_request_mdsmap(&mdsc->client->monc, have+1);
- if (timeout) {
- err = wait_for_completion_timeout(&mdsc->map_waiters, timeout);
- if (err > 0)
- err = 0;
- else if (err == 0)
- err = -EIO;
- } else {
- wait_for_completion(&mdsc->map_waiters);
- }
+ ceph_monc_request_mdsmap(&mdsc->client->monc, mdsc->mdsmap->m_epoch+1);
mutex_lock(&mdsc->mutex);
- dout(30, "wait_for_new_map err %d\n", err);
- return err;
}
/*
- * open a new session with the given mds, and wait for mds ack. the
- * timeout is optional.
+ * send session open request.
*
* called under mdsc->mutex
*/
-static int open_session(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session, unsigned long timeout)
+static int __open_session(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
struct ceph_msg *msg;
int mstate;
@@ -656,45 +649,21 @@ static int open_session(struct ceph_mds_client *mdsc,
/* wait for mds to go active? */
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
dout(10, "open_session to mds%d, state %d\n", mds, mstate);
- if (mstate < CEPH_MDS_STATE_ACTIVE) {
- err = wait_for_new_map(mdsc, timeout);
- if (err)
- return err;
- mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
- if (mstate < CEPH_MDS_STATE_ACTIVE) {
- dout(30, "open_session mds%d now %d still not active\n",
- mds, mstate);
- return -EAGAIN; /* hrm, try again? */
- }
- }
-
session->s_state = CEPH_MDS_SESSION_OPENING;
session->s_renew_requested = jiffies;
mutex_unlock(&mdsc->mutex);
/* send connect message */
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
- if (IS_ERR(msg))
- return PTR_ERR(msg);
- ceph_send_msg_mds(mdsc, msg, mds);
-
- /* wait for session to open (or fail, or close) */
- dout(30, "open_session waiting on session %p\n", session);
- if (timeout) {
- err = wait_for_completion_timeout(&session->s_completion,
- timeout);
- if (err > 0)
- err = 0;
- else if (err == 0)
- err = -EIO;
- } else {
- wait_for_completion(&session->s_completion);
+ if (IS_ERR(msg)) {
+ err = PTR_ERR(msg);
+ goto out;
}
- dout(30, "open_session done waiting on session %p, state %d\n",
- session, session->s_state);
+ ceph_send_msg_mds(mdsc, msg, mds);
+out:
mutex_lock(&mdsc->mutex);
- return err;
+ return 0;
}
/*
@@ -752,38 +721,6 @@ static void wake_up_session_caps(struct ceph_mds_session *session)
}
/*
- * Wake up threads with requests pending for @mds, so that they can
- * resubmit their requests to a possibly different mds. If @all is set,
- * wake up if their requests has been forwarded to @mds, too.
- */
-static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
-{
- struct ceph_mds_request *reqs[10];
- u64 nexttid = 0;
- int i, got;
-
- dout(20, "kick_requests mds%d\n", mds);
- while (nexttid < mdsc->last_tid) {
- got = radix_tree_gang_lookup(&mdsc->request_tree,
- (void **)&reqs, nexttid, 10);
- if (got == 0)
- break;
- nexttid = reqs[got-1]->r_tid + 1;
- for (i = 0; i < got; i++) {
- if (!reqs[i]->r_got_unsafe &&
- ((reqs[i]->r_session &&
- reqs[i]->r_session->s_mds == mds) ||
- (all && reqs[i]->r_fwd_session &&
- reqs[i]->r_fwd_session->s_mds == mds))) {
- dout(10, " kicking tid %llu\n", reqs[i]->r_tid);
- put_request_sessions(reqs[i]);
- complete(&reqs[i]->r_completion);
- }
- }
- }
-}
-
-/*
* Send periodic message to MDS renewing all currently held caps. The
* ack will reset the expiration for all caps from this session.
*
@@ -928,94 +865,6 @@ void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
}
-/*
- * handle a mds session control message
- */
-void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg)
-{
- u32 op;
- u64 seq;
- struct ceph_mds_session *session = NULL;
- int mds;
- struct ceph_mds_session_head *h = msg->front.iov_base;
-
- if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
- return;
- mds = le32_to_cpu(msg->hdr.src.name.num);
-
- /* decode */
- if (msg->front.iov_len != sizeof(*h))
- goto bad;
- op = le32_to_cpu(h->op);
- seq = le64_to_cpu(h->seq);
-
- mutex_lock(&mdsc->mutex);
- session = __ceph_get_mds_session(mdsc, mds);
- if (session && mdsc->mdsmap)
- /* FIXME: this ttl calculation is generous */
- session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
- mutex_unlock(&mdsc->mutex);
-
- if (!session) {
- if (op != CEPH_SESSION_OPEN) {
- dout(10, "handle_session no session for mds%d\n", mds);
- return;
- }
- dout(10, "handle_session creating session for mds%d\n", mds);
- session = register_session(mdsc, mds);
- }
-
- mutex_lock(&session->s_mutex);
-
- dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
- mds, ceph_session_op_name(op), session,
- session_state_name(session->s_state), seq);
- switch (op) {
- case CEPH_SESSION_OPEN:
- session->s_state = CEPH_MDS_SESSION_OPEN;
- renewed_caps(mdsc, session, 0);
- complete(&session->s_completion);
- if (mdsc->stopping)
- __close_session(mdsc, session);
- break;
-
- case CEPH_SESSION_RENEWCAPS:
- renewed_caps(mdsc, session, 1);
- break;
-
- case CEPH_SESSION_CLOSE:
- unregister_session(mdsc, mds);
- remove_session_caps(session);
- complete(&session->s_completion); /* for good measure */
- complete(&mdsc->session_close_waiters);
- kick_requests(mdsc, mds, 0); /* cur only */
- break;
-
- case CEPH_SESSION_STALE:
- dout(1, "mds%d caps went stale, renewing\n", session->s_mds);
- spin_lock(&session->s_cap_lock);
- session->s_cap_gen++;
- session->s_cap_ttl = 0;
- spin_unlock(&session->s_cap_lock);
- send_renew_caps(mdsc, session);
- break;
-
- default:
- derr(0, "bad session op %d from mds%d\n", op, mds);
- WARN_ON(1);
- }
-
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
- return;
-
-bad:
- derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds,
- (int)msg->front.iov_len, (int)sizeof(*h));
- return;
-}
-
/*
* create an mds request and message.
@@ -1136,27 +985,20 @@ static void __prepare_send_request(struct ceph_mds_client *mdsc,
}
/*
- * Synchrously perform an mds request. Take care of all of the
- * session setup, forwarding, retry details.
+ * send request, or put it on the appropriate wait list.
*/
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
- struct inode *listener,
- struct ceph_mds_request *req)
+static int __do_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
{
struct ceph_mds_session *session = NULL;
- int err;
int mds = -1;
- int safe = 0;
+ int err = -EAGAIN;
- dout(30, "do_request on %p\n", req);
+ if (req->r_reply)
+ goto out;
- mutex_lock(&mdsc->mutex);
- __register_request(mdsc, listener, req);
-retry:
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
- if (session && session->s_state == CEPH_MDS_SESSION_OPENING)
- unregister_session(mdsc, mds);
dout(10, "do_request timed out\n");
err = -EIO;
goto finish;
@@ -1166,82 +1008,146 @@ retry:
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
dout(30, "do_request no mds or not active, waiting for map\n");
- err = wait_for_new_map(mdsc, req->r_timeout);
- if (err)
- goto finish;
- goto retry;
+ list_add(&req->r_wait, &mdsc->waiting_for_map);
+ request_new_map(mdsc);
+ goto out;
}
- /* get session */
- session = __ceph_get_mds_session(mdsc, mds);
+ /* get, open session */
+ session = __ceph_lookup_mds_session(mdsc, mds);
if (!session)
session = register_session(mdsc, mds);
dout(30, "do_request mds%d session %p state %s\n", mds, session,
session_state_name(session->s_state));
-
- /* open? */
- err = 0;
- if (session->s_state == CEPH_MDS_SESSION_NEW ||
- session->s_state == CEPH_MDS_SESSION_CLOSING)
- err = open_session(mdsc, session, req->r_timeout);
- if (session->s_state != CEPH_MDS_SESSION_OPEN ||
- err == -EAGAIN) {
- dout(30, "do_request session %p not open, state=%s\n",
- session, session_state_name(session->s_state));
- ceph_put_mds_session(session);
- goto retry;
+ if (session->s_state != CEPH_MDS_SESSION_OPEN) {
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+ list_add(&req->r_wait, &session->s_waiting);
+ request_new_map(mdsc);
+ goto out_session;
}
- BUG_ON(req->r_session);
- req->r_session = session; /* request now owns the session ref */
+ /* send request */
+ req->r_session = get_session(session);
req->r_resend_mds = -1; /* forget any previous mds hint */
if (req->r_request_started == 0) /* note request start time */
req->r_request_started = jiffies;
__prepare_send_request(mdsc, req);
- mutex_unlock(&mdsc->mutex);
+ mutex_unlock(&mdsc->mutex);
ceph_msg_get(req->r_request);
ceph_send_msg_mds(mdsc, req->r_request, mds);
+ mutex_lock(&mdsc->mutex);
- if (req->r_timeout) {
- err = wait_for_completion_timeout(&req->r_completion,
- req->r_timeout);
- if (err > 0)
- err = 0;
- else if (err == 0)
- err = -EIO; /* timed out */
- } else {
- err = 0;
- wait_for_completion(&req->r_completion);
+ err = 0;
+out_session:
+ ceph_put_mds_session(session);
+out:
+ return err;
+
+finish:
+ req->r_reply = ERR_PTR(err);
+ complete(&req->r_completion);
+ goto out;
+}
+
+static void __wake_requests(struct ceph_mds_client *mdsc,
+ struct list_head *head)
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe(p, n, head) {
+ struct ceph_mds_request *req =
+ list_entry(p, struct ceph_mds_request, r_wait);
+ list_del_init(&req->r_wait);
+ __do_request(mdsc, req);
}
+}
+
+/*
+ * Wake up threads with requests pending for @mds, so that they can
+ * resubmit their requests to a possibly different mds. If @all is set,
+ * wake up if their requests has been forwarded to @mds, too.
+ */
+static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+{
+ struct ceph_mds_request *reqs[10];
+ u64 nexttid = 0;
+ int i, got;
+
+ dout(20, "kick_requests mds%d\n", mds);
+ while (nexttid < mdsc->last_tid) {
+ got = radix_tree_gang_lookup(&mdsc->request_tree,
+ (void **)&reqs, nexttid, 10);
+ if (got == 0)
+ break;
+ nexttid = reqs[got-1]->r_tid + 1;
+ for (i = 0; i < got; i++) {
+ if (!reqs[i]->r_got_unsafe &&
+ ((reqs[i]->r_session &&
+ reqs[i]->r_session->s_mds == mds) ||
+ (all && reqs[i]->r_fwd_session &&
+ reqs[i]->r_fwd_session->s_mds == mds))) {
+ dout(10, " kicking tid %llu\n", reqs[i]->r_tid);
+ put_request_sessions(reqs[i]);
+ __do_request(mdsc, reqs[i]);
+ }
+ }
+ }
+}
+
+
+/*
+ * Synchrously perform an mds request. Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *listener,
+ struct ceph_mds_request *req)
+{
+ int err;
+ int safe = 0;
+
+ dout(30, "do_request on %p\n", req);
+
mutex_lock(&mdsc->mutex);
- if (req->r_reply == NULL && !err) {
- put_request_sessions(req);
- goto retry;
+ __register_request(mdsc, listener, req);
+ __do_request(mdsc, req);
+
+ if (!req->r_reply) {
+ mutex_unlock(&mdsc->mutex);
+ if (req->r_timeout) {
+ err = wait_for_completion_timeout(&req->r_completion,
+ req->r_timeout);
+ if (err > 0)
+ err = 0;
+ else if (err == 0)
+ req->r_reply = ERR_PTR(-EIO);
+ } else {
+ wait_for_completion(&req->r_completion);
+ }
+ mutex_lock(&mdsc->mutex);
}
+
if (IS_ERR(req->r_reply)) {
err = PTR_ERR(req->r_reply);
req->r_reply = NULL;
- }
- if (!err)
- /* all is well, reply has been parsed. */
+ safe = 1;
+ } else {
err = le32_to_cpu(req->r_reply_info.head->result);
- if (req)
safe = req->r_reply_info.head->safe;
-finish:
- if (safe) {
- complete(&req->r_safe_completion);
- __unregister_request(mdsc, req);
}
- mutex_unlock(&mdsc->mutex);
-
if (safe) {
+ complete(&req->r_safe_completion);
+ __unregister_request(mdsc, req);
ceph_msg_put(req->r_request);
req->r_request = NULL;
}
+ mutex_unlock(&mdsc->mutex);
dout(30, "do_request %p done, result %d\n", req, err);
return err;
@@ -1313,7 +1219,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
if (req->r_session && req->r_session->s_mds != mds) {
ceph_put_mds_session(req->r_session);
- req->r_session = __ceph_get_mds_session(mdsc, mds);
+ req->r_session = __ceph_lookup_mds_session(mdsc, mds);
}
if (req->r_session == NULL) {
derr(1, "got reply on %llu, but no session for mds%d\n",
@@ -1433,8 +1339,8 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
from_mds);
req->r_num_fwd = fwd_seq;
put_request_sessions(req);
- req->r_session = __ceph_get_mds_session(mdsc, next_mds);
- req->r_fwd_session = __ceph_get_mds_session(mdsc, from_mds);
+ req->r_session = __ceph_lookup_mds_session(mdsc, next_mds);
+ req->r_fwd_session = __ceph_lookup_mds_session(mdsc, from_mds);
} else {
/* no, resend. */
/* forward race not possible; mds would drop */
@@ -1442,7 +1348,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
put_request_sessions(req);
- complete(&req->r_completion); /* wake up do_request */
+ __do_request(mdsc, req);
}
ceph_mdsc_put_request(req);
out:
@@ -1453,6 +1359,100 @@ bad:
derr(0, "problem decoding message, err=%d\n", err);
}
+/*
+ * handle a mds session control message
+ */
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg)
+{
+ u32 op;
+ u64 seq;
+ struct ceph_mds_session *session = NULL;
+ int mds;
+ struct ceph_mds_session_head *h = msg->front.iov_base;
+ int wake = 0;
+
+ if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+ return;
+ mds = le32_to_cpu(msg->hdr.src.name.num);
+
+ /* decode */
+ if (msg->front.iov_len != sizeof(*h))
+ goto bad;
+ op = le32_to_cpu(h->op);
+ seq = le64_to_cpu(h->seq);
+
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_lookup_mds_session(mdsc, mds);
+ if (session && mdsc->mdsmap)
+ /* FIXME: this ttl calculation is generous */
+ session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
+ mutex_unlock(&mdsc->mutex);
+
+ if (!session) {
+ if (op != CEPH_SESSION_OPEN) {
+ dout(10, "handle_session no session for mds%d\n", mds);
+ return;
+ }
+ dout(10, "handle_session creating session for mds%d\n", mds);
+ session = register_session(mdsc, mds);
+ }
+
+ mutex_lock(&session->s_mutex);
+
+ dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
+ mds, ceph_session_op_name(op), session,
+ session_state_name(session->s_state), seq);
+ switch (op) {
+ case CEPH_SESSION_OPEN:
+ session->s_state = CEPH_MDS_SESSION_OPEN;
+ renewed_caps(mdsc, session, 0);
+ wake = 1;
+ if (mdsc->stopping)
+ __close_session(mdsc, session);
+ break;
+
+ case CEPH_SESSION_RENEWCAPS:
+ renewed_caps(mdsc, session, 1);
+ break;
+
+ case CEPH_SESSION_CLOSE:
+ unregister_session(mdsc, mds);
+ remove_session_caps(session);
+ wake = 1; /* for good measure */
+ complete(&mdsc->session_close_waiters);
+ kick_requests(mdsc, mds, 0); /* cur only */
+ break;
+
+ case CEPH_SESSION_STALE:
+ dout(1, "mds%d caps went stale, renewing\n", session->s_mds);
+ spin_lock(&session->s_cap_lock);
+ session->s_cap_gen++;
+ session->s_cap_ttl = 0;
+ spin_unlock(&session->s_cap_lock);
+ send_renew_caps(mdsc, session);
+ break;
+
+ default:
+ derr(0, "bad session op %d from mds%d\n", op, mds);
+ WARN_ON(1);
+ }
+
+ mutex_unlock(&session->s_mutex);
+ if (wake) {
+ mutex_lock(&mdsc->mutex);
+ __wake_requests(mdsc, &session->s_waiting);
+ mutex_unlock(&mdsc->mutex);
+ }
+ ceph_put_mds_session(session);
+ return;
+
+bad:
+ derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds,
+ (int)msg->front.iov_len, (int)sizeof(*h));
+ return;
+}
+
/*
* called under session->mutex.
@@ -1507,7 +1507,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
dout(1, "reconnect to recovering mds%d\n", mds);
/* find session */
- session = __ceph_get_mds_session(mdsc, mds);
+ session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex); /* drop lock for duration */
if (session) {
@@ -1654,7 +1654,7 @@ send:
if (session) {
session->s_state = CEPH_MDS_SESSION_OPEN;
- complete(&session->s_completion);
+ __wake_requests(mdsc, &session->s_waiting);
}
out:
@@ -1730,7 +1730,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
/* the session never opened, just close it
* out now */
- complete(&s->s_completion);
+ __wake_requests(mdsc, &s->s_waiting);
unregister_session(mdsc, i);
}
@@ -1792,7 +1792,7 @@ void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
/* find session */
mutex_lock(&mdsc->mutex);
- session = __ceph_get_mds_session(mdsc, mds);
+ session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (!session) {
derr(0, "WTF, got lease but no session for mds%d\n", mds);
@@ -1946,13 +1946,14 @@ static void delayed_work(struct work_struct *work)
mdsc->last_renew_caps = jiffies;
for (i = 0; i < mdsc->max_sessions; i++) {
- struct ceph_mds_session *s = __ceph_get_mds_session(mdsc, i);
+ struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
if (s == NULL)
continue;
if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
dout(10, "resending session close request for mds%d\n",
s->s_mds);
request_close_session(mdsc, s);
+ ceph_put_mds_session(s);
continue;
}
if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
@@ -1990,8 +1991,8 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
mdsc->client = client;
mutex_init(&mdsc->mutex);
mdsc->mdsmap = NULL; /* none yet */
- init_completion(&mdsc->map_waiters);
init_completion(&mdsc->session_close_waiters);
+ INIT_LIST_HEAD(&mdsc->waiting_for_map);
mdsc->sessions = NULL;
mdsc->max_sessions = 0;
mdsc->stopping = 0;
@@ -2019,7 +2020,7 @@ static void drop_leases(struct ceph_mds_client *mdsc)
dout(10, "drop_leases\n");
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
- struct ceph_mds_session *s = __ceph_get_mds_session(mdsc, i);
+ struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
if (!s)
continue;
mutex_unlock(&mdsc->mutex);
@@ -2049,7 +2050,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
struct ceph_mds_session *session;
int i;
int n;
- unsigned long started, timeout = 60 * HZ;
+ unsigned long started, timeout = CEPH_MOUNT_TIMEOUT;
struct ceph_client *client = mdsc->client;
dout(10, "close_sessions\n");
@@ -2085,7 +2086,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
dout(10, "closing sessions\n");
n = 0;
for (i = 0; i < mdsc->max_sessions; i++) {
- session = __ceph_get_mds_session(mdsc, i);
+ session = __ceph_lookup_mds_session(mdsc, i);
if (!session)
continue;
mutex_unlock(&mdsc->mutex);
@@ -2109,6 +2110,21 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
mutex_lock(&mdsc->mutex);
}
+ /* tear down remaining sessions */
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ if (mdsc->sessions[i]) {
+ session = get_session(mdsc->sessions[i]);
+ unregister_session(mdsc, i);
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&session->s_mutex);
+ remove_session_caps(session);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ mutex_lock(&mdsc->mutex);
+ }
+ }
+
+
WARN_ON(!list_empty(&mdsc->cap_delay_list));
mutex_unlock(&mdsc->mutex);
@@ -2197,9 +2213,10 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
mdsc->mdsmap = newmap; /* first mds map */
}
+ __wake_requests(mdsc, &mdsc->waiting_for_map);
+
mutex_unlock(&mdsc->mutex);
schedule_delayed(mdsc);
- complete(&mdsc->map_waiters);
return;
bad_unlock:
diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h
index 7f93791fe73..b65bb7a8506 100644
--- a/src/kernel/mds_client.h
+++ b/src/kernel/mds_client.h
@@ -120,7 +120,7 @@ struct ceph_mds_session {
struct list_head s_rdcaps; /* just the readonly caps */
int s_nr_caps;
atomic_t s_ref;
- struct completion s_completion;
+ struct list_head s_waiting; /* waiting requests */
struct list_head s_unsafe; /* unsafe requests */
};
@@ -149,6 +149,8 @@ struct ceph_mds_request {
unsigned long r_request_started; /* start time for mds request only,
used to measure lease durations */
+ struct list_head r_wait;
+
/* for choosing which mds to send this request to */
struct dentry *r_direct_dentry;
int r_direct_mode;
@@ -189,7 +191,8 @@ struct ceph_mds_client {
struct mutex mutex; /* all nested structures */
struct ceph_mdsmap *mdsmap;
- struct completion map_waiters, session_close_waiters;
+ struct completion session_close_waiters;
+ struct list_head waiting_for_map;
struct ceph_mds_session **sessions; /* NULL for mds if no session */
int max_sessions; /* len of s_mds_sessions */
@@ -219,7 +222,7 @@ struct ceph_mds_client {
extern const char *ceph_mds_op_name(int op);
-extern struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *, int mds);
+extern struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
inline static struct ceph_mds_session *
ceph_get_mds_session(struct ceph_mds_session *s)
diff --git a/src/kernel/snap.c b/src/kernel/snap.c
index 89e94f8666b..888e5772d1b 100644
--- a/src/kernel/snap.c
+++ b/src/kernel/snap.c
@@ -718,7 +718,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
/* find session */
mutex_lock(&mdsc->mutex);
- session = __ceph_get_mds_session(mdsc, mds);
+ session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (!session) {
dout(10, "WTF, got snap but no session for mds%d\n", mds);
diff --git a/src/kernel/super.c b/src/kernel/super.c
index 9560c004184..5e2d4415d3f 100644
--- a/src/kernel/super.c
+++ b/src/kernel/super.c
@@ -1050,6 +1050,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
return 0;
out_splat:
+ ceph_mdsc_close_sessions(&client->mdsc);
up_write(&sb->s_umount);
deactivate_super(sb);
goto out_final;
diff --git a/src/kernel/super.h b/src/kernel/super.h
index 8251811dd7f..136ed6e183d 100644
--- a/src/kernel/super.h
+++ b/src/kernel/super.h
@@ -26,6 +26,9 @@
#define CEPH_BLOCK_SHIFT 20 /* 1 MB */
#define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT)
+#define CEPH_MOUNT_TIMEOUT (60*HZ)
+#define CEPH_CAP_DELAY (5*HZ) /* cap release delay */
+
/*
* subtract jiffies
*/
@@ -131,7 +134,7 @@ static inline struct ceph_client *ceph_client(struct super_block *sb)
* capabilities.
*
* Each cap is referenced by the inode's i_caps tree and by a per-mds
- * session capability list.
+ * session capability list(s).
*/
struct ceph_cap {
struct ceph_inode_info *ci;