summaryrefslogtreecommitdiff
path: root/net/ceph/mon_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r--net/ceph/mon_client.c393
1 files changed, 230 insertions, 163 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index cf638c009cfa..37c38a7fb5c5 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -260,20 +260,26 @@ static void __send_subscribe(struct ceph_mon_client *monc)
BUG_ON(num < 1); /* monmap sub is always there */
ceph_encode_32(&p, num);
for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
- const char *s = ceph_sub_str[i];
+ char buf[32];
+ int len;
if (!monc->subs[i].want)
continue;
- dout("%s %s start %llu flags 0x%x\n", __func__, s,
+ len = sprintf(buf, "%s", ceph_sub_str[i]);
+ if (i == CEPH_SUB_MDSMAP &&
+ monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
+ len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
+
+ dout("%s %s start %llu flags 0x%x\n", __func__, buf,
le64_to_cpu(monc->subs[i].item.start),
monc->subs[i].item.flags);
- ceph_encode_string(&p, end, s, strlen(s));
+ ceph_encode_string(&p, end, buf, len);
memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
p += sizeof(monc->subs[i].item);
}
- BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
+ BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
ceph_msg_revoke(msg);
@@ -376,19 +382,13 @@ void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
}
EXPORT_SYMBOL(ceph_monc_got_map);
-/*
- * Register interest in the next osdmap
- */
-void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
+void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
- dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
mutex_lock(&monc->mutex);
- if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
- monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
- __send_subscribe(monc);
+ __send_subscribe(monc);
mutex_unlock(&monc->mutex);
}
-EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
+EXPORT_SYMBOL(ceph_monc_renew_subs);
/*
* Wait for an osdmap with a given epoch.
@@ -478,51 +478,17 @@ out:
/*
* generic requests (currently statfs, mon_get_version)
*/
-static struct ceph_mon_generic_request *__lookup_generic_req(
- struct ceph_mon_client *monc, u64 tid)
-{
- struct ceph_mon_generic_request *req;
- struct rb_node *n = monc->generic_request_tree.rb_node;
-
- while (n) {
- req = rb_entry(n, struct ceph_mon_generic_request, node);
- if (tid < req->tid)
- n = n->rb_left;
- else if (tid > req->tid)
- n = n->rb_right;
- else
- return req;
- }
- return NULL;
-}
-
-static void __insert_generic_request(struct ceph_mon_client *monc,
- struct ceph_mon_generic_request *new)
-{
- struct rb_node **p = &monc->generic_request_tree.rb_node;
- struct rb_node *parent = NULL;
- struct ceph_mon_generic_request *req = NULL;
-
- while (*p) {
- parent = *p;
- req = rb_entry(parent, struct ceph_mon_generic_request, node);
- if (new->tid < req->tid)
- p = &(*p)->rb_left;
- else if (new->tid > req->tid)
- p = &(*p)->rb_right;
- else
- BUG();
- }
-
- rb_link_node(&new->node, parent, p);
- rb_insert_color(&new->node, &monc->generic_request_tree);
-}
+DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
static void release_generic_request(struct kref *kref)
{
struct ceph_mon_generic_request *req =
container_of(kref, struct ceph_mon_generic_request, kref);
+ dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
+ req->reply);
+ WARN_ON(!RB_EMPTY_NODE(&req->node));
+
if (req->reply)
ceph_msg_put(req->reply);
if (req->request)
@@ -533,7 +499,8 @@ static void release_generic_request(struct kref *kref)
static void put_generic_request(struct ceph_mon_generic_request *req)
{
- kref_put(&req->kref, release_generic_request);
+ if (req)
+ kref_put(&req->kref, release_generic_request);
}
static void get_generic_request(struct ceph_mon_generic_request *req)
@@ -541,6 +508,103 @@ static void get_generic_request(struct ceph_mon_generic_request *req)
kref_get(&req->kref);
}
+static struct ceph_mon_generic_request *
+alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
+{
+ struct ceph_mon_generic_request *req;
+
+ req = kzalloc(sizeof(*req), gfp);
+ if (!req)
+ return NULL;
+
+ req->monc = monc;
+ kref_init(&req->kref);
+ RB_CLEAR_NODE(&req->node);
+ init_completion(&req->completion);
+
+ dout("%s greq %p\n", __func__, req);
+ return req;
+}
+
+static void register_generic_request(struct ceph_mon_generic_request *req)
+{
+ struct ceph_mon_client *monc = req->monc;
+
+ WARN_ON(req->tid);
+
+ get_generic_request(req);
+ req->tid = ++monc->last_tid;
+ insert_generic_request(&monc->generic_request_tree, req);
+}
+
+static void send_generic_request(struct ceph_mon_client *monc,
+ struct ceph_mon_generic_request *req)
+{
+ WARN_ON(!req->tid);
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+ req->request->hdr.tid = cpu_to_le64(req->tid);
+ ceph_con_send(&monc->con, ceph_msg_get(req->request));
+}
+
+static void __finish_generic_request(struct ceph_mon_generic_request *req)
+{
+ struct ceph_mon_client *monc = req->monc;
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+ erase_generic_request(&monc->generic_request_tree, req);
+
+ ceph_msg_revoke(req->request);
+ ceph_msg_revoke_incoming(req->reply);
+}
+
+static void finish_generic_request(struct ceph_mon_generic_request *req)
+{
+ __finish_generic_request(req);
+ put_generic_request(req);
+}
+
+static void complete_generic_request(struct ceph_mon_generic_request *req)
+{
+ if (req->complete_cb)
+ req->complete_cb(req);
+ else
+ complete_all(&req->completion);
+ put_generic_request(req);
+}
+
+void cancel_generic_request(struct ceph_mon_generic_request *req)
+{
+ struct ceph_mon_client *monc = req->monc;
+ struct ceph_mon_generic_request *lookup_req;
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+
+ mutex_lock(&monc->mutex);
+ lookup_req = lookup_generic_request(&monc->generic_request_tree,
+ req->tid);
+ if (lookup_req) {
+ WARN_ON(lookup_req != req);
+ finish_generic_request(req);
+ }
+
+ mutex_unlock(&monc->mutex);
+}
+
+static int wait_generic_request(struct ceph_mon_generic_request *req)
+{
+ int ret;
+
+ dout("%s greq %p tid %llu\n", __func__, req, req->tid);
+ ret = wait_for_completion_interruptible(&req->completion);
+ if (ret)
+ cancel_generic_request(req);
+ else
+ ret = req->result; /* completed */
+
+ return ret;
+}
+
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
@@ -551,7 +615,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
struct ceph_msg *m;
mutex_lock(&monc->mutex);
- req = __lookup_generic_req(monc, tid);
+ req = lookup_generic_request(&monc->generic_request_tree, tid);
if (!req) {
dout("get_generic_reply %lld dne\n", tid);
*skip = 1;
@@ -570,42 +634,6 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}
-static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
- struct ceph_mon_generic_request *req)
-{
- int err;
-
- /* register request */
- req->tid = tid != 0 ? tid : ++monc->last_tid;
- req->request->hdr.tid = cpu_to_le64(req->tid);
- __insert_generic_request(monc, req);
- monc->num_generic_requests++;
- ceph_con_send(&monc->con, ceph_msg_get(req->request));
- mutex_unlock(&monc->mutex);
-
- err = wait_for_completion_interruptible(&req->completion);
-
- mutex_lock(&monc->mutex);
- rb_erase(&req->node, &monc->generic_request_tree);
- monc->num_generic_requests--;
-
- if (!err)
- err = req->result;
- return err;
-}
-
-static int do_generic_request(struct ceph_mon_client *monc,
- struct ceph_mon_generic_request *req)
-{
- int err;
-
- mutex_lock(&monc->mutex);
- err = __do_generic_request(monc, 0, req);
- mutex_unlock(&monc->mutex);
-
- return err;
-}
-
/*
* statfs
*/
@@ -616,22 +644,24 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
u64 tid = le64_to_cpu(msg->hdr.tid);
+ dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
if (msg->front.iov_len != sizeof(*reply))
goto bad;
- dout("handle_statfs_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
- req = __lookup_generic_req(monc, tid);
- if (req) {
- *(struct ceph_statfs *)req->buf = reply->st;
- req->result = 0;
- get_generic_request(req);
+ req = lookup_generic_request(&monc->generic_request_tree, tid);
+ if (!req) {
+ mutex_unlock(&monc->mutex);
+ return;
}
+
+ req->result = 0;
+ *req->u.st = reply->st; /* struct */
+ __finish_generic_request(req);
mutex_unlock(&monc->mutex);
- if (req) {
- complete_all(&req->completion);
- put_generic_request(req);
- }
+
+ complete_generic_request(req);
return;
bad:
@@ -646,38 +676,38 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_statfs *h;
- int err;
+ int ret = -ENOMEM;
- req = kzalloc(sizeof(*req), GFP_NOFS);
+ req = alloc_generic_request(monc, GFP_NOFS);
if (!req)
- return -ENOMEM;
-
- kref_init(&req->kref);
- req->buf = buf;
- init_completion(&req->completion);
+ goto out;
- err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
true);
if (!req->request)
goto out;
- req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
- true);
+
+ req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
if (!req->reply)
goto out;
+ req->u.st = buf;
+
+ mutex_lock(&monc->mutex);
+ register_generic_request(req);
/* fill out request */
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
+ send_generic_request(monc, req);
+ mutex_unlock(&monc->mutex);
- err = do_generic_request(monc, req);
-
+ ret = wait_generic_request(req);
out:
put_generic_request(req);
- return err;
+ return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
@@ -690,7 +720,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
void *end = p + msg->front_alloc_len;
u64 handle;
- dout("%s %p tid %llu\n", __func__, msg, tid);
+ dout("%s msg %p tid %llu\n", __func__, msg, tid);
ceph_decode_need(&p, end, 2*sizeof(u64), bad);
handle = ceph_decode_64(&p);
@@ -698,77 +728,111 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
goto bad;
mutex_lock(&monc->mutex);
- req = __lookup_generic_req(monc, handle);
- if (req) {
- *(u64 *)req->buf = ceph_decode_64(&p);
- req->result = 0;
- get_generic_request(req);
+ req = lookup_generic_request(&monc->generic_request_tree, handle);
+ if (!req) {
+ mutex_unlock(&monc->mutex);
+ return;
}
+
+ req->result = 0;
+ req->u.newest = ceph_decode_64(&p);
+ __finish_generic_request(req);
mutex_unlock(&monc->mutex);
- if (req) {
- complete_all(&req->completion);
- put_generic_request(req);
- }
+ complete_generic_request(req);
return;
+
bad:
pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
-/*
- * Send MMonGetVersion and wait for the reply.
- *
- * @what: one of "mdsmap", "osdmap" or "monmap"
- */
-int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
- u64 *newest)
+static struct ceph_mon_generic_request *
+__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
+ ceph_monc_callback_t cb, u64 private_data)
{
struct ceph_mon_generic_request *req;
- void *p, *end;
- u64 tid;
- int err;
- req = kzalloc(sizeof(*req), GFP_NOFS);
+ req = alloc_generic_request(monc, GFP_NOIO);
if (!req)
- return -ENOMEM;
-
- kref_init(&req->kref);
- req->buf = newest;
- init_completion(&req->completion);
+ goto err_put_req;
req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
sizeof(u64) + sizeof(u32) + strlen(what),
- GFP_NOFS, true);
- if (!req->request) {
- err = -ENOMEM;
- goto out;
- }
+ GFP_NOIO, true);
+ if (!req->request)
+ goto err_put_req;
- req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
- GFP_NOFS, true);
- if (!req->reply) {
- err = -ENOMEM;
- goto out;
- }
+ req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
+ true);
+ if (!req->reply)
+ goto err_put_req;
- p = req->request->front.iov_base;
- end = p + req->request->front_alloc_len;
+ req->complete_cb = cb;
+ req->private_data = private_data;
- /* fill out request */
mutex_lock(&monc->mutex);
- tid = ++monc->last_tid;
- ceph_encode_64(&p, tid); /* handle */
- ceph_encode_string(&p, end, what, strlen(what));
+ register_generic_request(req);
+ {
+ void *p = req->request->front.iov_base;
+ void *const end = p + req->request->front_alloc_len;
+
+ ceph_encode_64(&p, req->tid); /* handle */
+ ceph_encode_string(&p, end, what, strlen(what));
+ WARN_ON(p != end);
+ }
+ send_generic_request(monc, req);
+ mutex_unlock(&monc->mutex);
- err = __do_generic_request(monc, tid, req);
+ return req;
- mutex_unlock(&monc->mutex);
-out:
+err_put_req:
put_generic_request(req);
- return err;
+ return ERR_PTR(-ENOMEM);
+}
+
+/*
+ * Send MMonGetVersion and wait for the reply.
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ */
+int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
+ u64 *newest)
+{
+ struct ceph_mon_generic_request *req;
+ int ret;
+
+ req = __ceph_monc_get_version(monc, what, NULL, 0);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ ret = wait_generic_request(req);
+ if (!ret)
+ *newest = req->u.newest;
+
+ put_generic_request(req);
+ return ret;
}
-EXPORT_SYMBOL(ceph_monc_do_get_version);
+EXPORT_SYMBOL(ceph_monc_get_version);
+
+/*
+ * Send MMonGetVersion,
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ */
+int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
+ ceph_monc_callback_t cb, u64 private_data)
+{
+ struct ceph_mon_generic_request *req;
+
+ req = __ceph_monc_get_version(monc, what, cb, private_data);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ put_generic_request(req);
+ return 0;
+}
+EXPORT_SYMBOL(ceph_monc_get_version_async);
/*
* Resend pending generic requests.
@@ -890,7 +954,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
if (!monc->m_subscribe_ack)
goto out_auth;
- monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
+ monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, GFP_NOFS,
true);
if (!monc->m_subscribe)
goto out_subscribe_ack;
@@ -914,9 +978,10 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
monc->generic_request_tree = RB_ROOT;
- monc->num_generic_requests = 0;
monc->last_tid = 0;
+ monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
+
return 0;
out_auth_reply:
@@ -954,6 +1019,8 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
ceph_auth_destroy(monc->auth);
+ WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
+
ceph_msg_put(monc->m_auth);
ceph_msg_put(monc->m_auth_reply);
ceph_msg_put(monc->m_subscribe);