diff options
-rw-r--r-- | memcached.c | 76 | ||||
-rw-r--r-- | memcached.h | 31 | ||||
-rw-r--r-- | storage.c | 40 | ||||
-rw-r--r-- | storage.h | 12 | ||||
-rw-r--r-- | thread.c | 5 |
5 files changed, 103 insertions, 61 deletions
diff --git a/memcached.c b/memcached.c index 0c0445e..94597ea 100644 --- a/memcached.c +++ b/memcached.c @@ -531,25 +531,29 @@ void conn_worker_readd(conn *c) { drive_machine(c); return; } - c->state = conn_new_cmd; // If we had IO objects, process if (c->io_queued) { c->io_queued = false; - conn_set_state(c, conn_mwrite); + // state should be conn_io_queue already, so it will know how to + // dequeue and finalize the async work. drive_machine(c); + } else { + conn_set_state(c, conn_new_cmd); } } -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_add_cb cb, io_queue_free_cb free_cb) { +void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb) { io_queue_t *q = c->io_queues; while (q->type != IO_QUEUE_NONE) { q++; } q->type = type; q->ctx = ctx; - q->cb = cb; - q->free_cb = free_cb; + q->stack_ctx = NULL; + q->submit_cb = cb; + q->complete_cb = com_cb; + q->finalize_cb = fin_cb; return; } @@ -564,6 +568,23 @@ io_queue_t *conn_io_queue_get(conn *c, int type) { return NULL; } +// called after returning to the main worker thread. +// users of the queue need to distinguish if the IO was actually consumed or +// not and handle appropriately. +static void conn_io_queue_complete(conn *c) { + io_queue_t *q = c->io_queues; + while (q->type != IO_QUEUE_NONE) { + // Reuse the same submit stack. We zero it out first so callbacks can + // queue new IO's if necessary. + if (q->stack_ctx) { + void *tmp = q->stack_ctx; + q->stack_ctx = NULL; + q->complete_cb(q->ctx, tmp); + } + q++; + } +} + conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, @@ -747,21 +768,6 @@ void conn_release_items(conn *c) { c->item = 0; } - io_queue_t *q = c->io_queues; - while (q->type != IO_QUEUE_NONE) { - if (q->head_pending) { - io_pending_t *tmp = q->head_pending; - while (tmp) { - io_pending_t *next = tmp->next; - q->free_cb(q->ctx, tmp); - do_cache_free(c->thread->io_cache, tmp); // lockless - tmp = next; - } - q->head_pending = NULL; - } - q++; - } - // Cull any unsent responses. if (c->resp_head) { mc_resp *resp = c->resp_head; @@ -885,7 +891,8 @@ static const char *state_text(enum conn_states state) { "conn_closing", "conn_mwrite", "conn_closed", - "conn_watch" }; + "conn_watch", + "conn_io_queue" }; return statenames[state]; } @@ -1110,6 +1117,13 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) { if (resp->write_and_free) { free(resp->write_and_free); } + if (resp->io_pending) { + // If we had a pending IO, tell it to internally clean up then return + // the main object back to our thread cache. + resp->io_pending->q->finalize_cb(resp->io_pending); + do_cache_free(c->thread->io_cache, resp->io_pending); + resp->io_pending = NULL; + } if (c->resp_head == resp) { c->resp_head = next; } @@ -3159,16 +3173,21 @@ static void drive_machine(conn *c) { */ if (c->io_pending) { assert(c->io_queued == false); - // TODO: create proper state for this condition - conn_set_state(c, conn_watch); + conn_set_state(c, conn_io_queue); event_del(&c->event); c->io_queued = true; - io_queue_t *q = c->io_queues; + // TODO: write as for loop? + /*io_queue_t *q = c->io_queues; while (q->type != IO_QUEUE_NONE) { - if (q->head_pending != NULL) { - q->cb(q->ctx, q->head_pending); + if (q->stack_ctx != NULL) { + q->submit_cb(q->ctx, q->stack_ctx); } q++; + }*/ + for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) { + if (q->stack_ctx != NULL) { + q->submit_cb(q->ctx, q->stack_ctx); + } } stop = true; break; @@ -3217,6 +3236,11 @@ static void drive_machine(conn *c) { /* We handed off our connection to the logger thread. */ stop = true; break; + case conn_io_queue: + /* Complete our queued IO's from within the worker thread. */ + conn_io_queue_complete(c); + conn_set_state(c, conn_mwrite); + break; case conn_max_state: assert(false); break; diff --git a/memcached.h b/memcached.h index 73253c6..bce4f41 100644 --- a/memcached.h +++ b/memcached.h @@ -195,6 +195,7 @@ enum conn_states { conn_mwrite, /**< writing out many items sequentially */ conn_closed, /**< connection is closed */ conn_watch, /**< held by the logger thread as a watcher */ + conn_io_queue, /**< wait on async. process to get response object */ conn_max_state /**< Max state value (used for assertion) */ }; @@ -626,6 +627,7 @@ typedef struct { /** * Response objects */ +typedef struct _io_pending_t io_pending_t; #define MC_RESP_IOVCOUNT 4 typedef struct _mc_resp { mc_resp_bundle *bundle; // ptr back to bundle @@ -633,6 +635,7 @@ typedef struct _mc_resp { int wbytes; // bytes to write out of wbuf: might be able to nuke this. int tosend; // total bytes to send for this response void *write_and_free; /** free this memory after finishing writing */ + io_pending_t *io_pending; /* pending IO descriptor for this response */ item *item; /* item associated with this response object, with reference held */ struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */ @@ -668,23 +671,25 @@ typedef struct conn conn; #define IO_QUEUE_NONE 0 #define IO_QUEUE_EXTSTORE 1 -typedef struct _io_pending_t { - struct _io_pending_t *next; - conn *c; - mc_resp *resp; /* associated response object */ - char data[120]; -} io_pending_t; -typedef void (*io_queue_add_cb)(void *ctx, io_pending_t *pending); -typedef void (*io_queue_free_cb)(void *ctx, io_pending_t *pending); +typedef void (*io_queue_stack_cb)(void *ctx, void *stack); +typedef void (*io_queue_cb)(io_pending_t *pending); typedef struct { - io_pending_t *head_pending; - void *ctx; - io_queue_add_cb cb; - io_queue_free_cb free_cb; + void *ctx; // untouched ptr for specific context + void *stack_ctx; // module-specific context to be batch-submitted + io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. + io_queue_stack_cb complete_cb; + io_queue_cb finalize_cb; // called back on the worker thread. int type; } io_queue_t; +struct _io_pending_t { + io_queue_t *q; + conn *c; + mc_resp *resp; /* associated response object */ + char data[120]; +}; + /** * The structure representing a connection into memcached. */ @@ -807,7 +812,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, uint64_t *cas, const uint32_t hv, item **it_ret); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_add_cb cb, io_queue_free_cb free_cb); +void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb); io_queue_t *conn_io_queue_get(conn *c, int type); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base, void *ssl); @@ -23,7 +23,7 @@ // re-cast an io_pending_t into this more descriptive structure. // the first few items _must_ match the original struct. typedef struct _io_pending_storage_t { - struct _io_pending_t *next; + io_queue_t *q; conn *c; mc_resp *resp; /* original struct ends here */ item *hdr_it; /* original header item. */ @@ -271,6 +271,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { // io_pending owns the reference for this object now. p->hdr_it = it; p->resp = resp; + p->q = q; // quicker access to the queue structure. obj_io *eio = &p->io_ctx; // FIXME: error handling. @@ -324,20 +325,17 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { resp_add_iov(resp, "", iovtotal); } + // We can't bail out anymore, so mc_resp owns the IO from here. + resp->io_pending = (io_pending_t *)p; + eio->buf = (void *)new_it; p->c = c; - // We need to stack the sub-struct IO's together as well. - if (q->head_pending) { - io_pending_storage_t *qh = (io_pending_storage_t *)q->head_pending; - eio->next = &qh->io_ctx; - } else { - eio->next = NULL; - } + // We need to stack the sub-struct IO's together for submission. + eio->next = q->stack_ctx; + q->stack_ctx = eio; - // IO queue for this connection. - p->next = q->head_pending; - q->head_pending = (io_pending_t *)p; + // No need to stack the io_pending's together as they live on mc_resp's. assert(c->io_pending >= 0); c->io_pending++; // reference ourselves for the callback. @@ -367,10 +365,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { return 0; } -void storage_submit_cb(void *ctx, io_pending_t *pending) { - // re-cast to our specific struct. - io_pending_storage_t *p = (io_pending_storage_t *)pending; - extstore_submit(ctx, &p->io_ctx); +void storage_submit_cb(void *ctx, void *stack) { + // Don't need to do anything special for extstore. + extstore_submit(ctx, stack); } static void recache_or_free(io_pending_t *pending) { @@ -438,15 +435,22 @@ static void recache_or_free(io_pending_t *pending) { p->io_ctx.buf = NULL; p->io_ctx.next = NULL; - p->next = NULL; p->active = false; // TODO: reuse lock and/or hv. item_remove(p->hdr_it); } -// TODO: io cache or embed obj_io in space within io_pending_t -void storage_free_cb(void *ctx, io_pending_t *pending) { +// Called after the IO is processed but before the response is transmitted. +// TODO: stubbed with a reminder: should be able to move most of the extstore +// callback code into this code instead, executing on worker thread instead of +// IO thread. +void storage_complete_cb(void *ctx, void *stack_ctx) { + return; +} + +// Called after responses have been transmitted. Need to free up related data. +void storage_finalize_cb(io_pending_t *pending) { recache_or_free(pending); io_pending_storage_t *p = (io_pending_storage_t *)pending; obj_io *io = &p->io_ctx; @@ -11,18 +11,26 @@ void storage_delete(void *e, item *it); #define STORAGE_delete(...) #endif +// API. void storage_stats(ADD_STAT add_stats, conn *c); void process_extstore_stats(ADD_STAT add_stats, conn *c); bool storage_validate_item(void *e, item *it); int storage_get_item(conn *c, item *it, mc_resp *resp); -void storage_submit_cb(void *ctx, io_pending_t *pending); -void storage_free_cb(void *ctx, io_pending_t *pending); + +// callbacks for the IO queue subsystem. +void storage_submit_cb(void *ctx, void *stack); +void storage_complete_cb(void *ctx, void *stack); +void storage_finalize_cb(io_pending_t *pending); + +// Thread functions. int start_storage_write_thread(void *arg); void storage_write_pause(void); void storage_write_resume(void); int start_storage_compact_thread(void *arg); void storage_compact_pause(void); void storage_compact_resume(void); + +// Init functions. struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size); void *storage_init_config(struct settings *s); int storage_read_config(void *conf, char **subopt); @@ -538,10 +538,11 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) c->thread = me; #ifdef EXTSTORE if (c->thread->storage) { - conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, storage_submit_cb, storage_free_cb); + conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, + storage_submit_cb, storage_complete_cb, storage_finalize_cb); } #endif - conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL); + conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); #ifdef TLS if (settings.ssl_enabled && c->ssl != NULL) { |