summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-08-02 16:25:43 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commit3fc8775bf081f0cf84fe16058f834b951953c269 (patch)
tree46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17
parent57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff)
downloadmemcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz
core: io_queue flow second attempt
probably squash into previous commit. io->c->thead can change for orpahned IO's, so we had to directly add the original worker thread as a reference. also tried again to split callbacks onto the thread and off of the connection for similar reasons; sometimes we just need the callbacks, sometimes we need both.
-rw-r--r--memcached.c78
-rw-r--r--memcached.h86
-rw-r--r--storage.c19
-rw-r--r--storage.h2
-rw-r--r--thread.c23
5 files changed, 120 insertions, 88 deletions
diff --git a/memcached.c b/memcached.c
index 0daee12..a5cd0a6 100644
--- a/memcached.c
+++ b/memcached.c
@@ -558,14 +558,13 @@ void conn_worker_readd(conn *c) {
}
}
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
- io_queue_t *q = c->io_queues;
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
+ io_queue_cb_t *q = t->io_queues;
while (q->type != IO_QUEUE_NONE) {
q++;
}
q->type = type;
q->ctx = ctx;
- q->stack_ctx = NULL;
q->submit_cb = cb;
q->complete_cb = com_cb;
q->finalize_cb = fin_cb;
@@ -573,6 +572,30 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu
return;
}
+void conn_io_queue_setup(conn *c) {
+ io_queue_cb_t *qcb = c->thread->io_queues;
+ io_queue_t *q = c->io_queues;
+ while (qcb->type != IO_QUEUE_NONE) {
+ q->type = qcb->type;
+ q->ctx = qcb->ctx;
+ q->stack_ctx = NULL;
+ q->count = 0;
+ qcb++;
+ q++;
+ }
+}
+
+io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type) {
+ io_queue_cb_t *q = t->io_queues;
+ while (q->type != IO_QUEUE_NONE) {
+ if (q->type == type) {
+ return q;
+ }
+ q++;
+ }
+ return NULL;
+}
+
io_queue_t *conn_io_queue_get(conn *c, int type) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
@@ -589,25 +612,20 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
// not and handle appropriately.
static void conn_io_queue_complete(conn *c) {
io_queue_t *q = c->io_queues;
+ io_queue_cb_t *qcb = c->thread->io_queues;
while (q->type != IO_QUEUE_NONE) {
if (q->stack_ctx) {
- q->complete_cb(q);
+ qcb->complete_cb(q);
}
+ qcb++;
q++;
}
}
// called to return a single IO object to the original worker thread.
void conn_io_queue_return(io_pending_t *io) {
- io_queue_t *q = io->q;
- int ret = q->return_cb(io);
- // An IO may or may not count against the pending responses.
- if (ret) {
- q->count--;
- if (q->count == 0) {
- conn_worker_readd(io->c);
- }
- }
+ io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type);
+ q->return_cb(io);
return;
}
@@ -1164,7 +1182,8 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) {
if (resp->io_pending) {
// If we had a pending IO, tell it to internally clean up then return
// the main object back to our thread cache.
- resp->io_pending->q->finalize_cb(resp->io_pending);
+ io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type);
+ qcb->finalize_cb(resp->io_pending);
do_cache_free(c->thread->io_cache, resp->io_pending);
resp->io_pending = NULL;
}
@@ -3230,25 +3249,22 @@ static void drive_machine(conn *c) {
* remove the connection from the worker thread and dispatch the
* IO queue
*/
- if (c->io_queues[0].type != IO_QUEUE_NONE) {
- assert(c->io_queues_submitted == 0);
- bool hit = false;
-
- for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
- if (q->count != 0) {
- assert(q->stack_ctx != NULL);
- hit = true;
- q->submit_cb(q);
- c->io_queues_submitted++;
- }
+ assert(c->io_queues_submitted == 0);
+
+ for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
+ if (q->count != 0) {
+ assert(q->stack_ctx != NULL);
+ io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
+ qcb->submit_cb(q);
+ c->io_queues_submitted++;
}
- if (hit) {
- conn_set_state(c, conn_io_queue);
- event_del(&c->event);
+ }
+ if (c->io_queues_submitted != 0) {
+ conn_set_state(c, conn_io_queue);
+ event_del(&c->event);
- stop = true;
- break;
- }
+ stop = true;
+ break;
}
switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {
diff --git a/memcached.h b/memcached.h
index e8ed48b..7b06bb3 100644
--- a/memcached.h
+++ b/memcached.h
@@ -621,6 +621,47 @@ typedef struct {
unsigned short page_id; /* from IO header */
} item_hdr;
#endif
+
+#define IO_QUEUE_COUNT 3
+
+#define IO_QUEUE_NONE 0
+#define IO_QUEUE_EXTSTORE 1
+
+typedef struct _io_pending_t io_pending_t;
+typedef struct io_queue_s io_queue_t;
+typedef void (*io_queue_stack_cb)(io_queue_t *q);
+typedef void (*io_queue_cb)(io_pending_t *pending);
+// this structure's ownership gets passed between threads:
+// - owned normally by the worker thread.
+// - multiple queues can be submitted at the same time.
+// - each queue can be sent to different background threads.
+// - each submitted queue needs to know when to return to the worker.
+// - the worker needs to know when all queues have returned so it can process.
+//
+// io_queue_t's count field is owned by worker until submitted. Then owned by
+// side thread until returned.
+// conn->io_queues_submitted is always owned by the worker thread. it is
+// incremented as the worker submits queues, and decremented as it gets pinged
+// for returned threads.
+//
+// All of this is to avoid having to hit a mutex owned by the connection
+// thread that gets pinged for each thread (or an equivalent atomic).
+struct io_queue_s {
+ void *ctx; // duplicated from io_queue_cb_t
+ void *stack_ctx; // module-specific context to be batch-submitted
+ int count; // ios to process before returning. only accessed by queue processor once submitted
+ int type; // duplicated from io_queue_cb_t
+};
+
+typedef struct io_queue_cb_s {
+ void *ctx; // untouched ptr for specific context
+ io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
+ io_queue_stack_cb complete_cb;
+ io_queue_cb return_cb; // called on worker thread.
+ io_queue_cb finalize_cb; // called back on the worker thread.
+ int type;
+} io_queue_cb_t;
+
typedef struct _mc_resp_bundle mc_resp_bundle;
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
@@ -633,6 +674,7 @@ typedef struct {
int notify_send_fd; /* sending end of notify pipe */
#endif
struct thread_stats stats; /* Stats generated by this thread */
+ io_queue_cb_t io_queues[IO_QUEUE_COUNT];
struct conn_queue *ev_queue; /* Worker/conn event queue */
cache_t *rbuf_cache; /* static-sized read buffers */
mc_resp_bundle *open_bundle;
@@ -652,7 +694,6 @@ typedef struct {
/**
* Response objects
*/
-typedef struct _io_pending_t io_pending_t;
#define MC_RESP_IOVCOUNT 4
typedef struct _mc_resp {
mc_resp_bundle *bundle; // ptr back to bundle
@@ -694,40 +735,9 @@ struct _mc_resp_bundle {
typedef struct conn conn;
-#define IO_QUEUE_NONE 0
-#define IO_QUEUE_EXTSTORE 1
-
-typedef struct io_queue_s io_queue_t;
-typedef void (*io_queue_stack_cb)(io_queue_t *q);
-typedef int (*io_queue_cb)(io_pending_t *pending);
-// this structure's ownership gets passed between threads:
-// - owned normally by the worker thread.
-// - multiple queues can be submitted at the same time.
-// - each queue can be sent to different background threads.
-// - each submitted queue needs to know when to return to the worker.
-// - the worker needs to know when all queues have returned so it can process.
-//
-// io_queue_t's count field is owned by worker until submitted. Then owned by
-// side thread until returned.
-// conn->io_queues_submitted is always owned by the worker thread. it is
-// incremented as the worker submits queues, and decremented as it gets pinged
-// for returned threads.
-//
-// All of this is to avoid having to hit a mutex owned by the connection
-// thread that gets pinged for each thread (or an equivalent atomic).
-struct io_queue_s {
- void *ctx; // untouched ptr for specific context
- void *stack_ctx; // module-specific context to be batch-submitted
- io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
- io_queue_stack_cb complete_cb;
- io_queue_cb return_cb; // called on worker thread.
- io_queue_cb finalize_cb; // called back on the worker thread.
- int type;
- int count; // ios to process before returning. only accessed by queue processor once submitted
-};
-
struct _io_pending_t {
- io_queue_t *q;
+ int io_queue_type; // matches one of IO_QUEUE_*
+ LIBEVENT_THREAD *thread;
conn *c;
mc_resp *resp; // associated response object
char data[120];
@@ -780,7 +790,7 @@ struct conn {
int sbytes; /* how many bytes to swallow */
int io_queues_submitted; /* see notes on io_queue_t */
- io_queue_t io_queues[3]; /* set of deferred IO queues. */
+ io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
#ifdef EXTSTORE
unsigned int recache_counter;
#endif
@@ -861,8 +871,10 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void conn_io_queue_setup(conn *c);
io_queue_t *conn_io_queue_get(conn *c, int type);
+io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
void conn_io_queue_return(io_pending_t *io);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl);
@@ -891,7 +903,7 @@ extern int daemonize(int nochdir, int noclose);
void memcached_thread_init(int nthreads, void *arg);
void redispatch_conn(conn *c);
void timeout_conn(conn *c);
-void return_io_pending(conn *c, io_pending_t *io);
+void return_io_pending(io_pending_t *io);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl);
void sidethread_conn_close(conn *c);
diff --git a/storage.c b/storage.c
index 3c81a02..10567ca 100644
--- a/storage.c
+++ b/storage.c
@@ -23,7 +23,8 @@
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
- io_queue_t *q;
+ int io_queue_type;
+ LIBEVENT_THREAD *thread;
conn *c;
mc_resp *resp; /* original struct ends here */
item *hdr_it; /* original header item. */
@@ -223,13 +224,14 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
p->miss = false;
}
- p->q->count--;
p->active = false;
//assert(c->io_wrapleft >= 0);
// All IO's have returned, lets re-attach this connection to our original
// thread.
- if (p->q->count == 0) {
+ io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
+ q->count--;
+ if (q->count == 0) {
redispatch_conn(c);
}
}
@@ -272,7 +274,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
// io_pending owns the reference for this object now.
p->hdr_it = it;
p->resp = resp;
- p->q = q; // quicker access to the queue structure.
+ p->io_queue_type = IO_QUEUE_EXTSTORE;
obj_io *eio = &p->io_ctx;
// FIXME: error handling.
@@ -387,8 +389,10 @@ static void recache_or_free(io_pending_t *pending) {
do_free = false;
size_t ntotal = ITEM_ntotal(p->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
- p->q->count--;
- assert(p->q->count >= 0);
+
+ io_queue_t *q = conn_io_queue_get(c, p->io_queue_type);
+ q->count--;
+ assert(q->count >= 0);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_aborted_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
@@ -453,7 +457,7 @@ void storage_complete_cb(io_queue_t *q) {
}
// Called after responses have been transmitted. Need to free up related data.
-int storage_finalize_cb(io_pending_t *pending) {
+void storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;
@@ -463,7 +467,6 @@ int storage_finalize_cb(io_pending_t *pending) {
io->iov = NULL;
}
// don't need to free the main context, since it's embedded.
- return 0; // return code ignored.
}
/*
diff --git a/storage.h b/storage.h
index 9257c3f..55c5e1f 100644
--- a/storage.h
+++ b/storage.h
@@ -20,7 +20,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp);
// callbacks for the IO queue subsystem.
void storage_submit_cb(io_queue_t *q);
void storage_complete_cb(io_queue_t *q);
-int storage_finalize_cb(io_pending_t *pending);
+void storage_finalize_cb(io_pending_t *pending);
// Thread functions.
int start_storage_write_thread(void *arg);
diff --git a/thread.c b/thread.c
index c64de52..948e4fe 100644
--- a/thread.c
+++ b/thread.c
@@ -468,6 +468,14 @@ static void setup_thread(LIBEVENT_THREAD *me) {
}
}
#endif
+#ifdef EXTSTORE
+ // me->storage is set just before this function is called.
+ if (me->storage) {
+ thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
+ storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
+ }
+#endif
+ thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
}
/*
@@ -563,14 +571,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
}
} else {
c->thread = me;
-#ifdef EXTSTORE
- if (c->thread->storage) {
- conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
- storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
- }
-#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
-
+ conn_io_queue_setup(c);
#ifdef TLS
if (settings.ssl_enabled && c->ssl != NULL) {
assert(c->thread && c->thread->ssl_wbuf);
@@ -726,8 +727,8 @@ void timeout_conn(conn *c) {
notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
-void return_io_pending(conn *c, io_pending_t *io) {
- CQ_ITEM *item = cqi_new(c->thread->ev_queue);
+void return_io_pending(io_pending_t *io) {
+ CQ_ITEM *item = cqi_new(io->thread->ev_queue);
if (item == NULL) {
// TODO: how can we avoid this?
// In the main case I just loop, since a malloc failure here for a
@@ -739,7 +740,7 @@ void return_io_pending(conn *c, io_pending_t *io) {
item->mode = queue_return_io;
item->io = io;
- notify_worker(c->thread, item);
+ notify_worker(io->thread, item);
}
/* This misses the allow_new_conns flag :( */