diff options
Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r-- | net/ceph/mon_client.c | 393 |
1 files changed, 230 insertions, 163 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index cf638c009cfa..37c38a7fb5c5 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -260,20 +260,26 @@ static void __send_subscribe(struct ceph_mon_client *monc) BUG_ON(num < 1); /* monmap sub is always there */ ceph_encode_32(&p, num); for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { - const char *s = ceph_sub_str[i]; + char buf[32]; + int len; if (!monc->subs[i].want) continue; - dout("%s %s start %llu flags 0x%x\n", __func__, s, + len = sprintf(buf, "%s", ceph_sub_str[i]); + if (i == CEPH_SUB_MDSMAP && + monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE) + len += sprintf(buf + len, ".%d", monc->fs_cluster_id); + + dout("%s %s start %llu flags 0x%x\n", __func__, buf, le64_to_cpu(monc->subs[i].item.start), monc->subs[i].item.flags); - ceph_encode_string(&p, end, s, strlen(s)); + ceph_encode_string(&p, end, buf, len); memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); p += sizeof(monc->subs[i].item); } - BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19)); + BUG_ON(p > end); msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); ceph_msg_revoke(msg); @@ -376,19 +382,13 @@ void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) } EXPORT_SYMBOL(ceph_monc_got_map); -/* - * Register interest in the next osdmap - */ -void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) +void ceph_monc_renew_subs(struct ceph_mon_client *monc) { - dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have); mutex_lock(&monc->mutex); - if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, - monc->subs[CEPH_SUB_OSDMAP].have + 1, false)) - __send_subscribe(monc); + __send_subscribe(monc); mutex_unlock(&monc->mutex); } -EXPORT_SYMBOL(ceph_monc_request_next_osdmap); +EXPORT_SYMBOL(ceph_monc_renew_subs); /* * Wait for an osdmap with a given epoch. @@ -478,51 +478,17 @@ out: /* * generic requests (currently statfs, mon_get_version) */ -static struct ceph_mon_generic_request *__lookup_generic_req( - struct ceph_mon_client *monc, u64 tid) -{ - struct ceph_mon_generic_request *req; - struct rb_node *n = monc->generic_request_tree.rb_node; - - while (n) { - req = rb_entry(n, struct ceph_mon_generic_request, node); - if (tid < req->tid) - n = n->rb_left; - else if (tid > req->tid) - n = n->rb_right; - else - return req; - } - return NULL; -} - -static void __insert_generic_request(struct ceph_mon_client *monc, - struct ceph_mon_generic_request *new) -{ - struct rb_node **p = &monc->generic_request_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_mon_generic_request *req = NULL; - - while (*p) { - parent = *p; - req = rb_entry(parent, struct ceph_mon_generic_request, node); - if (new->tid < req->tid) - p = &(*p)->rb_left; - else if (new->tid > req->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&new->node, parent, p); - rb_insert_color(&new->node, &monc->generic_request_tree); -} +DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node) static void release_generic_request(struct kref *kref) { struct ceph_mon_generic_request *req = container_of(kref, struct ceph_mon_generic_request, kref); + dout("%s greq %p request %p reply %p\n", __func__, req, req->request, + req->reply); + WARN_ON(!RB_EMPTY_NODE(&req->node)); + if (req->reply) ceph_msg_put(req->reply); if (req->request) @@ -533,7 +499,8 @@ static void release_generic_request(struct kref *kref) static void put_generic_request(struct ceph_mon_generic_request *req) { - kref_put(&req->kref, release_generic_request); + if (req) + kref_put(&req->kref, release_generic_request); } static void get_generic_request(struct ceph_mon_generic_request *req) @@ -541,6 +508,103 @@ static void get_generic_request(struct ceph_mon_generic_request *req) kref_get(&req->kref); } +static struct ceph_mon_generic_request * +alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) +{ + struct ceph_mon_generic_request *req; + + req = kzalloc(sizeof(*req), gfp); + if (!req) + return NULL; + + req->monc = monc; + kref_init(&req->kref); + RB_CLEAR_NODE(&req->node); + init_completion(&req->completion); + + dout("%s greq %p\n", __func__, req); + return req; +} + +static void register_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + + WARN_ON(req->tid); + + get_generic_request(req); + req->tid = ++monc->last_tid; + insert_generic_request(&monc->generic_request_tree, req); +} + +static void send_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *req) +{ + WARN_ON(!req->tid); + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + req->request->hdr.tid = cpu_to_le64(req->tid); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); +} + +static void __finish_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + erase_generic_request(&monc->generic_request_tree, req); + + ceph_msg_revoke(req->request); + ceph_msg_revoke_incoming(req->reply); +} + +static void finish_generic_request(struct ceph_mon_generic_request *req) +{ + __finish_generic_request(req); + put_generic_request(req); +} + +static void complete_generic_request(struct ceph_mon_generic_request *req) +{ + if (req->complete_cb) + req->complete_cb(req); + else + complete_all(&req->completion); + put_generic_request(req); +} + +void cancel_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + struct ceph_mon_generic_request *lookup_req; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + + mutex_lock(&monc->mutex); + lookup_req = lookup_generic_request(&monc->generic_request_tree, + req->tid); + if (lookup_req) { + WARN_ON(lookup_req != req); + finish_generic_request(req); + } + + mutex_unlock(&monc->mutex); +} + +static int wait_generic_request(struct ceph_mon_generic_request *req) +{ + int ret; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + ret = wait_for_completion_interruptible(&req->completion); + if (ret) + cancel_generic_request(req); + else + ret = req->result; /* completed */ + + return ret; +} + static struct ceph_msg *get_generic_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip) @@ -551,7 +615,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, struct ceph_msg *m; mutex_lock(&monc->mutex); - req = __lookup_generic_req(monc, tid); + req = lookup_generic_request(&monc->generic_request_tree, tid); if (!req) { dout("get_generic_reply %lld dne\n", tid); *skip = 1; @@ -570,42 +634,6 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, return m; } -static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, - struct ceph_mon_generic_request *req) -{ - int err; - - /* register request */ - req->tid = tid != 0 ? tid : ++monc->last_tid; - req->request->hdr.tid = cpu_to_le64(req->tid); - __insert_generic_request(monc, req); - monc->num_generic_requests++; - ceph_con_send(&monc->con, ceph_msg_get(req->request)); - mutex_unlock(&monc->mutex); - - err = wait_for_completion_interruptible(&req->completion); - - mutex_lock(&monc->mutex); - rb_erase(&req->node, &monc->generic_request_tree); - monc->num_generic_requests--; - - if (!err) - err = req->result; - return err; -} - -static int do_generic_request(struct ceph_mon_client *monc, - struct ceph_mon_generic_request *req) -{ - int err; - - mutex_lock(&monc->mutex); - err = __do_generic_request(monc, 0, req); - mutex_unlock(&monc->mutex); - - return err; -} - /* * statfs */ @@ -616,22 +644,24 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, struct ceph_mon_statfs_reply *reply = msg->front.iov_base; u64 tid = le64_to_cpu(msg->hdr.tid); + dout("%s msg %p tid %llu\n", __func__, msg, tid); + if (msg->front.iov_len != sizeof(*reply)) goto bad; - dout("handle_statfs_reply %p tid %llu\n", msg, tid); mutex_lock(&monc->mutex); - req = __lookup_generic_req(monc, tid); - if (req) { - *(struct ceph_statfs *)req->buf = reply->st; - req->result = 0; - get_generic_request(req); + req = lookup_generic_request(&monc->generic_request_tree, tid); + if (!req) { + mutex_unlock(&monc->mutex); + return; } + + req->result = 0; + *req->u.st = reply->st; /* struct */ + __finish_generic_request(req); mutex_unlock(&monc->mutex); - if (req) { - complete_all(&req->completion); - put_generic_request(req); - } + + complete_generic_request(req); return; bad: @@ -646,38 +676,38 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) { struct ceph_mon_generic_request *req; struct ceph_mon_statfs *h; - int err; + int ret = -ENOMEM; - req = kzalloc(sizeof(*req), GFP_NOFS); + req = alloc_generic_request(monc, GFP_NOFS); if (!req) - return -ENOMEM; - - kref_init(&req->kref); - req->buf = buf; - init_completion(&req->completion); + goto out; - err = -ENOMEM; req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, true); if (!req->request) goto out; - req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, - true); + + req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); if (!req->reply) goto out; + req->u.st = buf; + + mutex_lock(&monc->mutex); + register_generic_request(req); /* fill out request */ h = req->request->front.iov_base; h->monhdr.have_version = 0; h->monhdr.session_mon = cpu_to_le16(-1); h->monhdr.session_mon_tid = 0; h->fsid = monc->monmap->fsid; + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); - err = do_generic_request(monc, req); - + ret = wait_generic_request(req); out: put_generic_request(req); - return err; + return ret; } EXPORT_SYMBOL(ceph_monc_do_statfs); @@ -690,7 +720,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, void *end = p + msg->front_alloc_len; u64 handle; - dout("%s %p tid %llu\n", __func__, msg, tid); + dout("%s msg %p tid %llu\n", __func__, msg, tid); ceph_decode_need(&p, end, 2*sizeof(u64), bad); handle = ceph_decode_64(&p); @@ -698,77 +728,111 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, goto bad; mutex_lock(&monc->mutex); - req = __lookup_generic_req(monc, handle); - if (req) { - *(u64 *)req->buf = ceph_decode_64(&p); - req->result = 0; - get_generic_request(req); + req = lookup_generic_request(&monc->generic_request_tree, handle); + if (!req) { + mutex_unlock(&monc->mutex); + return; } + + req->result = 0; + req->u.newest = ceph_decode_64(&p); + __finish_generic_request(req); mutex_unlock(&monc->mutex); - if (req) { - complete_all(&req->completion); - put_generic_request(req); - } + complete_generic_request(req); return; + bad: pr_err("corrupt mon_get_version reply, tid %llu\n", tid); ceph_msg_dump(msg); } -/* - * Send MMonGetVersion and wait for the reply. - * - * @what: one of "mdsmap", "osdmap" or "monmap" - */ -int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, - u64 *newest) +static struct ceph_mon_generic_request * +__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data) { struct ceph_mon_generic_request *req; - void *p, *end; - u64 tid; - int err; - req = kzalloc(sizeof(*req), GFP_NOFS); + req = alloc_generic_request(monc, GFP_NOIO); if (!req) - return -ENOMEM; - - kref_init(&req->kref); - req->buf = newest; - init_completion(&req->completion); + goto err_put_req; req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, sizeof(u64) + sizeof(u32) + strlen(what), - GFP_NOFS, true); - if (!req->request) { - err = -ENOMEM; - goto out; - } + GFP_NOIO, true); + if (!req->request) + goto err_put_req; - req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, - GFP_NOFS, true); - if (!req->reply) { - err = -ENOMEM; - goto out; - } + req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, + true); + if (!req->reply) + goto err_put_req; - p = req->request->front.iov_base; - end = p + req->request->front_alloc_len; + req->complete_cb = cb; + req->private_data = private_data; - /* fill out request */ mutex_lock(&monc->mutex); - tid = ++monc->last_tid; - ceph_encode_64(&p, tid); /* handle */ - ceph_encode_string(&p, end, what, strlen(what)); + register_generic_request(req); + { + void *p = req->request->front.iov_base; + void *const end = p + req->request->front_alloc_len; + + ceph_encode_64(&p, req->tid); /* handle */ + ceph_encode_string(&p, end, what, strlen(what)); + WARN_ON(p != end); + } + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); - err = __do_generic_request(monc, tid, req); + return req; - mutex_unlock(&monc->mutex); -out: +err_put_req: put_generic_request(req); - return err; + return ERR_PTR(-ENOMEM); +} + +/* + * Send MMonGetVersion and wait for the reply. + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + u64 *newest) +{ + struct ceph_mon_generic_request *req; + int ret; + + req = __ceph_monc_get_version(monc, what, NULL, 0); + if (IS_ERR(req)) + return PTR_ERR(req); + + ret = wait_generic_request(req); + if (!ret) + *newest = req->u.newest; + + put_generic_request(req); + return ret; } -EXPORT_SYMBOL(ceph_monc_do_get_version); +EXPORT_SYMBOL(ceph_monc_get_version); + +/* + * Send MMonGetVersion, + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data) +{ + struct ceph_mon_generic_request *req; + + req = __ceph_monc_get_version(monc, what, cb, private_data); + if (IS_ERR(req)) + return PTR_ERR(req); + + put_generic_request(req); + return 0; +} +EXPORT_SYMBOL(ceph_monc_get_version_async); /* * Resend pending generic requests. @@ -890,7 +954,7 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) if (!monc->m_subscribe_ack) goto out_auth; - monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS, + monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, GFP_NOFS, true); if (!monc->m_subscribe) goto out_subscribe_ack; @@ -914,9 +978,10 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); monc->generic_request_tree = RB_ROOT; - monc->num_generic_requests = 0; monc->last_tid = 0; + monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE; + return 0; out_auth_reply: @@ -954,6 +1019,8 @@ void ceph_monc_stop(struct ceph_mon_client *monc) ceph_auth_destroy(monc->auth); + WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); + ceph_msg_put(monc->m_auth); ceph_msg_put(monc->m_auth_reply); ceph_msg_put(monc->m_subscribe); |