diff options
author | dormando <dormando@rydia.net> | 2023-01-11 13:46:16 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2023-01-11 21:37:55 -0800 |
commit | e660658748b04f865852e77b0aad1fd8301cd5ec (patch) | |
tree | 21d4017a4858c44c240e10b4c9a4b136812d2e0e | |
parent | fccf7b9efdfb0deb11f111496ce53c5892647dab (diff) | |
download | memcached-e660658748b04f865852e77b0aad1fd8301cd5ec.tar.gz |
core: simplify background IO API
- removes unused "completed" IO callback handler
- moves primary post-IO callback handlers from the queue definition to
the actual IO objects.
- allows IO object callbacks to be handled generically instead of based
on the queue they were submitted from.
-rw-r--r-- | memcached.c | 31 | ||||
-rw-r--r-- | memcached.h | 28 | ||||
-rw-r--r-- | proto_proxy.c | 6 | ||||
-rw-r--r-- | proxy.h | 5 | ||||
-rw-r--r-- | proxy_await.c | 4 | ||||
-rw-r--r-- | storage.c | 49 | ||||
-rw-r--r-- | storage.h | 4 | ||||
-rw-r--r-- | thread.c | 7 |
8 files changed, 54 insertions, 80 deletions
diff --git a/memcached.c b/memcached.c index 7df7e55..4589de8 100644 --- a/memcached.c +++ b/memcached.c @@ -569,7 +569,7 @@ void conn_worker_readd(conn *c) { } } -void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) { +void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb) { io_queue_cb_t *q = t->io_queues; while (q->type != IO_QUEUE_NONE) { q++; @@ -577,9 +577,6 @@ void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack q->type = type; q->ctx = ctx; q->submit_cb = cb; - q->complete_cb = com_cb; - q->finalize_cb = fin_cb; - q->return_cb = ret_cb; return; } @@ -626,26 +623,9 @@ io_queue_t *conn_io_queue_get(conn *c, int type) { return NULL; } -// called after returning to the main worker thread. -// users of the queue need to distinguish if the IO was actually consumed or -// not and handle appropriately. -static void conn_io_queue_complete(conn *c) { - io_queue_t *q = c->io_queues; - io_queue_cb_t *qcb = c->thread->io_queues; - while (q->type != IO_QUEUE_NONE) { - if (q->stack_ctx) { - qcb->complete_cb(q); - } - qcb++; - q++; - } -} - // called to return a single IO object to the original worker thread. void conn_io_queue_return(io_pending_t *io) { - io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type); - q->return_cb(io); - return; + io->return_cb(io); } conn *conn_new(const int sfd, enum conn_states init_state, @@ -1215,11 +1195,11 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) { free(resp->write_and_free); } if (resp->io_pending) { + io_pending_t *io = resp->io_pending; // If we had a pending IO, tell it to internally clean up then return // the main object back to our thread cache. - io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type); - qcb->finalize_cb(resp->io_pending); - do_cache_free(c->thread->io_cache, resp->io_pending); + io->finalize_cb(io); + do_cache_free(c->thread->io_cache, io); resp->io_pending = NULL; } if (c->resp_head == resp) { @@ -3379,7 +3359,6 @@ static void drive_machine(conn *c) { break; case conn_io_queue: /* Complete our queued IO's from within the worker thread. */ - conn_io_queue_complete(c); conn_set_state(c, conn_mwrite); break; case conn_max_state: diff --git a/memcached.h b/memcached.h index 94a2550..bdfa945 100644 --- a/memcached.h +++ b/memcached.h @@ -672,21 +672,12 @@ typedef struct _io_pending_t io_pending_t; typedef struct io_queue_s io_queue_t; typedef void (*io_queue_stack_cb)(io_queue_t *q); typedef void (*io_queue_cb)(io_pending_t *pending); -// this structure's ownership gets passed between threads: -// - owned normally by the worker thread. -// - multiple queues can be submitted at the same time. -// - each queue can be sent to different background threads. -// - each submitted queue needs to know when to return to the worker. -// - the worker needs to know when all queues have returned so it can process. -// -// io_queue_t's count field is owned by worker until submitted. Then owned by -// side thread until returned. -// conn->io_queues_submitted is always owned by the worker thread. it is -// incremented as the worker submits queues, and decremented as it gets pinged -// for returned threads. -// -// All of this is to avoid having to hit a mutex owned by the connection -// thread that gets pinged for each thread (or an equivalent atomic). +// This structure used to be passed between threads, but is now owned entirely +// by the worker threads. +// IO pending objects are created and stacked into this structure. They are +// then sent off to remote threads. +// The objects are returned one at a time to the worker threads, and this +// structure is then consulted to see when to resume the worker. struct io_queue_s { void *ctx; // duplicated from io_queue_cb_t void *stack_ctx; // module-specific context to be batch-submitted @@ -697,9 +688,6 @@ struct io_queue_s { typedef struct io_queue_cb_s { void *ctx; // untouched ptr for specific context io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. - io_queue_stack_cb complete_cb; - io_queue_cb return_cb; // called on worker thread. - io_queue_cb finalize_cb; // called back on the worker thread. int type; } io_queue_cb_t; @@ -789,6 +777,8 @@ struct _io_pending_t { LIBEVENT_THREAD *thread; conn *c; mc_resp *resp; // associated response object + io_queue_cb return_cb; // called on worker thread. + io_queue_cb finalize_cb; // called back on the worker thread. char data[120]; }; @@ -924,7 +914,7 @@ enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key, uint64_t *cas, const uint32_t hv, item **it_ret); enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale); -void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb); +void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb); void conn_io_queue_setup(conn *c); io_queue_t *conn_io_queue_get(conn *c, int type); io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type); diff --git a/proto_proxy.c b/proto_proxy.c index eb6f5e6..01b368d 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -289,10 +289,6 @@ void proxy_submit_cb(io_queue_t *q) { return; } -void proxy_complete_cb(io_queue_t *q) { - // empty/unused. -} - // called from worker thread after an individual IO has been returned back to // the worker thread. Do post-IO run and cleanup work. void proxy_return_cb(io_pending_t *pending) { @@ -892,6 +888,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) { p->client_resp = r; p->flushed = false; p->ascii_multiget = rq->ascii_multiget; + p->return_cb = proxy_return_cb; + p->finalize_cb = proxy_finalize_cb; resp->io_pending = (io_pending_t *)p; // top of the main thread should be our coroutine. @@ -409,7 +409,10 @@ struct _io_pending_proxy_t { int io_queue_type; LIBEVENT_THREAD *thread; conn *c; - mc_resp *resp; // original struct ends here + mc_resp *resp; + io_queue_cb return_cb; // called on worker thread. + io_queue_cb finalize_cb; // called back on the worker thread. + // original struct ends here struct _io_pending_proxy_t *next; // stack for IO submission STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends diff --git a/proxy_await.c b/proxy_await.c index 33e4cd3..582c3d5 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -147,6 +147,8 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw p->client_resp = r; p->flushed = false; p->ascii_multiget = rq->ascii_multiget; + p->return_cb = proxy_return_cb; + p->finalize_cb = proxy_finalize_cb; // io_p needs to hold onto its own response reference, because we may or // may not include it in the final await() result. @@ -201,6 +203,8 @@ static void mcp_queue_await_dummy_io(conn *c, lua_State *Lc, int await_ref) { p->is_await = true; p->await_ref = await_ref; p->await_background = true; + p->return_cb = proxy_return_cb; + p->finalize_cb = proxy_finalize_cb; // Dummy IO has no backend, and no request attached. @@ -19,6 +19,8 @@ /* * API functions */ +static void storage_finalize_cb(io_pending_t *pending); +static void storage_return_cb(io_pending_t *pending); // re-cast an io_pending_t into this more descriptive structure. // the first few items _must_ match the original struct. @@ -26,7 +28,10 @@ typedef struct _io_pending_storage_t { int io_queue_type; LIBEVENT_THREAD *thread; conn *c; - mc_resp *resp; /* original struct ends here */ + mc_resp *resp; + io_queue_cb return_cb; // called on worker thread. + io_queue_cb finalize_cb; // called back on the worker thread. + /* original struct ends here */ item *hdr_it; /* original header item. */ obj_io io_ctx; /* embedded extstore IO header */ unsigned int iovec_data; /* specific index of data iovec */ @@ -119,12 +124,10 @@ void storage_stats(ADD_STAT add_stats, conn *c) { } - -// FIXME: This runs in the IO thread. to get better IO performance this should -// simply mark the io wrapper with the return value and decrement wrapleft, if -// zero redispatching. Still a bit of work being done in the side thread but -// minimized at least. -// TODO: wrap -> p? +// This callback runs in the IO thread. +// TODO: Some or all of this should move to the +// io_pending's callback back in the worker thread. +// It might make sense to keep the crc32c check here though. static void _storage_get_item_cb(void *e, obj_io *io, int ret) { // FIXME: assumes success io_pending_storage_t *p = (io_pending_storage_t *)io->data; @@ -227,13 +230,7 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) { p->active = false; //assert(c->io_wrapleft >= 0); - // All IO's have returned, lets re-attach this connection to our original - // thread. - io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); - q->count--; - if (q->count == 0) { - redispatch_conn(c); - } + return_io_pending((io_pending_t *)p); } int storage_get_item(conn *c, item *it, mc_resp *resp) { @@ -271,6 +268,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { p->miss = false; p->badcrc = false; p->noreply = c->noreply; + p->thread = c->thread; + p->return_cb = storage_return_cb; + p->finalize_cb = storage_finalize_cb; // io_pending owns the reference for this object now. p->hdr_it = it; p->resp = resp; @@ -371,8 +371,12 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { void storage_submit_cb(io_queue_t *q) { // Don't need to do anything special for extstore. extstore_submit(q->ctx, q->stack_ctx); + + // need to reset the stack for next use. + q->stack_ctx = NULL; } +// Runs locally in worker thread. static void recache_or_free(io_pending_t *pending) { // re-cast to our specific struct. io_pending_storage_t *p = (io_pending_storage_t *)pending; @@ -446,18 +450,17 @@ static void recache_or_free(io_pending_t *pending) { item_remove(p->hdr_it); } -// Called after the IO is processed but before the response is transmitted. -// TODO: stubbed with a reminder: should be able to move most of the extstore -// callback code into this code instead, executing on worker thread instead of -// IO thread. -void storage_complete_cb(io_queue_t *q) { - // need to reset the stack for next use. - q->stack_ctx = NULL; - return; +// Called after an IO has been returned to the worker thread. +static void storage_return_cb(io_pending_t *pending) { + io_queue_t *q = conn_io_queue_get(pending->c, pending->io_queue_type); + q->count--; + if (q->count == 0) { + conn_worker_readd(pending->c); + } } // Called after responses have been transmitted. Need to free up related data. -void storage_finalize_cb(io_pending_t *pending) { +static void storage_finalize_cb(io_pending_t *pending) { recache_or_free(pending); io_pending_storage_t *p = (io_pending_storage_t *)pending; obj_io *io = &p->io_ctx; @@ -17,10 +17,8 @@ void process_extstore_stats(ADD_STAT add_stats, conn *c); bool storage_validate_item(void *e, item *it); int storage_get_item(conn *c, item *it, mc_resp *resp); -// callbacks for the IO queue subsystem. +// callback for the IO queue subsystem. void storage_submit_cb(io_queue_t *q); -void storage_complete_cb(io_queue_t *q); -void storage_finalize_cb(io_pending_t *pending); // Thread functions. int start_storage_write_thread(void *arg); @@ -482,12 +482,11 @@ static void setup_thread(LIBEVENT_THREAD *me) { // me->storage is set just before this function is called. if (me->storage) { thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage, - storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb); + storage_submit_cb); } #endif #ifdef PROXY - thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb, - proxy_complete_cb, proxy_return_cb, proxy_finalize_cb); + thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb); // TODO: maybe register hooks to be called here from sub-packages? ie; // extstore, TLS, proxy. @@ -495,7 +494,7 @@ static void setup_thread(LIBEVENT_THREAD *me) { proxy_thread_init(settings.proxy_ctx, me); } #endif - thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL); + thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL); } /* |