summaryrefslogtreecommitdiff
path: root/memcached.h
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-08-02 16:25:43 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commit3fc8775bf081f0cf84fe16058f834b951953c269 (patch)
tree46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17 /memcached.h
parent57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff)
downloadmemcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz
core: io_queue flow second attempt
probably squash into previous commit. io->c->thead can change for orpahned IO's, so we had to directly add the original worker thread as a reference. also tried again to split callbacks onto the thread and off of the connection for similar reasons; sometimes we just need the callbacks, sometimes we need both.
Diffstat (limited to 'memcached.h')
-rw-r--r--memcached.h86
1 files changed, 49 insertions, 37 deletions
diff --git a/memcached.h b/memcached.h
index e8ed48b..7b06bb3 100644
--- a/memcached.h
+++ b/memcached.h
@@ -621,6 +621,47 @@ typedef struct {
unsigned short page_id; /* from IO header */
} item_hdr;
#endif
+
+#define IO_QUEUE_COUNT 3
+
+#define IO_QUEUE_NONE 0
+#define IO_QUEUE_EXTSTORE 1
+
+typedef struct _io_pending_t io_pending_t;
+typedef struct io_queue_s io_queue_t;
+typedef void (*io_queue_stack_cb)(io_queue_t *q);
+typedef void (*io_queue_cb)(io_pending_t *pending);
+// this structure's ownership gets passed between threads:
+// - owned normally by the worker thread.
+// - multiple queues can be submitted at the same time.
+// - each queue can be sent to different background threads.
+// - each submitted queue needs to know when to return to the worker.
+// - the worker needs to know when all queues have returned so it can process.
+//
+// io_queue_t's count field is owned by worker until submitted. Then owned by
+// side thread until returned.
+// conn->io_queues_submitted is always owned by the worker thread. it is
+// incremented as the worker submits queues, and decremented as it gets pinged
+// for returned threads.
+//
+// All of this is to avoid having to hit a mutex owned by the connection
+// thread that gets pinged for each thread (or an equivalent atomic).
+struct io_queue_s {
+ void *ctx; // duplicated from io_queue_cb_t
+ void *stack_ctx; // module-specific context to be batch-submitted
+ int count; // ios to process before returning. only accessed by queue processor once submitted
+ int type; // duplicated from io_queue_cb_t
+};
+
+typedef struct io_queue_cb_s {
+ void *ctx; // untouched ptr for specific context
+ io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
+ io_queue_stack_cb complete_cb;
+ io_queue_cb return_cb; // called on worker thread.
+ io_queue_cb finalize_cb; // called back on the worker thread.
+ int type;
+} io_queue_cb_t;
+
typedef struct _mc_resp_bundle mc_resp_bundle;
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
@@ -633,6 +674,7 @@ typedef struct {
int notify_send_fd; /* sending end of notify pipe */
#endif
struct thread_stats stats; /* Stats generated by this thread */
+ io_queue_cb_t io_queues[IO_QUEUE_COUNT];
struct conn_queue *ev_queue; /* Worker/conn event queue */
cache_t *rbuf_cache; /* static-sized read buffers */
mc_resp_bundle *open_bundle;
@@ -652,7 +694,6 @@ typedef struct {
/**
* Response objects
*/
-typedef struct _io_pending_t io_pending_t;
#define MC_RESP_IOVCOUNT 4
typedef struct _mc_resp {
mc_resp_bundle *bundle; // ptr back to bundle
@@ -694,40 +735,9 @@ struct _mc_resp_bundle {
typedef struct conn conn;
-#define IO_QUEUE_NONE 0
-#define IO_QUEUE_EXTSTORE 1
-
-typedef struct io_queue_s io_queue_t;
-typedef void (*io_queue_stack_cb)(io_queue_t *q);
-typedef int (*io_queue_cb)(io_pending_t *pending);
-// this structure's ownership gets passed between threads:
-// - owned normally by the worker thread.
-// - multiple queues can be submitted at the same time.
-// - each queue can be sent to different background threads.
-// - each submitted queue needs to know when to return to the worker.
-// - the worker needs to know when all queues have returned so it can process.
-//
-// io_queue_t's count field is owned by worker until submitted. Then owned by
-// side thread until returned.
-// conn->io_queues_submitted is always owned by the worker thread. it is
-// incremented as the worker submits queues, and decremented as it gets pinged
-// for returned threads.
-//
-// All of this is to avoid having to hit a mutex owned by the connection
-// thread that gets pinged for each thread (or an equivalent atomic).
-struct io_queue_s {
- void *ctx; // untouched ptr for specific context
- void *stack_ctx; // module-specific context to be batch-submitted
- io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
- io_queue_stack_cb complete_cb;
- io_queue_cb return_cb; // called on worker thread.
- io_queue_cb finalize_cb; // called back on the worker thread.
- int type;
- int count; // ios to process before returning. only accessed by queue processor once submitted
-};
-
struct _io_pending_t {
- io_queue_t *q;
+ int io_queue_type; // matches one of IO_QUEUE_*
+ LIBEVENT_THREAD *thread;
conn *c;
mc_resp *resp; // associated response object
char data[120];
@@ -780,7 +790,7 @@ struct conn {
int sbytes; /* how many bytes to swallow */
int io_queues_submitted; /* see notes on io_queue_t */
- io_queue_t io_queues[3]; /* set of deferred IO queues. */
+ io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
#ifdef EXTSTORE
unsigned int recache_counter;
#endif
@@ -861,8 +871,10 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
+void conn_io_queue_setup(conn *c);
io_queue_t *conn_io_queue_get(conn *c, int type);
+io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
void conn_io_queue_return(io_pending_t *io);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl);
@@ -891,7 +903,7 @@ extern int daemonize(int nochdir, int noclose);
void memcached_thread_init(int nthreads, void *arg);
void redispatch_conn(conn *c);
void timeout_conn(conn *c);
-void return_io_pending(conn *c, io_pending_t *io);
+void return_io_pending(io_pending_t *io);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl);
void sidethread_conn_close(conn *c);