summaryrefslogtreecommitdiff
path: root/memcached.h
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2020-08-30 23:27:34 -0700
committerdormando <dormando@rydia.net>2020-10-30 15:50:12 -0700
commit1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f (patch)
treed3db2b10a187dcb2b7f6b43086095d05e00c26fc /memcached.h
parent0d4cd8af759ff29562cf17a91ed7f909f88f6e32 (diff)
downloadmemcached-1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f.tar.gz
core: restructure IO queue callbacks
mc_resp is the proper owner of a pending IO once it's been initialized; release it during resp_finish(). Also adds a completion callback which runs on the submitted stack after returning to the worker thread but before the response is transmitted. allows re-queueing for pending IO if processing a response generates another pending IO. also allows a further refactor to run more extstore code on the worker thread instead of the IO threads. uses proper conn_io_queue state to describe connections waiting for pending IO's.
Diffstat (limited to 'memcached.h')
-rw-r--r--memcached.h31
1 files changed, 18 insertions, 13 deletions
diff --git a/memcached.h b/memcached.h
index 73253c6..bce4f41 100644
--- a/memcached.h
+++ b/memcached.h
@@ -195,6 +195,7 @@ enum conn_states {
conn_mwrite, /**< writing out many items sequentially */
conn_closed, /**< connection is closed */
conn_watch, /**< held by the logger thread as a watcher */
+ conn_io_queue, /**< wait on async. process to get response object */
conn_max_state /**< Max state value (used for assertion) */
};
@@ -626,6 +627,7 @@ typedef struct {
/**
* Response objects
*/
+typedef struct _io_pending_t io_pending_t;
#define MC_RESP_IOVCOUNT 4
typedef struct _mc_resp {
mc_resp_bundle *bundle; // ptr back to bundle
@@ -633,6 +635,7 @@ typedef struct _mc_resp {
int wbytes; // bytes to write out of wbuf: might be able to nuke this.
int tosend; // total bytes to send for this response
void *write_and_free; /** free this memory after finishing writing */
+ io_pending_t *io_pending; /* pending IO descriptor for this response */
item *item; /* item associated with this response object, with reference held */
struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */
@@ -668,23 +671,25 @@ typedef struct conn conn;
#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1
-typedef struct _io_pending_t {
- struct _io_pending_t *next;
- conn *c;
- mc_resp *resp; /* associated response object */
- char data[120];
-} io_pending_t;
-typedef void (*io_queue_add_cb)(void *ctx, io_pending_t *pending);
-typedef void (*io_queue_free_cb)(void *ctx, io_pending_t *pending);
+typedef void (*io_queue_stack_cb)(void *ctx, void *stack);
+typedef void (*io_queue_cb)(io_pending_t *pending);
typedef struct {
- io_pending_t *head_pending;
- void *ctx;
- io_queue_add_cb cb;
- io_queue_free_cb free_cb;
+ void *ctx; // untouched ptr for specific context
+ void *stack_ctx; // module-specific context to be batch-submitted
+ io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
+ io_queue_stack_cb complete_cb;
+ io_queue_cb finalize_cb; // called back on the worker thread.
int type;
} io_queue_t;
+struct _io_pending_t {
+ io_queue_t *q;
+ conn *c;
+ mc_resp *resp; /* associated response object */
+ char data[120];
+};
+
/**
* The structure representing a connection into memcached.
*/
@@ -807,7 +812,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_add_cb cb, io_queue_free_cb free_cb);
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb);
io_queue_t *conn_io_queue_get(conn *c, int type);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl);