diff options
author | dormando <dormando@rydia.net> | 2023-01-13 15:22:26 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2023-02-24 17:43:54 -0800 |
commit | 6442017c545a2a5ad076697b8695cd64bd32b542 (patch) | |
tree | a95c5443a72f5cfbe5b1c32fe5cf552da3201b0c /proxy.h | |
parent | 833a7234bbaed264a9973141850a23df4eb1b939 (diff) | |
download | memcached-6442017c545a2a5ad076697b8695cd64bd32b542.tar.gz |
proxy: allow workers to run IO optionally
`mcp.pool(p, { dist = etc, iothread = true }`
By default the IO thread is not used; instead a backend connection is
created for each worker thread. This can be overridden by setting
`iothread = true` when creating a pool.
`mcp.pool(p, { dist = etc, beprefix = "etc" }`
If a `beprefix` is added to pool arguments, it will create unique
backend connections for this pool. This allows you to create multiple
sockets per backend by making multiple pools with unique prefixes.
There are legitimate use cases for sharing backend connections across
different pools, which is why that is the default behavior.
Diffstat (limited to 'proxy.h')
-rw-r--r-- | proxy.h | 12 |
1 files changed, 9 insertions, 3 deletions
@@ -196,7 +196,7 @@ typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t; typedef struct { lua_State *proxy_state; void *proxy_code; - proxy_event_thread_t *proxy_threads; + proxy_event_thread_t *proxy_io_thread; pthread_mutex_t config_lock; pthread_cond_t config_cond; pthread_t config_tid; @@ -209,6 +209,7 @@ typedef struct { bool worker_done; // signal variable for the worker lock/cond system. bool worker_failed; // covered by worker_lock as well. bool use_uring; // use IO_URING for backend connections. + bool loading; // bool indicating an active config load. struct proxy_global_stats global_stats; struct proxy_user_stats user_stats; struct proxy_tunables tunables; // NOTE: updates covered by stats_lock @@ -351,6 +352,7 @@ struct mcp_backend_s { bool can_write; // recently got a WANT_WRITE or are connecting. bool stacked; // if backend already queued for syscalls. bool bad; // timed out, marked as bad. + bool use_io_thread; // note if this backend is worker-local or not. struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes char name[MAX_NAMELEN+1]; char port[MAX_PORTLEN+1]; @@ -456,21 +458,25 @@ struct mcp_pool_s { proxy_ctx_t *ctx; // main context. STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator. char key_filter_conf[KEY_HASH_FILTER_MAX+1]; + char beprefix[MAX_LABELLEN+1]; // TODO: should probably be shorter. uint64_t hash_seed; // calculated from a string. int refcount; int phc_ref; int self_ref; // TODO (v2): double check that this is needed. int pool_size; + bool use_iothread; mcp_pool_be_t pool[]; }; typedef struct { mcp_pool_t *main; // ptr to original + mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread. } mcp_pool_proxy_t; // networking interface -void proxy_init_evthread_events(proxy_event_thread_t *t); +void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base); void *proxy_event_thread(void *arg); +void proxy_run_backend_queue(be_head_t *head); // await interface enum mcp_await_e { @@ -509,7 +515,7 @@ int mcplib_open_dist_jump_hash(lua_State *L); int mcplib_open_dist_ring_hash(lua_State *L); int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c); -mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len); +mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len); void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p); int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len); void proxy_lua_error(lua_State *L, const char *s); |