diff options
author | dormando <dormando@rydia.net> | 2021-08-02 16:25:43 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | 3fc8775bf081f0cf84fe16058f834b951953c269 (patch) | |
tree | 46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17 /memcached.c | |
parent | 57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff) | |
download | memcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz |
core: io_queue flow second attempt
probably squash into previous commit.
io->c->thead can change for orpahned IO's, so we had to directly add the
original worker thread as a reference.
also tried again to split callbacks onto the thread and off of the
connection for similar reasons; sometimes we just need the callbacks,
sometimes we need both.
Diffstat (limited to 'memcached.c')
-rw-r--r-- | memcached.c | 78 |
1 files changed, 47 insertions, 31 deletions
diff --git a/memcached.c b/memcached.c index 0daee12..a5cd0a6 100644 --- a/memcached.c +++ b/memcached.c @@ -558,14 +558,13 @@ void conn_worker_readd(conn *c) { } } -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) { - io_queue_t *q = c->io_queues; +void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) { + io_queue_cb_t *q = t->io_queues; while (q->type != IO_QUEUE_NONE) { q++; } q->type = type; q->ctx = ctx; - q->stack_ctx = NULL; q->submit_cb = cb; q->complete_cb = com_cb; q->finalize_cb = fin_cb; @@ -573,6 +572,30 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu return; } +void conn_io_queue_setup(conn *c) { + io_queue_cb_t *qcb = c->thread->io_queues; + io_queue_t *q = c->io_queues; + while (qcb->type != IO_QUEUE_NONE) { + q->type = qcb->type; + q->ctx = qcb->ctx; + q->stack_ctx = NULL; + q->count = 0; + qcb++; + q++; + } +} + +io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type) { + io_queue_cb_t *q = t->io_queues; + while (q->type != IO_QUEUE_NONE) { + if (q->type == type) { + return q; + } + q++; + } + return NULL; +} + io_queue_t *conn_io_queue_get(conn *c, int type) { io_queue_t *q = c->io_queues; while (q->type != IO_QUEUE_NONE) { @@ -589,25 +612,20 @@ io_queue_t *conn_io_queue_get(conn *c, int type) { // not and handle appropriately. static void conn_io_queue_complete(conn *c) { io_queue_t *q = c->io_queues; + io_queue_cb_t *qcb = c->thread->io_queues; while (q->type != IO_QUEUE_NONE) { if (q->stack_ctx) { - q->complete_cb(q); + qcb->complete_cb(q); } + qcb++; q++; } } // called to return a single IO object to the original worker thread. void conn_io_queue_return(io_pending_t *io) { - io_queue_t *q = io->q; - int ret = q->return_cb(io); - // An IO may or may not count against the pending responses. - if (ret) { - q->count--; - if (q->count == 0) { - conn_worker_readd(io->c); - } - } + io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type); + q->return_cb(io); return; } @@ -1164,7 +1182,8 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) { if (resp->io_pending) { // If we had a pending IO, tell it to internally clean up then return // the main object back to our thread cache. - resp->io_pending->q->finalize_cb(resp->io_pending); + io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type); + qcb->finalize_cb(resp->io_pending); do_cache_free(c->thread->io_cache, resp->io_pending); resp->io_pending = NULL; } @@ -3230,25 +3249,22 @@ static void drive_machine(conn *c) { * remove the connection from the worker thread and dispatch the * IO queue */ - if (c->io_queues[0].type != IO_QUEUE_NONE) { - assert(c->io_queues_submitted == 0); - bool hit = false; - - for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) { - if (q->count != 0) { - assert(q->stack_ctx != NULL); - hit = true; - q->submit_cb(q); - c->io_queues_submitted++; - } + assert(c->io_queues_submitted == 0); + + for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) { + if (q->count != 0) { + assert(q->stack_ctx != NULL); + io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type); + qcb->submit_cb(q); + c->io_queues_submitted++; } - if (hit) { - conn_set_state(c, conn_io_queue); - event_del(&c->event); + } + if (c->io_queues_submitted != 0) { + conn_set_state(c, conn_io_queue); + event_del(&c->event); - stop = true; - break; - } + stop = true; + break; } switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) { |