summaryrefslogtreecommitdiff
path: root/memcached.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-08-02 16:25:43 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commit3fc8775bf081f0cf84fe16058f834b951953c269 (patch)
tree46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17 /memcached.c
parent57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff)
downloadmemcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz
core: io_queue flow second attempt
probably squash into previous commit. io->c->thead can change for orpahned IO's, so we had to directly add the original worker thread as a reference. also tried again to split callbacks onto the thread and off of the connection for similar reasons; sometimes we just need the callbacks, sometimes we need both.
Diffstat (limited to 'memcached.c')
-rw-r--r--memcached.c78
1 files changed, 47 insertions, 31 deletions
diff --git a/memcached.c b/memcached.c
index 0daee12..a5cd0a6 100644
--- a/memcached.c
+++ b/memcached.c
@@ -558,14 +558,13 @@ void conn_worker_readd(conn *c) {
}
}
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
- io_queue_t *q = c->io_queues;
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
+ io_queue_cb_t *q = t->io_queues;
while (q->type != IO_QUEUE_NONE) {
q++;
}
q->type = type;
q->ctx = ctx;
- q->stack_ctx = NULL;
q->submit_cb = cb;
q->complete_cb = com_cb;
q->finalize_cb = fin_cb;
@@ -573,6 +572,30 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu
return;
}
+void conn_io_queue_setup(conn *c) {
+ io_queue_cb_t *qcb = c->thread->io_queues;
+ io_queue_t *q = c->io_queues;
+ while (qcb->type != IO_QUEUE_NONE) {
+ q->type = qcb->type;
+ q->ctx = qcb->ctx;
+ q->stack_ctx = NULL;
+ q->count = 0;
+ qcb++;
+ q++;
+ }
+}
+
+io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type) {
+ io_queue_cb_t *q = t->io_queues;
+ while (q->type != IO_QUEUE_NONE) {
+ if (q->type == type) {
+ return q;
+ }
+ q++;
+ }
+ return NULL;
+}
+
io_queue_t *conn_io_queue_get(conn *c, int type) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
@@ -589,25 +612,20 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
// not and handle appropriately.
static void conn_io_queue_complete(conn *c) {
io_queue_t *q = c->io_queues;
+ io_queue_cb_t *qcb = c->thread->io_queues;
while (q->type != IO_QUEUE_NONE) {
if (q->stack_ctx) {
- q->complete_cb(q);
+ qcb->complete_cb(q);
}
+ qcb++;
q++;
}
}
// called to return a single IO object to the original worker thread.
void conn_io_queue_return(io_pending_t *io) {
- io_queue_t *q = io->q;
- int ret = q->return_cb(io);
- // An IO may or may not count against the pending responses.
- if (ret) {
- q->count--;
- if (q->count == 0) {
- conn_worker_readd(io->c);
- }
- }
+ io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type);
+ q->return_cb(io);
return;
}
@@ -1164,7 +1182,8 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) {
if (resp->io_pending) {
// If we had a pending IO, tell it to internally clean up then return
// the main object back to our thread cache.
- resp->io_pending->q->finalize_cb(resp->io_pending);
+ io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type);
+ qcb->finalize_cb(resp->io_pending);
do_cache_free(c->thread->io_cache, resp->io_pending);
resp->io_pending = NULL;
}
@@ -3230,25 +3249,22 @@ static void drive_machine(conn *c) {
* remove the connection from the worker thread and dispatch the
* IO queue
*/
- if (c->io_queues[0].type != IO_QUEUE_NONE) {
- assert(c->io_queues_submitted == 0);
- bool hit = false;
-
- for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
- if (q->count != 0) {
- assert(q->stack_ctx != NULL);
- hit = true;
- q->submit_cb(q);
- c->io_queues_submitted++;
- }
+ assert(c->io_queues_submitted == 0);
+
+ for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
+ if (q->count != 0) {
+ assert(q->stack_ctx != NULL);
+ io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
+ qcb->submit_cb(q);
+ c->io_queues_submitted++;
}
- if (hit) {
- conn_set_state(c, conn_io_queue);
- event_del(&c->event);
+ }
+ if (c->io_queues_submitted != 0) {
+ conn_set_state(c, conn_io_queue);
+ event_del(&c->event);
- stop = true;
- break;
- }
+ stop = true;
+ break;
}
switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {