diff options
author | dormando <dormando@rydia.net> | 2020-10-20 17:16:05 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2020-10-30 15:50:12 -0700 |
commit | 5d4785936e3e8937047daafd874c792668dc8528 (patch) | |
tree | b39b15e5369faea0c2cf9b13e2613a2ed13ecf7e | |
parent | 1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f (diff) | |
download | memcached-5d4785936e3e8937047daafd874c792668dc8528.tar.gz |
queue: replace c->io_pending to avoid a mutex
Since multiple queues can be sent to different sidethreads, we need a
new mechanism for knowing when to return everything. In the common case
only one queue will be active, so adding a mutex would be excessive.
-rw-r--r-- | memcached.c | 50 | ||||
-rw-r--r-- | memcached.h | 21 | ||||
-rw-r--r-- | storage.c | 14 |
3 files changed, 50 insertions, 35 deletions
diff --git a/memcached.c b/memcached.c index 94597ea..1c5fce0 100644 --- a/memcached.c +++ b/memcached.c @@ -517,6 +517,14 @@ void conn_close_idle(conn *c) { /* bring conn back from a sidethread. could have had its event base moved. */ void conn_worker_readd(conn *c) { + if (c->state == conn_io_queue) { + c->io_queues_submitted--; + // If we're still waiting for other queues to return, don't re-add the + // connection yet. + if (c->io_queues_submitted != 0) { + return; + } + } c->ev_flags = EV_READ | EV_PERSIST; event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c); event_base_set(c->thread->base, &c->event); @@ -530,13 +538,8 @@ void conn_worker_readd(conn *c) { if (c->state == conn_closing) { drive_machine(c); return; - } - - // If we had IO objects, process - if (c->io_queued) { - c->io_queued = false; - // state should be conn_io_queue already, so it will know how to - // dequeue and finalize the async work. + } else if (c->state == conn_io_queue) { + // machine will know how to return based on secondary state. drive_machine(c); } else { conn_set_state(c, conn_new_cmd); @@ -692,7 +695,7 @@ conn *conn_new(const int sfd, enum conn_states init_state, c->last_cmd_time = current_time; /* initialize for idle kicker */ // wipe all queues. memset(c->io_queues, 0, sizeof(c->io_queues)); - c->io_pending = 0; + c->io_queues_submitted = 0; c->item = 0; @@ -3171,26 +3174,25 @@ static void drive_machine(conn *c) { * remove the connection from the worker thread and dispatch the * IO queue */ - if (c->io_pending) { - assert(c->io_queued == false); - conn_set_state(c, conn_io_queue); - event_del(&c->event); - c->io_queued = true; - // TODO: write as for loop? 
- /*io_queue_t *q = c->io_queues; - while (q->type != IO_QUEUE_NONE) { - if (q->stack_ctx != NULL) { - q->submit_cb(q->ctx, q->stack_ctx); - } - q++; - }*/ + if (c->io_queues[0].type != IO_QUEUE_NONE) { + assert(c->io_queues_submitted == 0); + bool hit = false; + for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) { - if (q->stack_ctx != NULL) { + if (q->count != 0) { + assert(q->stack_ctx != NULL); + hit = true; q->submit_cb(q->ctx, q->stack_ctx); + c->io_queues_submitted++; } } - stop = true; - break; + if (hit) { + conn_set_state(c, conn_io_queue); + event_del(&c->event); + + stop = true; + break; + } } switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) { diff --git a/memcached.h b/memcached.h index bce4f41..2eb9ebb 100644 --- a/memcached.h +++ b/memcached.h @@ -674,6 +674,21 @@ typedef struct conn conn; typedef void (*io_queue_stack_cb)(void *ctx, void *stack); typedef void (*io_queue_cb)(io_pending_t *pending); +// this structure's ownership gets passed between threads: +// - owned normally by the worker thread. +// - multiple queues can be submitted at the same time. +// - each queue can be sent to different background threads. +// - each submitted queue needs to know when to return to the worker. +// - the worker needs to know when all queues have returned so it can process. +// +// io_queue_t's count field is owned by worker until submitted. Then owned by +// side thread until returned. +// conn->io_queues_submitted is always owned by the worker thread. it is +// incremented as the worker submits queues, and decremented as it gets pinged +// for returned threads. +// +// All of this is to avoid having to hit a mutex owned by the connection +// thread that gets pinged for each thread (or an equivalent atomic). 
typedef struct { void *ctx; // untouched ptr for specific context void *stack_ctx; // module-specific context to be batch-submitted @@ -681,12 +696,13 @@ typedef struct { io_queue_stack_cb complete_cb; io_queue_cb finalize_cb; // called back on the worker thread. int type; + int count; // ios to process before returning. only accessed by queue processor once submitted } io_queue_t; struct _io_pending_t { io_queue_t *q; conn *c; - mc_resp *resp; /* associated response object */ + mc_resp *resp; // associated response object char data[120]; }; @@ -735,9 +751,8 @@ struct conn { /* data for the swallow state */ int sbytes; /* how many bytes to swallow */ - int io_pending; /* number of deferred IO requests */ + int io_queues_submitted; /* see notes on io_queue_t */ io_queue_t io_queues[3]; /* set of deferred IO queues. */ - bool io_queued; /* IO's were queued. */ #ifdef EXTSTORE unsigned int recache_counter; #endif @@ -220,14 +220,13 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) { p->miss = false; } - c->io_pending--; + p->q->count--; p->active = false; //assert(c->io_wrapleft >= 0); // All IO's have returned, lets re-attach this connection to our original // thread. - if (c->io_pending == 0) { - assert(c->io_queued == true); + if (p->q->count == 0) { redispatch_conn(c); } } @@ -256,7 +255,6 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { } if (new_it == NULL) return -1; - assert(!c->io_queued); // FIXME: debugging. // so we can free the chunk on a miss new_it->slabs_clsid = clsid; @@ -336,8 +334,8 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { q->stack_ctx = eio; // No need to stack the io_pending's together as they live on mc_resp's. - assert(c->io_pending >= 0); - c->io_pending++; + assert(q->count >= 0); + q->count++; // reference ourselves for the callback. 
eio->data = (void *)p; @@ -386,8 +384,8 @@ static void recache_or_free(io_pending_t *pending) { do_free = false; size_t ntotal = ITEM_ntotal(p->hdr_it); slabs_free(it, ntotal, slabs_clsid(ntotal)); - c->io_pending--; - assert(c->io_pending >= 0); + p->q->count--; + assert(p->q->count >= 0); pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.get_aborted_extstore++; pthread_mutex_unlock(&c->thread->stats.mutex); |