summaryrefslogtreecommitdiff
path: root/memcached.h
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-07-30 15:06:05 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commit57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (patch)
tree24f78fa1f5b5842be2cf2ca7f108367f198d610c /memcached.h
parentc25d0bd68ab0f6226c2d979bf0951923624926dd (diff)
downloadmemcached-57493bfca4d16f19aa6d591d29f19f3d2ad160f8.tar.gz
core: io_queue_t flow mode
instead of passing ownership of (io_queue_t)*q to the side thread, instead the ownership of IO objects are passed to the side thread, which are then individually returned. The worker thread runs return_cb() on each, determining when it's done with the response batch. this interface could use more explicit functions to make it more clear. Ownership of *q isn't actually "passed" anywhere, it's just used or not used depending on which return function the owner wants.
Diffstat (limited to 'memcached.h')
-rw-r--r--memcached.h14
1 files changed, 9 insertions, 5 deletions
diff --git a/memcached.h b/memcached.h
index aab21cd..e8ed48b 100644
--- a/memcached.h
+++ b/memcached.h
@@ -697,8 +697,9 @@ typedef struct conn conn;
#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1
-typedef void (*io_queue_stack_cb)(void *ctx, void *stack);
-typedef void (*io_queue_cb)(io_pending_t *pending);
+typedef struct io_queue_s io_queue_t;
+typedef void (*io_queue_stack_cb)(io_queue_t *q);
+typedef int (*io_queue_cb)(io_pending_t *pending);
// this structure's ownership gets passed between threads:
// - owned normally by the worker thread.
// - multiple queues can be submitted at the same time.
@@ -714,15 +715,16 @@ typedef void (*io_queue_cb)(io_pending_t *pending);
//
// All of this is to avoid having to hit a mutex owned by the connection
// thread that gets pinged for each thread (or an equivalent atomic).
-typedef struct {
+struct io_queue_s {
void *ctx; // untouched ptr for specific context
void *stack_ctx; // module-specific context to be batch-submitted
io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
io_queue_stack_cb complete_cb;
+ io_queue_cb return_cb; // called on worker thread.
io_queue_cb finalize_cb; // called back on the worker thread.
int type;
int count; // ios to process before returning. only accessed by queue processor once submitted
-} io_queue_t;
+};
struct _io_pending_t {
io_queue_t *q;
@@ -859,8 +861,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb);
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
io_queue_t *conn_io_queue_get(conn *c, int type);
+void conn_io_queue_return(io_pending_t *io);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl);
@@ -888,6 +891,7 @@ extern int daemonize(int nochdir, int noclose);
void memcached_thread_init(int nthreads, void *arg);
void redispatch_conn(conn *c);
void timeout_conn(conn *c);
+void return_io_pending(conn *c, io_pending_t *io);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl);
void sidethread_conn_close(conn *c);