diff options
author | dormando <dormando@rydia.net> | 2021-08-02 16:25:43 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | 3fc8775bf081f0cf84fe16058f834b951953c269 (patch) | |
tree | 46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17 /memcached.h | |
parent | 57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff) | |
download | memcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz |
core: io_queue flow second attempt
probably squash into previous commit.
io->c->thread can change for orphaned IO's, so we had to directly add the
original worker thread as a reference.
also tried again to split callbacks onto the thread and off of the
connection for similar reasons; sometimes we just need the callbacks,
sometimes we need both.
Diffstat (limited to 'memcached.h')
-rw-r--r-- | memcached.h | 86 |
1 file changed, 49 insertions, 37 deletions
diff --git a/memcached.h b/memcached.h index e8ed48b..7b06bb3 100644 --- a/memcached.h +++ b/memcached.h @@ -621,6 +621,47 @@ typedef struct { unsigned short page_id; /* from IO header */ } item_hdr; #endif + +#define IO_QUEUE_COUNT 3 + +#define IO_QUEUE_NONE 0 +#define IO_QUEUE_EXTSTORE 1 + +typedef struct _io_pending_t io_pending_t; +typedef struct io_queue_s io_queue_t; +typedef void (*io_queue_stack_cb)(io_queue_t *q); +typedef void (*io_queue_cb)(io_pending_t *pending); +// this structure's ownership gets passed between threads: +// - owned normally by the worker thread. +// - multiple queues can be submitted at the same time. +// - each queue can be sent to different background threads. +// - each submitted queue needs to know when to return to the worker. +// - the worker needs to know when all queues have returned so it can process. +// +// io_queue_t's count field is owned by worker until submitted. Then owned by +// side thread until returned. +// conn->io_queues_submitted is always owned by the worker thread. it is +// incremented as the worker submits queues, and decremented as it gets pinged +// for returned threads. +// +// All of this is to avoid having to hit a mutex owned by the connection +// thread that gets pinged for each thread (or an equivalent atomic). +struct io_queue_s { + void *ctx; // duplicated from io_queue_cb_t + void *stack_ctx; // module-specific context to be batch-submitted + int count; // ios to process before returning. only accessed by queue processor once submitted + int type; // duplicated from io_queue_cb_t +}; + +typedef struct io_queue_cb_s { + void *ctx; // untouched ptr for specific context + io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. + io_queue_stack_cb complete_cb; + io_queue_cb return_cb; // called on worker thread. + io_queue_cb finalize_cb; // called back on the worker thread. 
+ int type; +} io_queue_cb_t; + typedef struct _mc_resp_bundle mc_resp_bundle; typedef struct { pthread_t thread_id; /* unique ID of this thread */ @@ -633,6 +674,7 @@ typedef struct { int notify_send_fd; /* sending end of notify pipe */ #endif struct thread_stats stats; /* Stats generated by this thread */ + io_queue_cb_t io_queues[IO_QUEUE_COUNT]; struct conn_queue *ev_queue; /* Worker/conn event queue */ cache_t *rbuf_cache; /* static-sized read buffers */ mc_resp_bundle *open_bundle; @@ -652,7 +694,6 @@ typedef struct { /** * Response objects */ -typedef struct _io_pending_t io_pending_t; #define MC_RESP_IOVCOUNT 4 typedef struct _mc_resp { mc_resp_bundle *bundle; // ptr back to bundle @@ -694,40 +735,9 @@ struct _mc_resp_bundle { typedef struct conn conn; -#define IO_QUEUE_NONE 0 -#define IO_QUEUE_EXTSTORE 1 - -typedef struct io_queue_s io_queue_t; -typedef void (*io_queue_stack_cb)(io_queue_t *q); -typedef int (*io_queue_cb)(io_pending_t *pending); -// this structure's ownership gets passed between threads: -// - owned normally by the worker thread. -// - multiple queues can be submitted at the same time. -// - each queue can be sent to different background threads. -// - each submitted queue needs to know when to return to the worker. -// - the worker needs to know when all queues have returned so it can process. -// -// io_queue_t's count field is owned by worker until submitted. Then owned by -// side thread until returned. -// conn->io_queues_submitted is always owned by the worker thread. it is -// incremented as the worker submits queues, and decremented as it gets pinged -// for returned threads. -// -// All of this is to avoid having to hit a mutex owned by the connection -// thread that gets pinged for each thread (or an equivalent atomic). 
-struct io_queue_s { - void *ctx; // untouched ptr for specific context - void *stack_ctx; // module-specific context to be batch-submitted - io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. - io_queue_stack_cb complete_cb; - io_queue_cb return_cb; // called on worker thread. - io_queue_cb finalize_cb; // called back on the worker thread. - int type; - int count; // ios to process before returning. only accessed by queue processor once submitted -}; - struct _io_pending_t { - io_queue_t *q; + int io_queue_type; // matches one of IO_QUEUE_* + LIBEVENT_THREAD *thread; conn *c; mc_resp *resp; // associated response object char data[120]; @@ -780,7 +790,7 @@ struct conn { int sbytes; /* how many bytes to swallow */ int io_queues_submitted; /* see notes on io_queue_t */ - io_queue_t io_queues[3]; /* set of deferred IO queues. */ + io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */ #ifdef EXTSTORE unsigned int recache_counter; #endif @@ -861,8 +871,10 @@ enum delta_result_type do_add_delta(conn *c, const char *key, uint64_t *cas, const uint32_t hv, item **it_ret); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb); +void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb); +void conn_io_queue_setup(conn *c); io_queue_t *conn_io_queue_get(conn *c, int type); +io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type); void conn_io_queue_return(io_pending_t *io); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base, void *ssl); @@ -891,7 +903,7 @@ extern int daemonize(int nochdir, int noclose); void memcached_thread_init(int 
nthreads, void *arg); void redispatch_conn(conn *c); void timeout_conn(conn *c); -void return_io_pending(conn *c, io_pending_t *io); +void return_io_pending(io_pending_t *io); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl); void sidethread_conn_close(conn *c); |