diff options
author | dormando <dormando@rydia.net> | 2021-07-30 15:06:05 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | 57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (patch) | |
tree | 24f78fa1f5b5842be2cf2ca7f108367f198d610c /memcached.h | |
parent | c25d0bd68ab0f6226c2d979bf0951923624926dd (diff) | |
download | memcached-57493bfca4d16f19aa6d591d29f19f3d2ad160f8.tar.gz |
core: io_queue_t flow mode
instead of passing ownership of (io_queue_t)*q to the side thread,
instead the ownership of IO objects are passed to the side thread, which
are then individually returned. The worker thread runs return_cb() on
each, determining when it's done with the response batch.
this interface could use more explicit functions to make it more clear.
Ownership of *q isn't actually "passed" anywhere, it's just used or not
used depending on which return function the owner wants.
Diffstat (limited to 'memcached.h')
-rw-r--r-- | memcached.h | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/memcached.h b/memcached.h index aab21cd..e8ed48b 100644 --- a/memcached.h +++ b/memcached.h @@ -697,8 +697,9 @@ typedef struct conn conn; #define IO_QUEUE_NONE 0 #define IO_QUEUE_EXTSTORE 1 -typedef void (*io_queue_stack_cb)(void *ctx, void *stack); -typedef void (*io_queue_cb)(io_pending_t *pending); +typedef struct io_queue_s io_queue_t; +typedef void (*io_queue_stack_cb)(io_queue_t *q); +typedef int (*io_queue_cb)(io_pending_t *pending); // this structure's ownership gets passed between threads: // - owned normally by the worker thread. // - multiple queues can be submitted at the same time. @@ -714,15 +715,16 @@ typedef void (*io_queue_cb)(io_pending_t *pending); // // All of this is to avoid having to hit a mutex owned by the connection // thread that gets pinged for each thread (or an equivalent atomic). -typedef struct { +struct io_queue_s { void *ctx; // untouched ptr for specific context void *stack_ctx; // module-specific context to be batch-submitted io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. io_queue_stack_cb complete_cb; + io_queue_cb return_cb; // called on worker thread. io_queue_cb finalize_cb; // called back on the worker thread. int type; int count; // ios to process before returning. only accessed by queue processor once submitted -} io_queue_t; +}; struct _io_pending_t { io_queue_t *q; @@ -859,8 +861,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, uint64_t *cas, const uint32_t hv, item **it_ret); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb); +void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb); io_queue_t *conn_io_queue_get(conn *c, int type); +void conn_io_queue_return(io_pending_t *io); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base, void *ssl); @@ -888,6 +891,7 @@ extern int daemonize(int nochdir, int noclose); void memcached_thread_init(int nthreads, void *arg); void redispatch_conn(conn *c); void timeout_conn(conn *c); +void return_io_pending(conn *c, io_pending_t *io); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl); void sidethread_conn_close(conn *c); |