summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author dormando <dormando@rydia.net> 2023-01-11 13:46:16 -0800
committer dormando <dormando@rydia.net> 2023-01-11 21:37:55 -0800
commit e660658748b04f865852e77b0aad1fd8301cd5ec (patch)
tree 21d4017a4858c44c240e10b4c9a4b136812d2e0e
parent fccf7b9efdfb0deb11f111496ce53c5892647dab (diff)
download memcached-e660658748b04f865852e77b0aad1fd8301cd5ec.tar.gz
core: simplify background IO API
- removes unused "completed" IO callback handler
- moves primary post-IO callback handlers from the queue definition to
  the actual IO objects.
- allows IO object callbacks to be handled generically instead of based
  on the queue they were submitted from.
-rw-r--r-- memcached.c 31
-rw-r--r-- memcached.h 28
-rw-r--r-- proto_proxy.c 6
-rw-r--r-- proxy.h 5
-rw-r--r-- proxy_await.c 4
-rw-r--r-- storage.c 49
-rw-r--r-- storage.h 4
-rw-r--r-- thread.c 7
8 files changed, 54 insertions, 80 deletions
diff --git a/memcached.c b/memcached.c
index 7df7e55..4589de8 100644
--- a/memcached.c
+++ b/memcached.c
@@ -569,7 +569,7 @@ void conn_worker_readd(conn *c) {
}
}
-void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb) {
io_queue_cb_t *q = t->io_queues;
while (q->type != IO_QUEUE_NONE) {
q++;
@@ -577,9 +577,6 @@ void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack
q->type = type;
q->ctx = ctx;
q->submit_cb = cb;
- q->complete_cb = com_cb;
- q->finalize_cb = fin_cb;
- q->return_cb = ret_cb;
return;
}
@@ -626,26 +623,9 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
return NULL;
}
-// called after returning to the main worker thread.
-// users of the queue need to distinguish if the IO was actually consumed or
-// not and handle appropriately.
-static void conn_io_queue_complete(conn *c) {
- io_queue_t *q = c->io_queues;
- io_queue_cb_t *qcb = c->thread->io_queues;
- while (q->type != IO_QUEUE_NONE) {
- if (q->stack_ctx) {
- qcb->complete_cb(q);
- }
- qcb++;
- q++;
- }
-}
-
// called to return a single IO object to the original worker thread.
void conn_io_queue_return(io_pending_t *io) {
- io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type);
- q->return_cb(io);
- return;
+ io->return_cb(io);
}
conn *conn_new(const int sfd, enum conn_states init_state,
@@ -1215,11 +1195,11 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) {
free(resp->write_and_free);
}
if (resp->io_pending) {
+ io_pending_t *io = resp->io_pending;
// If we had a pending IO, tell it to internally clean up then return
// the main object back to our thread cache.
- io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type);
- qcb->finalize_cb(resp->io_pending);
- do_cache_free(c->thread->io_cache, resp->io_pending);
+ io->finalize_cb(io);
+ do_cache_free(c->thread->io_cache, io);
resp->io_pending = NULL;
}
if (c->resp_head == resp) {
@@ -3379,7 +3359,6 @@ static void drive_machine(conn *c) {
break;
case conn_io_queue:
/* Complete our queued IO's from within the worker thread. */
- conn_io_queue_complete(c);
conn_set_state(c, conn_mwrite);
break;
case conn_max_state:
diff --git a/memcached.h b/memcached.h
index 94a2550..bdfa945 100644
--- a/memcached.h
+++ b/memcached.h
@@ -672,21 +672,12 @@ typedef struct _io_pending_t io_pending_t;
typedef struct io_queue_s io_queue_t;
typedef void (*io_queue_stack_cb)(io_queue_t *q);
typedef void (*io_queue_cb)(io_pending_t *pending);
-// this structure's ownership gets passed between threads:
-// - owned normally by the worker thread.
-// - multiple queues can be submitted at the same time.
-// - each queue can be sent to different background threads.
-// - each submitted queue needs to know when to return to the worker.
-// - the worker needs to know when all queues have returned so it can process.
-//
-// io_queue_t's count field is owned by worker until submitted. Then owned by
-// side thread until returned.
-// conn->io_queues_submitted is always owned by the worker thread. it is
-// incremented as the worker submits queues, and decremented as it gets pinged
-// for returned threads.
-//
-// All of this is to avoid having to hit a mutex owned by the connection
-// thread that gets pinged for each thread (or an equivalent atomic).
+// This structure used to be passed between threads, but is now owned entirely
+// by the worker threads.
+// IO pending objects are created and stacked into this structure. They are
+// then sent off to remote threads.
+// The objects are returned one at a time to the worker threads, and this
+// structure is then consulted to see when to resume the worker.
struct io_queue_s {
void *ctx; // duplicated from io_queue_cb_t
void *stack_ctx; // module-specific context to be batch-submitted
@@ -697,9 +688,6 @@ struct io_queue_s {
typedef struct io_queue_cb_s {
void *ctx; // untouched ptr for specific context
io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
- io_queue_stack_cb complete_cb;
- io_queue_cb return_cb; // called on worker thread.
- io_queue_cb finalize_cb; // called back on the worker thread.
int type;
} io_queue_cb_t;
@@ -789,6 +777,8 @@ struct _io_pending_t {
LIBEVENT_THREAD *thread;
conn *c;
mc_resp *resp; // associated response object
+ io_queue_cb return_cb; // called on worker thread.
+ io_queue_cb finalize_cb; // called back on the worker thread.
char data[120];
};
@@ -924,7 +914,7 @@ enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale);
-void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb);
void conn_io_queue_setup(conn *c);
io_queue_t *conn_io_queue_get(conn *c, int type);
io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
diff --git a/proto_proxy.c b/proto_proxy.c
index eb6f5e6..01b368d 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -289,10 +289,6 @@ void proxy_submit_cb(io_queue_t *q) {
return;
}
-void proxy_complete_cb(io_queue_t *q) {
- // empty/unused.
-}
-
// called from worker thread after an individual IO has been returned back to
// the worker thread. Do post-IO run and cleanup work.
void proxy_return_cb(io_pending_t *pending) {
@@ -892,6 +888,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
p->client_resp = r;
p->flushed = false;
p->ascii_multiget = rq->ascii_multiget;
+ p->return_cb = proxy_return_cb;
+ p->finalize_cb = proxy_finalize_cb;
resp->io_pending = (io_pending_t *)p;
// top of the main thread should be our coroutine.
diff --git a/proxy.h b/proxy.h
index f82630b..dce74f5 100644
--- a/proxy.h
+++ b/proxy.h
@@ -409,7 +409,10 @@ struct _io_pending_proxy_t {
int io_queue_type;
LIBEVENT_THREAD *thread;
conn *c;
- mc_resp *resp; // original struct ends here
+ mc_resp *resp;
+ io_queue_cb return_cb; // called on worker thread.
+ io_queue_cb finalize_cb; // called back on the worker thread.
+ // original struct ends here
struct _io_pending_proxy_t *next; // stack for IO submission
STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
diff --git a/proxy_await.c b/proxy_await.c
index 33e4cd3..582c3d5 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -147,6 +147,8 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
p->client_resp = r;
p->flushed = false;
p->ascii_multiget = rq->ascii_multiget;
+ p->return_cb = proxy_return_cb;
+ p->finalize_cb = proxy_finalize_cb;
// io_p needs to hold onto its own response reference, because we may or
// may not include it in the final await() result.
@@ -201,6 +203,8 @@ static void mcp_queue_await_dummy_io(conn *c, lua_State *Lc, int await_ref) {
p->is_await = true;
p->await_ref = await_ref;
p->await_background = true;
+ p->return_cb = proxy_return_cb;
+ p->finalize_cb = proxy_finalize_cb;
// Dummy IO has no backend, and no request attached.
diff --git a/storage.c b/storage.c
index 257ff2f..d7ca9a1 100644
--- a/storage.c
+++ b/storage.c
@@ -19,6 +19,8 @@
/*
* API functions
*/
+static void storage_finalize_cb(io_pending_t *pending);
+static void storage_return_cb(io_pending_t *pending);
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
@@ -26,7 +28,10 @@ typedef struct _io_pending_storage_t {
int io_queue_type;
LIBEVENT_THREAD *thread;
conn *c;
- mc_resp *resp; /* original struct ends here */
+ mc_resp *resp;
+ io_queue_cb return_cb; // called on worker thread.
+ io_queue_cb finalize_cb; // called back on the worker thread.
+ /* original struct ends here */
item *hdr_it; /* original header item. */
obj_io io_ctx; /* embedded extstore IO header */
unsigned int iovec_data; /* specific index of data iovec */
@@ -119,12 +124,10 @@ void storage_stats(ADD_STAT add_stats, conn *c) {
}
-
-// FIXME: This runs in the IO thread. to get better IO performance this should
-// simply mark the io wrapper with the return value and decrement wrapleft, if
-// zero redispatching. Still a bit of work being done in the side thread but
-// minimized at least.
-// TODO: wrap -> p?
+// This callback runs in the IO thread.
+// TODO: Some or all of this should move to the
+// io_pending's callback back in the worker thread.
+// It might make sense to keep the crc32c check here though.
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
// FIXME: assumes success
io_pending_storage_t *p = (io_pending_storage_t *)io->data;
@@ -227,13 +230,7 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
p->active = false;
//assert(c->io_wrapleft >= 0);
- // All IO's have returned, lets re-attach this connection to our original
- // thread.
- io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
- q->count--;
- if (q->count == 0) {
- redispatch_conn(c);
- }
+ return_io_pending((io_pending_t *)p);
}
int storage_get_item(conn *c, item *it, mc_resp *resp) {
@@ -271,6 +268,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
p->miss = false;
p->badcrc = false;
p->noreply = c->noreply;
+ p->thread = c->thread;
+ p->return_cb = storage_return_cb;
+ p->finalize_cb = storage_finalize_cb;
// io_pending owns the reference for this object now.
p->hdr_it = it;
p->resp = resp;
@@ -371,8 +371,12 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
void storage_submit_cb(io_queue_t *q) {
// Don't need to do anything special for extstore.
extstore_submit(q->ctx, q->stack_ctx);
+
+ // need to reset the stack for next use.
+ q->stack_ctx = NULL;
}
+// Runs locally in worker thread.
static void recache_or_free(io_pending_t *pending) {
// re-cast to our specific struct.
io_pending_storage_t *p = (io_pending_storage_t *)pending;
@@ -446,18 +450,17 @@ static void recache_or_free(io_pending_t *pending) {
item_remove(p->hdr_it);
}
-// Called after the IO is processed but before the response is transmitted.
-// TODO: stubbed with a reminder: should be able to move most of the extstore
-// callback code into this code instead, executing on worker thread instead of
-// IO thread.
-void storage_complete_cb(io_queue_t *q) {
- // need to reset the stack for next use.
- q->stack_ctx = NULL;
- return;
+// Called after an IO has been returned to the worker thread.
+static void storage_return_cb(io_pending_t *pending) {
+ io_queue_t *q = conn_io_queue_get(pending->c, pending->io_queue_type);
+ q->count--;
+ if (q->count == 0) {
+ conn_worker_readd(pending->c);
+ }
}
// Called after responses have been transmitted. Need to free up related data.
-void storage_finalize_cb(io_pending_t *pending) {
+static void storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;
diff --git a/storage.h b/storage.h
index 55c5e1f..a0fc1a8 100644
--- a/storage.h
+++ b/storage.h
@@ -17,10 +17,8 @@ void process_extstore_stats(ADD_STAT add_stats, conn *c);
bool storage_validate_item(void *e, item *it);
int storage_get_item(conn *c, item *it, mc_resp *resp);
-// callbacks for the IO queue subsystem.
+// callback for the IO queue subsystem.
void storage_submit_cb(io_queue_t *q);
-void storage_complete_cb(io_queue_t *q);
-void storage_finalize_cb(io_pending_t *pending);
// Thread functions.
int start_storage_write_thread(void *arg);
diff --git a/thread.c b/thread.c
index f66aa8f..04aa8f4 100644
--- a/thread.c
+++ b/thread.c
@@ -482,12 +482,11 @@ static void setup_thread(LIBEVENT_THREAD *me) {
// me->storage is set just before this function is called.
if (me->storage) {
thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
- storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
+ storage_submit_cb);
}
#endif
#ifdef PROXY
- thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb,
- proxy_complete_cb, proxy_return_cb, proxy_finalize_cb);
+ thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb);
// TODO: maybe register hooks to be called here from sub-packages? ie;
// extstore, TLS, proxy.
@@ -495,7 +494,7 @@ static void setup_thread(LIBEVENT_THREAD *me) {
proxy_thread_init(settings.proxy_ctx, me);
}
#endif
- thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
+ thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL);
}
/*