diff options
author | dormando <dormando@rydia.net> | 2020-08-30 23:27:34 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2020-10-30 15:50:12 -0700 |
commit | 1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f (patch) | |
tree | d3db2b10a187dcb2b7f6b43086095d05e00c26fc /memcached.h | |
parent | 0d4cd8af759ff29562cf17a91ed7f909f88f6e32 (diff) | |
download | memcached-1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f.tar.gz |
core: restructure IO queue callbacks
mc_resp is the proper owner of a pending IO once it's been initialized;
release it during resp_finish(). Also adds a completion callback which
runs on the submitted stack after returning to the worker thread but
before the response is transmitted.
allows re-queueing for pending IO if processing a response generates
another pending IO. also allows a further refactor to run more extstore
code on the worker thread instead of the IO threads.
uses proper conn_io_queue state to describe connections waiting for
pending IO's.
Diffstat (limited to 'memcached.h')
-rw-r--r-- | memcached.h | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/memcached.h b/memcached.h index 73253c6..bce4f41 100644 --- a/memcached.h +++ b/memcached.h @@ -195,6 +195,7 @@ enum conn_states { conn_mwrite, /**< writing out many items sequentially */ conn_closed, /**< connection is closed */ conn_watch, /**< held by the logger thread as a watcher */ + conn_io_queue, /**< wait on async. process to get response object */ conn_max_state /**< Max state value (used for assertion) */ }; @@ -626,6 +627,7 @@ typedef struct { /** * Response objects */ +typedef struct _io_pending_t io_pending_t; #define MC_RESP_IOVCOUNT 4 typedef struct _mc_resp { mc_resp_bundle *bundle; // ptr back to bundle @@ -633,6 +635,7 @@ typedef struct _mc_resp { int wbytes; // bytes to write out of wbuf: might be able to nuke this. int tosend; // total bytes to send for this response void *write_and_free; /** free this memory after finishing writing */ + io_pending_t *io_pending; /* pending IO descriptor for this response */ item *item; /* item associated with this response object, with reference held */ struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */ @@ -668,23 +671,25 @@ typedef struct conn conn; #define IO_QUEUE_NONE 0 #define IO_QUEUE_EXTSTORE 1 -typedef struct _io_pending_t { - struct _io_pending_t *next; - conn *c; - mc_resp *resp; /* associated response object */ - char data[120]; -} io_pending_t; -typedef void (*io_queue_add_cb)(void *ctx, io_pending_t *pending); -typedef void (*io_queue_free_cb)(void *ctx, io_pending_t *pending); +typedef void (*io_queue_stack_cb)(void *ctx, void *stack); +typedef void (*io_queue_cb)(io_pending_t *pending); typedef struct { - io_pending_t *head_pending; - void *ctx; - io_queue_add_cb cb; - io_queue_free_cb free_cb; + void *ctx; // untouched ptr for specific context + void *stack_ctx; // module-specific context to be batch-submitted + io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once. + io_queue_stack_cb complete_cb; + io_queue_cb finalize_cb; // called back on the worker thread. int type; } io_queue_t; +struct _io_pending_t { + io_queue_t *q; + conn *c; + mc_resp *resp; /* associated response object */ + char data[120]; +}; + /** * The structure representing a connection into memcached. */ @@ -807,7 +812,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, uint64_t *cas, const uint32_t hv, item **it_ret); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); -void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_add_cb cb, io_queue_free_cb free_cb); +void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb); io_queue_t *conn_io_queue_get(conn *c, int type); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base, void *ssl); |