-rw-r--r--  memcached.c | 78
-rw-r--r--  memcached.h | 86
-rw-r--r--  storage.c   | 19
-rw-r--r--  storage.h   |  2
-rw-r--r--  thread.c    | 23
5 files changed, 120 insertions(+), 88 deletions(-)
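The patch splits the old per-connection io_queue_t into two pieces: a per-thread io_queue_cb_t holding the callback pointers, registered once per worker via thread_io_queue_add(), and a slimmed per-connection io_queue_t that only carries ctx, stack_ctx, count, and type, filled in from the thread table by conn_io_queue_setup() when a connection is handed to a worker. The following is a minimal, self-contained sketch of that split. The struct shapes and helper names mirror the diff, but worker_thread_t, connection_t, demo_submit(), and the main() driver are simplified stand-ins for illustration, not memcached's real types.

#include <stdio.h>
#include <stddef.h>

#define IO_QUEUE_COUNT 3
#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1

typedef struct io_queue_s io_queue_t;
typedef void (*io_queue_stack_cb)(io_queue_t *q);

/* Per-connection state: no callback pointers, just bookkeeping. */
struct io_queue_s {
    void *ctx;        /* duplicated from the thread-level entry */
    void *stack_ctx;  /* batch of pending IOs for this connection */
    int count;
    int type;
};

/* Per-thread callback table entry, registered once at thread setup. */
typedef struct {
    void *ctx;
    io_queue_stack_cb submit_cb;
    int type;
} io_queue_cb_t;

typedef struct {
    io_queue_cb_t io_queues[IO_QUEUE_COUNT];
} worker_thread_t;

typedef struct {
    worker_thread_t *thread;
    io_queue_t io_queues[IO_QUEUE_COUNT];
} connection_t;

/* Mirrors thread_io_queue_add(): append an entry to the thread's table. */
static void thread_io_queue_add(worker_thread_t *t, int type, void *ctx, io_queue_stack_cb cb) {
    io_queue_cb_t *q = t->io_queues;
    while (q->type != IO_QUEUE_NONE) {
        q++;
    }
    q->type = type;
    q->ctx = ctx;
    q->submit_cb = cb;
}

/* Mirrors conn_io_queue_setup(): copy type/ctx down into the connection. */
static void conn_io_queue_setup(connection_t *c) {
    io_queue_cb_t *qcb = c->thread->io_queues;
    io_queue_t *q = c->io_queues;
    while (qcb->type != IO_QUEUE_NONE) {
        q->type = qcb->type;
        q->ctx = qcb->ctx;
        q->stack_ctx = NULL;
        q->count = 0;
        qcb++;
        q++;
    }
}

/* Mirrors thread_io_queue_get(): find the callback entry for a queue type. */
static io_queue_cb_t *thread_io_queue_get(worker_thread_t *t, int type) {
    for (io_queue_cb_t *q = t->io_queues; q->type != IO_QUEUE_NONE; q++) {
        if (q->type == type) {
            return q;
        }
    }
    return NULL;
}

/* Toy submit callback standing in for a module callback such as storage_submit_cb(). */
static void demo_submit(io_queue_t *q) {
    printf("submitting %d pending IO(s) for queue type %d\n", q->count, q->type);
}

int main(void) {
    worker_thread_t t = {0};
    thread_io_queue_add(&t, IO_QUEUE_EXTSTORE, NULL, demo_submit);
    thread_io_queue_add(&t, IO_QUEUE_NONE, NULL, NULL); /* terminator entry */

    connection_t c = { .thread = &t };
    conn_io_queue_setup(&c);

    /* Pretend one IO was queued, then dispatch via the thread's callback table. */
    c.io_queues[0].count = 1;
    for (io_queue_t *q = c.io_queues; q->type != IO_QUEUE_NONE; q++) {
        if (q->count != 0) {
            thread_io_queue_get(c.thread, q->type)->submit_cb(q);
        }
    }
    return 0;
}

As in the new drive_machine() code, submission only needs the per-connection bookkeeping plus a type-keyed lookup into the thread's table, so callback pointers are no longer duplicated into every conn.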
diff --git a/memcached.c b/memcached.c
index 0daee12..a5cd0a6 100644
--- a/memcached.c
+++ b/memcached.c
@@ -558,14 +558,13 @@ void conn_worker_readd(conn *c) {
     }
 }
 
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
-    io_queue_t *q = c->io_queues;
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
+    io_queue_cb_t *q = t->io_queues;
     while (q->type != IO_QUEUE_NONE) {
         q++;
     }
     q->type = type;
     q->ctx = ctx;
-    q->stack_ctx = NULL;
     q->submit_cb = cb;
     q->complete_cb = com_cb;
     q->finalize_cb = fin_cb;
@@ -573,6 +572,30 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu
     return;
 }
 
+void conn_io_queue_setup(conn *c) {
+    io_queue_cb_t *qcb = c->thread->io_queues;
+    io_queue_t *q = c->io_queues;
+    while (qcb->type != IO_QUEUE_NONE) {
+        q->type = qcb->type;
+        q->ctx = qcb->ctx;
+        q->stack_ctx = NULL;
+        q->count = 0;
+        qcb++;
+        q++;
+    }
+}
+
+io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type) {
+    io_queue_cb_t *q = t->io_queues;
+    while (q->type != IO_QUEUE_NONE) {
+        if (q->type == type) {
+            return q;
+        }
+        q++;
+    }
+    return NULL;
+}
+
 io_queue_t *conn_io_queue_get(conn *c, int type) {
     io_queue_t *q = c->io_queues;
     while (q->type != IO_QUEUE_NONE) {
@@ -589,25 +612,20 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
 // not and handle appropriately.
 static void conn_io_queue_complete(conn *c) {
     io_queue_t *q = c->io_queues;
+    io_queue_cb_t *qcb = c->thread->io_queues;
     while (q->type != IO_QUEUE_NONE) {
         if (q->stack_ctx) {
-            q->complete_cb(q);
+            qcb->complete_cb(q);
         }
+        qcb++;
         q++;
     }
 }
 
 // called to return a single IO object to the original worker thread.
 void conn_io_queue_return(io_pending_t *io) {
-    io_queue_t *q = io->q;
-    int ret = q->return_cb(io);
-    // An IO may or may not count against the pending responses.
-    if (ret) {
-        q->count--;
-        if (q->count == 0) {
-            conn_worker_readd(io->c);
-        }
-    }
+    io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type);
+    q->return_cb(io);
     return;
 }
 
@@ -1164,7 +1182,8 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) {
     if (resp->io_pending) {
         // If we had a pending IO, tell it to internally clean up then return
         // the main object back to our thread cache.
-        resp->io_pending->q->finalize_cb(resp->io_pending);
+        io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type);
+        qcb->finalize_cb(resp->io_pending);
         do_cache_free(c->thread->io_cache, resp->io_pending);
         resp->io_pending = NULL;
     }
@@ -3230,25 +3249,22 @@ static void drive_machine(conn *c) {
              * remove the connection from the worker thread and dispatch the
              * IO queue
              */
-            if (c->io_queues[0].type != IO_QUEUE_NONE) {
-                assert(c->io_queues_submitted == 0);
-                bool hit = false;
-
-                for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
-                    if (q->count != 0) {
-                        assert(q->stack_ctx != NULL);
-                        hit = true;
-                        q->submit_cb(q);
-                        c->io_queues_submitted++;
-                    }
+            assert(c->io_queues_submitted == 0);
+
+            for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
+                if (q->count != 0) {
+                    assert(q->stack_ctx != NULL);
+                    io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
+                    qcb->submit_cb(q);
+                    c->io_queues_submitted++;
                 }
-                if (hit) {
-                    conn_set_state(c, conn_io_queue);
-                    event_del(&c->event);
+            }
+            if (c->io_queues_submitted != 0) {
+                conn_set_state(c, conn_io_queue);
+                event_del(&c->event);
 
-                    stop = true;
-                    break;
-                }
+                stop = true;
+                break;
             }
 
             switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {
diff --git a/memcached.h b/memcached.h
index e8ed48b..7b06bb3 100644
--- a/memcached.h
+++ b/memcached.h
@@ -621,6 +621,47 @@ typedef struct {
     unsigned short page_id; /* from IO header */
 } item_hdr;
 #endif
+
+#define IO_QUEUE_COUNT 3
+
+#define IO_QUEUE_NONE 0
+#define IO_QUEUE_EXTSTORE 1
+
+typedef struct _io_pending_t io_pending_t;
+typedef struct io_queue_s io_queue_t;
+typedef void (*io_queue_stack_cb)(io_queue_t *q);
+typedef void (*io_queue_cb)(io_pending_t *pending);
+// this structure's ownership gets passed between threads:
+// - owned normally by the worker thread.
+// - multiple queues can be submitted at the same time.
+// - each queue can be sent to different background threads.
+// - each submitted queue needs to know when to return to the worker.
+// - the worker needs to know when all queues have returned so it can process.
+//
+// io_queue_t's count field is owned by worker until submitted. Then owned by
+// side thread until returned.
+// conn->io_queues_submitted is always owned by the worker thread. it is
+// incremented as the worker submits queues, and decremented as it gets pinged
+// for returned threads.
+//
+// All of this is to avoid having to hit a mutex owned by the connection
+// thread that gets pinged for each thread (or an equivalent atomic).
+struct io_queue_s {
+    void *ctx; // duplicated from io_queue_cb_t
+    void *stack_ctx; // module-specific context to be batch-submitted
+    int count; // ios to process before returning. only accessed by queue processor once submitted
+    int type; // duplicated from io_queue_cb_t
+};
+
+typedef struct io_queue_cb_s {
+    void *ctx; // untouched ptr for specific context
+    io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
+    io_queue_stack_cb complete_cb;
+    io_queue_cb return_cb; // called on worker thread.
+    io_queue_cb finalize_cb; // called back on the worker thread.
+    int type;
+} io_queue_cb_t;
+
 typedef struct _mc_resp_bundle mc_resp_bundle;
 typedef struct {
     pthread_t thread_id;        /* unique ID of this thread */
@@ -633,6 +674,7 @@ typedef struct {
     int notify_send_fd;         /* sending end of notify pipe */
 #endif
     struct thread_stats stats;  /* Stats generated by this thread */
+    io_queue_cb_t io_queues[IO_QUEUE_COUNT];
     struct conn_queue *ev_queue; /* Worker/conn event queue */
     cache_t *rbuf_cache;        /* static-sized read buffers */
     mc_resp_bundle *open_bundle;
@@ -652,7 +694,6 @@ typedef struct {
 /**
  * Response objects
  */
-typedef struct _io_pending_t io_pending_t;
 #define MC_RESP_IOVCOUNT 4
 typedef struct _mc_resp {
     mc_resp_bundle *bundle; // ptr back to bundle
@@ -694,40 +735,9 @@ struct _mc_resp_bundle {
 
 typedef struct conn conn;
 
-#define IO_QUEUE_NONE 0
-#define IO_QUEUE_EXTSTORE 1
-
-typedef struct io_queue_s io_queue_t;
-typedef void (*io_queue_stack_cb)(io_queue_t *q);
-typedef int (*io_queue_cb)(io_pending_t *pending);
-// this structure's ownership gets passed between threads:
-// - owned normally by the worker thread.
-// - multiple queues can be submitted at the same time.
-// - each queue can be sent to different background threads.
-// - each submitted queue needs to know when to return to the worker.
-// - the worker needs to know when all queues have returned so it can process.
-//
-// io_queue_t's count field is owned by worker until submitted. Then owned by
-// side thread until returned.
-// conn->io_queues_submitted is always owned by the worker thread. it is
-// incremented as the worker submits queues, and decremented as it gets pinged
-// for returned threads.
-//
-// All of this is to avoid having to hit a mutex owned by the connection
-// thread that gets pinged for each thread (or an equivalent atomic).
-struct io_queue_s {
-    void *ctx; // untouched ptr for specific context
-    void *stack_ctx; // module-specific context to be batch-submitted
-    io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
-    io_queue_stack_cb complete_cb;
-    io_queue_cb return_cb; // called on worker thread.
-    io_queue_cb finalize_cb; // called back on the worker thread.
-    int type;
-    int count; // ios to process before returning. only accessed by queue processor once submitted
-};
-
 struct _io_pending_t {
-    io_queue_t *q;
+    int io_queue_type; // matches one of IO_QUEUE_*
+    LIBEVENT_THREAD *thread;
     conn *c;
     mc_resp *resp; // associated response object
     char data[120];
@@ -780,7 +790,7 @@ struct conn {
     int    sbytes;    /* how many bytes to swallow */
 
     int io_queues_submitted; /* see notes on io_queue_t */
-    io_queue_t io_queues[3]; /* set of deferred IO queues. */
+    io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
 #ifdef EXTSTORE
     unsigned int recache_counter;
 #endif
@@ -861,8 +871,10 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
                                     uint64_t *cas, const uint32_t hv,
                                     item **it_ret);
 enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void conn_io_queue_setup(conn *c);
 io_queue_t *conn_io_queue_get(conn *c, int type);
+io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
 void conn_io_queue_return(io_pending_t *io);
 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags,
     const int read_buffer_size, enum network_transport transport, struct event_base *base, void *ssl);
@@ -891,7 +903,7 @@ extern int daemonize(int nochdir, int noclose);
 void memcached_thread_init(int nthreads, void *arg);
 void redispatch_conn(conn *c);
 void timeout_conn(conn *c);
-void return_io_pending(conn *c, io_pending_t *io);
+void return_io_pending(io_pending_t *io);
 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
     int read_buffer_size, enum network_transport transport, void *ssl);
 void sidethread_conn_close(conn *c);
diff --git a/storage.c b/storage.c
--- a/storage.c
+++ b/storage.c
@@ -23,7 +23,8 @@
 // re-cast an io_pending_t into this more descriptive structure.
 // the first few items _must_ match the original struct.
 typedef struct _io_pending_storage_t {
-    io_queue_t *q;
+    int io_queue_type;
+    LIBEVENT_THREAD *thread;
     conn *c;
     mc_resp *resp;            /* original struct ends here */
     item *hdr_it;             /* original header item. */
@@ -223,13 +224,14 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
         p->miss = false;
     }
 
-    p->q->count--;
     p->active = false;
     //assert(c->io_wrapleft >= 0);
 
     // All IO's have returned, lets re-attach this connection to our original
    // thread.
-    if (p->q->count == 0) {
+    io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
+    q->count--;
+    if (q->count == 0) {
         redispatch_conn(c);
     }
 }
@@ -272,7 +274,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
     // io_pending owns the reference for this object now.
     p->hdr_it = it;
     p->resp = resp;
-    p->q = q; // quicker access to the queue structure.
+    p->io_queue_type = IO_QUEUE_EXTSTORE;
     obj_io *eio = &p->io_ctx;
 
     // FIXME: error handling.
@@ -387,8 +389,10 @@ static void recache_or_free(io_pending_t *pending) {
             do_free = false;
             size_t ntotal = ITEM_ntotal(p->hdr_it);
             slabs_free(it, ntotal, slabs_clsid(ntotal));
-            p->q->count--;
-            assert(p->q->count >= 0);
+
+            io_queue_t *q = conn_io_queue_get(c, p->io_queue_type);
+            q->count--;
+            assert(q->count >= 0);
             pthread_mutex_lock(&c->thread->stats.mutex);
             c->thread->stats.get_aborted_extstore++;
             pthread_mutex_unlock(&c->thread->stats.mutex);
@@ -453,7 +457,7 @@ void storage_complete_cb(io_queue_t *q) {
 }
 
 // Called after responses have been transmitted. Need to free up related data.
-int storage_finalize_cb(io_pending_t *pending) {
+void storage_finalize_cb(io_pending_t *pending) {
     recache_or_free(pending);
     io_pending_storage_t *p = (io_pending_storage_t *)pending;
     obj_io *io = &p->io_ctx;
@@ -463,7 +467,6 @@ int storage_finalize_cb(io_pending_t *pending) {
         io->iov = NULL;
     }
     // don't need to free the main context, since it's embedded.
-    return 0; // return code ignored.
 }
 
 /*
diff --git a/storage.h b/storage.h
--- a/storage.h
+++ b/storage.h
@@ -20,7 +20,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp);
 // callbacks for the IO queue subsystem.
 void storage_submit_cb(io_queue_t *q);
 void storage_complete_cb(io_queue_t *q);
-int storage_finalize_cb(io_pending_t *pending);
+void storage_finalize_cb(io_pending_t *pending);
 
 // Thread functions.
 int start_storage_write_thread(void *arg);
diff --git a/thread.c b/thread.c
--- a/thread.c
+++ b/thread.c
@@ -468,6 +468,14 @@ static void setup_thread(LIBEVENT_THREAD *me) {
         }
     }
 #endif
+#ifdef EXTSTORE
+    // me->storage is set just before this function is called.
+    if (me->storage) {
+        thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
+            storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
+    }
+#endif
+    thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
 }
 
 /*
@@ -563,14 +571,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
             }
         } else {
             c->thread = me;
-#ifdef EXTSTORE
-            if (c->thread->storage) {
-                conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
-                    storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
-            }
-#endif
-            conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
-
+            conn_io_queue_setup(c);
 #ifdef TLS
             if (settings.ssl_enabled && c->ssl != NULL) {
                 assert(c->thread && c->thread->ssl_wbuf);
@@ -726,8 +727,8 @@ void timeout_conn(conn *c) {
     notify_worker_fd(c->thread, c->sfd, queue_timeout);
 }
 
-void return_io_pending(conn *c, io_pending_t *io) {
-    CQ_ITEM *item = cqi_new(c->thread->ev_queue);
+void return_io_pending(io_pending_t *io) {
+    CQ_ITEM *item = cqi_new(io->thread->ev_queue);
     if (item == NULL) {
         // TODO: how can we avoid this?
         // In the main case I just loop, since a malloc failure here for a
@@ -739,7 +740,7 @@ void timeout_conn(conn *c) {
 
     item->mode = queue_return_io;
     item->io = io;
-    notify_worker(c->thread, item);
+    notify_worker(io->thread, item);
 }
 
 /* This misses the allow_new_conns flag :( */
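One behavioral detail worth noting from the hunks above: io_queue_cb now returns void, and the generic conn_io_queue_return() no longer decrements q->count or re-adds the connection itself. That bookkeeping lives with the module that owns the queue, as in _storage_get_item_cb(), which looks up the conn's io_queue_t by type, decrements the count, and calls redispatch_conn() once it reaches zero. The sketch below models only that division of labor, assuming simplified stand-in types; conn_t, io_pending_t, and module_io_complete() are not the real memcached structures.

#include <stdio.h>

#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1

typedef struct {
    int type;
    int count;   /* pending IOs still outstanding for this conn */
} io_queue_t;

typedef struct {
    io_queue_t io_queues[2];
} conn_t;

/* Simplified pending IO: carries a queue type plus a back-pointer,
 * instead of a direct io_queue_t pointer as before this change. */
typedef struct {
    int io_queue_type;
    conn_t *c;
} io_pending_t;

static void redispatch_conn(conn_t *c) {
    /* stand-in for handing the conn back to its worker's event loop */
    (void)c;
    printf("all IOs returned; re-attaching connection to its worker\n");
}

static io_queue_t *conn_io_queue_get(conn_t *c, int type) {
    for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
        if (q->type == type) {
            return q;
        }
    }
    return NULL;
}

/* Stand-in for the storage module's completion path: the module, not the
 * generic queue code, owns the count/redispatch bookkeeping. */
static void module_io_complete(io_pending_t *io) {
    io_queue_t *q = conn_io_queue_get(io->c, io->io_queue_type);
    q->count--;
    if (q->count == 0) {
        redispatch_conn(io->c);
    }
}

int main(void) {
    conn_t c = { .io_queues = { { IO_QUEUE_EXTSTORE, 2 }, { IO_QUEUE_NONE, 0 } } };
    io_pending_t a = { IO_QUEUE_EXTSTORE, &c };
    io_pending_t b = { IO_QUEUE_EXTSTORE, &c };

    module_io_complete(&a); /* first IO returns; count drops to 1 */
    module_io_complete(&b); /* last IO returns; triggers the redispatch */
    return 0;
}

Because io_pending_t now records its io_queue_type and owning thread instead of a direct queue pointer, a side thread needs nothing but the pending object itself to route a result back, which is what the reworked return_io_pending(io) relies on.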