summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2020-10-20 17:16:05 -0700
committerdormando <dormando@rydia.net>2020-10-30 15:50:12 -0700
commit5d4785936e3e8937047daafd874c792668dc8528 (patch)
treeb39b15e5369faea0c2cf9b13e2613a2ed13ecf7e
parent1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f (diff)
downloadmemcached-5d4785936e3e8937047daafd874c792668dc8528.tar.gz
queue: replace c->io_pending to avoid a mutex
Since multiple queues can be sent to different side threads, we need a new mechanism for knowing when all of them have returned. In the common case only one queue will be active, so adding a mutex would be excessive.
-rw-r--r--memcached.c50
-rw-r--r--memcached.h21
-rw-r--r--storage.c14
3 files changed, 50 insertions, 35 deletions
diff --git a/memcached.c b/memcached.c
index 94597ea..1c5fce0 100644
--- a/memcached.c
+++ b/memcached.c
@@ -517,6 +517,14 @@ void conn_close_idle(conn *c) {
/* bring conn back from a sidethread. could have had its event base moved. */
void conn_worker_readd(conn *c) {
+ if (c->state == conn_io_queue) {
+ c->io_queues_submitted--;
+ // If we're still waiting for other queues to return, don't re-add the
+ // connection yet.
+ if (c->io_queues_submitted != 0) {
+ return;
+ }
+ }
c->ev_flags = EV_READ | EV_PERSIST;
event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c);
event_base_set(c->thread->base, &c->event);
@@ -530,13 +538,8 @@ void conn_worker_readd(conn *c) {
if (c->state == conn_closing) {
drive_machine(c);
return;
- }
-
- // If we had IO objects, process
- if (c->io_queued) {
- c->io_queued = false;
- // state should be conn_io_queue already, so it will know how to
- // dequeue and finalize the async work.
+ } else if (c->state == conn_io_queue) {
+ // machine will know how to return based on secondary state.
drive_machine(c);
} else {
conn_set_state(c, conn_new_cmd);
@@ -692,7 +695,7 @@ conn *conn_new(const int sfd, enum conn_states init_state,
c->last_cmd_time = current_time; /* initialize for idle kicker */
// wipe all queues.
memset(c->io_queues, 0, sizeof(c->io_queues));
- c->io_pending = 0;
+ c->io_queues_submitted = 0;
c->item = 0;
@@ -3171,26 +3174,25 @@ static void drive_machine(conn *c) {
* remove the connection from the worker thread and dispatch the
* IO queue
*/
- if (c->io_pending) {
- assert(c->io_queued == false);
- conn_set_state(c, conn_io_queue);
- event_del(&c->event);
- c->io_queued = true;
- // TODO: write as for loop?
- /*io_queue_t *q = c->io_queues;
- while (q->type != IO_QUEUE_NONE) {
- if (q->stack_ctx != NULL) {
- q->submit_cb(q->ctx, q->stack_ctx);
- }
- q++;
- }*/
+ if (c->io_queues[0].type != IO_QUEUE_NONE) {
+ assert(c->io_queues_submitted == 0);
+ bool hit = false;
+
for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
- if (q->stack_ctx != NULL) {
+ if (q->count != 0) {
+ assert(q->stack_ctx != NULL);
+ hit = true;
q->submit_cb(q->ctx, q->stack_ctx);
+ c->io_queues_submitted++;
}
}
- stop = true;
- break;
+ if (hit) {
+ conn_set_state(c, conn_io_queue);
+ event_del(&c->event);
+
+ stop = true;
+ break;
+ }
}
switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {
diff --git a/memcached.h b/memcached.h
index bce4f41..2eb9ebb 100644
--- a/memcached.h
+++ b/memcached.h
@@ -674,6 +674,21 @@ typedef struct conn conn;
typedef void (*io_queue_stack_cb)(void *ctx, void *stack);
typedef void (*io_queue_cb)(io_pending_t *pending);
+// this structure's ownership gets passed between threads:
+// - owned normally by the worker thread.
+// - multiple queues can be submitted at the same time.
+// - each queue can be sent to different background threads.
+// - each submitted queue needs to know when to return to the worker.
+// - the worker needs to know when all queues have returned so it can process.
+//
+// io_queue_t's count field is owned by worker until submitted. Then owned by
+// side thread until returned.
// conn->io_queues_submitted is always owned by the worker thread. It is
// incremented as the worker submits queues, and decremented as the worker
// gets pinged for each returned queue.
+//
// All of this is to avoid having to take a mutex owned by the connection
// for each queue that returns (or an equivalent atomic).
typedef struct {
void *ctx; // untouched ptr for specific context
void *stack_ctx; // module-specific context to be batch-submitted
@@ -681,12 +696,13 @@ typedef struct {
io_queue_stack_cb complete_cb;
io_queue_cb finalize_cb; // called back on the worker thread.
int type;
+ int count; // ios to process before returning. only accessed by queue processor once submitted
} io_queue_t;
struct _io_pending_t {
io_queue_t *q;
conn *c;
- mc_resp *resp; /* associated response object */
+ mc_resp *resp; // associated response object
char data[120];
};
@@ -735,9 +751,8 @@ struct conn {
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
- int io_pending; /* number of deferred IO requests */
+ int io_queues_submitted; /* see notes on io_queue_t */
io_queue_t io_queues[3]; /* set of deferred IO queues. */
- bool io_queued; /* IO's were queued. */
#ifdef EXTSTORE
unsigned int recache_counter;
#endif
diff --git a/storage.c b/storage.c
index cfa0206..3c0af88 100644
--- a/storage.c
+++ b/storage.c
@@ -220,14 +220,13 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
p->miss = false;
}
- c->io_pending--;
+ p->q->count--;
p->active = false;
//assert(c->io_wrapleft >= 0);
// All IO's have returned, lets re-attach this connection to our original
// thread.
- if (c->io_pending == 0) {
- assert(c->io_queued == true);
+ if (p->q->count == 0) {
redispatch_conn(c);
}
}
@@ -256,7 +255,6 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
}
if (new_it == NULL)
return -1;
- assert(!c->io_queued); // FIXME: debugging.
// so we can free the chunk on a miss
new_it->slabs_clsid = clsid;
@@ -336,8 +334,8 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
q->stack_ctx = eio;
// No need to stack the io_pending's together as they live on mc_resp's.
- assert(c->io_pending >= 0);
- c->io_pending++;
+ assert(q->count >= 0);
+ q->count++;
// reference ourselves for the callback.
eio->data = (void *)p;
@@ -386,8 +384,8 @@ static void recache_or_free(io_pending_t *pending) {
do_free = false;
size_t ntotal = ITEM_ntotal(p->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
- c->io_pending--;
- assert(c->io_pending >= 0);
+ p->q->count--;
+ assert(p->q->count >= 0);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_aborted_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);