diff options
-rw-r--r-- | memcached.c | 25 | ||||
-rw-r--r-- | memcached.h | 14 | ||||
-rw-r--r-- | storage.c | 11 | ||||
-rw-r--r-- | storage.h | 6 | ||||
-rw-r--r-- | thread.c | 32 |
5 files changed, 67 insertions, 21 deletions
diff --git a/memcached.c b/memcached.c index a029061..0daee12 100644 --- a/memcached.c +++ b/memcached.c @@ -558,7 +558,7 @@ void conn_worker_readd(conn *c) { } } -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb) { +void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) { io_queue_t *q = c->io_queues; while (q->type != IO_QUEUE_NONE) { q++; @@ -569,6 +569,7 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu q->submit_cb = cb; q->complete_cb = com_cb; q->finalize_cb = fin_cb; + q->return_cb = ret_cb; return; } @@ -589,17 +590,27 @@ io_queue_t *conn_io_queue_get(conn *c, int type) { static void conn_io_queue_complete(conn *c) { io_queue_t *q = c->io_queues; while (q->type != IO_QUEUE_NONE) { - // Reuse the same submit stack. We zero it out first so callbacks can - // queue new IO's if necessary. if (q->stack_ctx) { - void *tmp = q->stack_ctx; - q->stack_ctx = NULL; - q->complete_cb(q->ctx, tmp); + q->complete_cb(q); } q++; } } +// called to return a single IO object to the original worker thread. +void conn_io_queue_return(io_pending_t *io) { + io_queue_t *q = io->q; + int ret = q->return_cb(io); + // An IO may or may not count against the pending responses. + if (ret) { + q->count--; + if (q->count == 0) { + conn_worker_readd(io->c); + } + } + return; +} + conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, @@ -3227,7 +3238,7 @@ static void drive_machine(conn *c) { if (q->count != 0) { assert(q->stack_ctx != NULL); hit = true; - q->submit_cb(q->ctx, q->stack_ctx); + q->submit_cb(q); c->io_queues_submitted++; } } diff --git a/memcached.h b/memcached.h index aab21cd..e8ed48b 100644 --- a/memcached.h +++ b/memcached.h @@ -697,8 +697,9 @@ typedef struct conn conn; #define IO_QUEUE_NONE 0 #define IO_QUEUE_EXTSTORE 1 -typedef void (*io_queue_stack_cb)(void *ctx, void *stack); -typedef void (*io_queue_cb)(io_pending_t *pending); +typedef struct io_queue_s io_queue_t; +typedef void (*io_queue_stack_cb)(io_queue_t *q); +typedef int (*io_queue_cb)(io_pending_t *pending); // this structure's ownership gets passed between threads: // - owned normally by the worker thread. // - multiple queues can be submitted at the same time. @@ -714,15 +715,16 @@ typedef void (*io_queue_cb)(io_pending_t *pending); // // All of this is to avoid having to hit a mutex owned by the connection // thread that gets pinged for each thread (or an equivalent atomic). -typedef struct { +struct io_queue_s { void *ctx; // untouched ptr for specific context void *stack_ctx; // module-specific context to be batch-submitted io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. io_queue_stack_cb complete_cb; + io_queue_cb return_cb; // called on worker thread. io_queue_cb finalize_cb; // called back on the worker thread. int type; int count; // ios to process before returning. only accessed by queue processor once submitted -} io_queue_t; +}; struct _io_pending_t { io_queue_t *q; @@ -859,8 +861,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, uint64_t *cas, const uint32_t hv, item **it_ret); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb); +void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb); io_queue_t *conn_io_queue_get(conn *c, int type); +void conn_io_queue_return(io_pending_t *io); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base, void *ssl); @@ -888,6 +891,7 @@ extern int daemonize(int nochdir, int noclose); void memcached_thread_init(int nthreads, void *arg); void redispatch_conn(conn *c); void timeout_conn(conn *c); +void return_io_pending(conn *c, io_pending_t *io); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl); void sidethread_conn_close(conn *c); @@ -366,9 +366,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { return 0; } -void storage_submit_cb(void *ctx, void *stack) { +void storage_submit_cb(io_queue_t *q) { // Don't need to do anything special for extstore. - extstore_submit(ctx, stack); + extstore_submit(q->ctx, q->stack_ctx); } static void recache_or_free(io_pending_t *pending) { @@ -446,12 +446,14 @@ static void recache_or_free(io_pending_t *pending) { // TODO: stubbed with a reminder: should be able to move most of the extstore // callback code into this code instead, executing on worker thread instead of // IO thread. -void storage_complete_cb(void *ctx, void *stack_ctx) { +void storage_complete_cb(io_queue_t *q) { + // need to reset the stack for next use. + q->stack_ctx = NULL; return; } // Called after responses have been transmitted. Need to free up related data. -void storage_finalize_cb(io_pending_t *pending) { +int storage_finalize_cb(io_pending_t *pending) { recache_or_free(pending); io_pending_storage_t *p = (io_pending_storage_t *)pending; obj_io *io = &p->io_ctx; @@ -461,6 +463,7 @@ void storage_finalize_cb(io_pending_t *pending) { io->iov = NULL; } // don't need to free the main context, since it's embedded. + return 0; // return code ignored. } /* @@ -18,9 +18,9 @@ bool storage_validate_item(void *e, item *it); int storage_get_item(conn *c, item *it, mc_resp *resp); // callbacks for the IO queue subsystem. -void storage_submit_cb(void *ctx, void *stack); -void storage_complete_cb(void *ctx, void *stack); -void storage_finalize_cb(io_pending_t *pending); +void storage_submit_cb(io_queue_t *q); +void storage_complete_cb(io_queue_t *q); +int storage_finalize_cb(io_pending_t *pending); // Thread functions. int start_storage_write_thread(void *arg); @@ -35,6 +35,7 @@ enum conn_queue_item_modes { queue_timeout, /* socket sfd timed out */ queue_redispatch, /* return conn from side thread */ queue_stop, /* exit thread */ + queue_return_io, /* returning a pending IO object immediately */ }; typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { @@ -46,6 +47,7 @@ struct conn_queue_item { enum conn_queue_item_modes mode; conn *c; void *ssl; + io_pending_t *io; // IO when used for deferred IO handling. STAILQ_ENTRY(conn_queue_item) i_next; }; @@ -321,6 +323,12 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) { cache_free(cq->cache, item); } +// TODO: Skip notify if queue wasn't empty? +// - Requires cq_push() returning a "was empty" flag +// - Requires event handling loop to pop the entire queue and work from that +// instead of the ev_count work there now. +// In testing this does result in a large performance uptick, but unclear how +// much that will transfer from a synthetic benchmark. static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) { cq_push(t->ev_queue, item); #ifdef HAVE_EVENTFD @@ -558,10 +566,10 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) #ifdef EXTSTORE if (c->thread->storage) { conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, - storage_submit_cb, storage_complete_cb, storage_finalize_cb); + storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb); } #endif - conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); + conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL); #ifdef TLS if (settings.ssl_enabled && c->ssl != NULL) { @@ -587,6 +595,10 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) /* asked to stop */ event_base_loopexit(me->base, NULL); break; + case queue_return_io: + /* getting an individual IO object back */ + conn_io_queue_return(item->io); + break; } cqi_free(me->ev_queue, item); @@ -714,6 +726,22 @@ void timeout_conn(conn *c) { notify_worker_fd(c->thread, c->sfd, queue_timeout); } +void return_io_pending(conn *c, io_pending_t *io) { + CQ_ITEM *item = cqi_new(c->thread->ev_queue); + if (item == NULL) { + // TODO: how can we avoid this? + // In the main case I just loop, since a malloc failure here for a + // tiny object that's generally in a fixed size queue is going to + // implode shortly. + return; + } + + item->mode = queue_return_io; + item->io = io; + + notify_worker(c->thread, item); +} + /* This misses the allow_new_conns flag :( */ void sidethread_conn_close(conn *c) { if (settings.verbose > 1) |