diff options
author | dormando <dormando@rydia.net> | 2020-10-20 17:16:05 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2020-10-30 15:50:12 -0700 |
commit | 5d4785936e3e8937047daafd874c792668dc8528 (patch) | |
tree | b39b15e5369faea0c2cf9b13e2613a2ed13ecf7e /memcached.c | |
parent | 1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f (diff) | |
download | memcached-5d4785936e3e8937047daafd874c792668dc8528.tar.gz |
queue: replace c->io_pending to avoid a mutex
since multiple queues can be sent to different sidethreads, we need a
new mechanism for knowing when to return everything. In the common case
only one queue will be active, so adding a mutex would be excessive.
Diffstat (limited to 'memcached.c')
-rw-r--r-- | memcached.c | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/memcached.c b/memcached.c index 94597ea..1c5fce0 100644 --- a/memcached.c +++ b/memcached.c @@ -517,6 +517,14 @@ void conn_close_idle(conn *c) { /* bring conn back from a sidethread. could have had its event base moved. */ void conn_worker_readd(conn *c) { + if (c->state == conn_io_queue) { + c->io_queues_submitted--; + // If we're still waiting for other queues to return, don't re-add the + // connection yet. + if (c->io_queues_submitted != 0) { + return; + } + } c->ev_flags = EV_READ | EV_PERSIST; event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c); event_base_set(c->thread->base, &c->event); @@ -530,13 +538,8 @@ void conn_worker_readd(conn *c) { if (c->state == conn_closing) { drive_machine(c); return; - } - - // If we had IO objects, process - if (c->io_queued) { - c->io_queued = false; - // state should be conn_io_queue already, so it will know how to - // dequeue and finalize the async work. + } else if (c->state == conn_io_queue) { + // machine will know how to return based on secondary state. drive_machine(c); } else { conn_set_state(c, conn_new_cmd); @@ -692,7 +695,7 @@ conn *conn_new(const int sfd, enum conn_states init_state, c->last_cmd_time = current_time; /* initialize for idle kicker */ // wipe all queues. memset(c->io_queues, 0, sizeof(c->io_queues)); - c->io_pending = 0; + c->io_queues_submitted = 0; c->item = 0; @@ -3171,26 +3174,25 @@ static void drive_machine(conn *c) { * remove the connection from the worker thread and dispatch the * IO queue */ - if (c->io_pending) { - assert(c->io_queued == false); - conn_set_state(c, conn_io_queue); - event_del(&c->event); - c->io_queued = true; - // TODO: write as for loop? - /*io_queue_t *q = c->io_queues; - while (q->type != IO_QUEUE_NONE) { - if (q->stack_ctx != NULL) { - q->submit_cb(q->ctx, q->stack_ctx); - } - q++; - }*/ + if (c->io_queues[0].type != IO_QUEUE_NONE) { + assert(c->io_queues_submitted == 0); + bool hit = false; + for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) { - if (q->stack_ctx != NULL) { + if (q->count != 0) { + assert(q->stack_ctx != NULL); + hit = true; q->submit_cb(q->ctx, q->stack_ctx); + c->io_queues_submitted++; } } - stop = true; - break; + if (hit) { + conn_set_state(c, conn_io_queue); + event_del(&c->event); + + stop = true; + break; + } } switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) { |