summaryrefslogtreecommitdiff
path: root/memcached.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-11 13:46:16 -0800
committerdormando <dormando@rydia.net>2023-01-11 21:37:55 -0800
commite660658748b04f865852e77b0aad1fd8301cd5ec (patch)
tree21d4017a4858c44c240e10b4c9a4b136812d2e0e /memcached.c
parentfccf7b9efdfb0deb11f111496ce53c5892647dab (diff)
downloadmemcached-e660658748b04f865852e77b0aad1fd8301cd5ec.tar.gz
core: simplify background IO API
- removes unused "completed" IO callback handler - moves primary post-IO callback handlers from the queue definition to the actual IO objects. - allows IO object callbacks to be handled generically instead of based on the queue they were submitted from.
Diffstat (limited to 'memcached.c')
-rw-r--r--memcached.c31
1 files changed, 5 insertions, 26 deletions
diff --git a/memcached.c b/memcached.c
index 7df7e55..4589de8 100644
--- a/memcached.c
+++ b/memcached.c
@@ -569,7 +569,7 @@ void conn_worker_readd(conn *c) {
}
}
-void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb) {
io_queue_cb_t *q = t->io_queues;
while (q->type != IO_QUEUE_NONE) {
q++;
@@ -577,9 +577,6 @@ void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack
q->type = type;
q->ctx = ctx;
q->submit_cb = cb;
- q->complete_cb = com_cb;
- q->finalize_cb = fin_cb;
- q->return_cb = ret_cb;
return;
}
@@ -626,26 +623,9 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
return NULL;
}
-// called after returning to the main worker thread.
-// users of the queue need to distinguish if the IO was actually consumed or
-// not and handle appropriately.
-static void conn_io_queue_complete(conn *c) {
- io_queue_t *q = c->io_queues;
- io_queue_cb_t *qcb = c->thread->io_queues;
- while (q->type != IO_QUEUE_NONE) {
- if (q->stack_ctx) {
- qcb->complete_cb(q);
- }
- qcb++;
- q++;
- }
-}
-
// called to return a single IO object to the original worker thread.
void conn_io_queue_return(io_pending_t *io) {
- io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type);
- q->return_cb(io);
- return;
+ io->return_cb(io);
}
conn *conn_new(const int sfd, enum conn_states init_state,
@@ -1215,11 +1195,11 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) {
free(resp->write_and_free);
}
if (resp->io_pending) {
+ io_pending_t *io = resp->io_pending;
// If we had a pending IO, tell it to internally clean up then return
// the main object back to our thread cache.
- io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type);
- qcb->finalize_cb(resp->io_pending);
- do_cache_free(c->thread->io_cache, resp->io_pending);
+ io->finalize_cb(io);
+ do_cache_free(c->thread->io_cache, io);
resp->io_pending = NULL;
}
if (c->resp_head == resp) {
@@ -3379,7 +3359,6 @@ static void drive_machine(conn *c) {
break;
case conn_io_queue:
/* Complete our queued IO's from within the worker thread. */
- conn_io_queue_complete(c);
conn_set_state(c, conn_mwrite);
break;
case conn_max_state: