path: root/proxy_config.c
author dormando <dormando@rydia.net> 2023-01-13 15:22:26 -0800
committer dormando <dormando@rydia.net> 2023-02-24 17:43:54 -0800
commit 6442017c545a2a5ad076697b8695cd64bd32b542 (patch)
tree a95c5443a72f5cfbe5b1c32fe5cf552da3201b0c /proxy_config.c
parent 833a7234bbaed264a9973141850a23df4eb1b939 (diff)
download memcached-6442017c545a2a5ad076697b8695cd64bd32b542.tar.gz
proxy: allow workers to run IO optionally
`mcp.pool(p, { dist = etc, iothread = true })`

By default the IO thread is not used; instead a backend connection is created for each worker thread. This can be overridden by setting `iothread = true` when creating a pool.

`mcp.pool(p, { dist = etc, beprefix = "etc" })`

If a `beprefix` is added to the pool arguments, unique backend connections are created for this pool. This allows you to create multiple sockets per backend by making multiple pools with unique prefixes.

There are legitimate use cases for sharing backend connections across different pools, which is why sharing remains the default behavior.
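For illustration only, a proxy config using these options might look like the sketch below. The backend labels, addresses, ports, and the trivial route wiring are placeholder assumptions for this example, not part of this commit:

    -- hypothetical config sketch; labels, addresses and routing are made up,
    -- only the iothread/beprefix pool options come from this patch
    function mcp_config_pools()
        local b1 = mcp.backend('b1', '127.0.0.1', 11212)
        local b2 = mcp.backend('b2', '127.0.0.1', 11213)
        return {
            -- default: each worker thread opens its own backend connections
            direct = mcp.pool({ b1, b2 }),
            -- opt back into routing this pool's requests through the IO thread
            shared = mcp.pool({ b1, b2 }, { iothread = true }),
            -- give this pool its own backend sockets instead of sharing them
            prefixed = mcp.pool({ b1, b2 }, { beprefix = "p1" }),
        }
    end

    function mcp_config_routes(pools)
        -- send everything to one pool; a real config would route per command
        mcp.attach(mcp.CMD_ANY_STORAGE, function(r) return pools.direct(r) end)
    end

As described above, the shared IO thread keeps connections common across pools, while per-worker connections (the new default) or a `beprefix` give you more sockets per backend.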
Diffstat (limited to 'proxy_config.c')
-rw-r--r--  proxy_config.c | 48
1 file changed, 42 insertions, 6 deletions
diff --git a/proxy_config.c b/proxy_config.c
index 6bb54be..e45711e 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -41,11 +41,40 @@ static const char * _load_helper(lua_State *L, void *data, size_t *size) {
void proxy_start_reload(void *arg) {
    proxy_ctx_t *ctx = arg;
    if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
+        ctx->loading = true;
        pthread_cond_signal(&ctx->config_cond);
        pthread_mutex_unlock(&ctx->config_lock);
    }
}
+int proxy_first_confload(void *arg) {
+    proxy_ctx_t *ctx = arg;
+    pthread_mutex_lock(&ctx->config_lock);
+    ctx->loading = true;
+    pthread_cond_signal(&ctx->config_cond);
+    pthread_mutex_unlock(&ctx->config_lock);
+
+    while (1) {
+        bool stop = false;
+        pthread_mutex_lock(&ctx->config_lock);
+        if (!ctx->loading) {
+            stop = true;
+        }
+        pthread_mutex_unlock(&ctx->config_lock);
+        if (stop)
+            break;
+    }
+    int fails = 0;
+    STAT_L(ctx);
+    fails = ctx->global_stats.config_reload_fails;
+    STAT_UL(ctx);
+    if (fails) {
+        return -1;
+    }
+
+    return 0;
+}
+
// Manages a queue of inbound objects destined to be deallocated.
static void *_proxy_manager_thread(void *arg) {
    proxy_ctx_t *ctx = arg;
@@ -108,6 +137,7 @@ static void *_proxy_config_thread(void *arg) {
    logger_create();
    pthread_mutex_lock(&ctx->config_lock);
    while (1) {
+        ctx->loading = false;
        pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
        LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
        STAT_INCR(ctx, config_reloads, 1);
@@ -233,7 +263,7 @@ int proxy_load_config(void *arg) {
    return 0;
}
-static int _copy_pool(lua_State *from, lua_State *to) {
+static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
    // from, -3 should have the userdata.
    mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
    size_t size = sizeof(mcp_pool_proxy_t);
@@ -241,16 +271,22 @@ static int _copy_pool(lua_State *from, lua_State *to) {
    luaL_setmetatable(to, "mcp.pool_proxy");
    pp->main = p;
+    if (p->use_iothread) {
+        pp->pool = p->pool;
+    } else {
+        // allow 0 indexing for backends when unique to each worker thread
+        pp->pool = &p->pool[thr->thread_baseid * p->pool_size];
+    }
    pthread_mutex_lock(&p->lock);
    p->refcount++;
    pthread_mutex_unlock(&p->lock);
    return 0;
}
-static void _copy_config_table(lua_State *from, lua_State *to);
+static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr);
// (from, -1) is the source value
// should end with (to, -1) being the new value.
-static void _copy_config_table(lua_State *from, lua_State *to) {
+static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
    int type = lua_type(from, -1);
    bool found = false;
    luaL_checkstack(from, 4, "configuration error: table recursion too deep");
@@ -266,7 +302,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
                if (lua_rawget(from, -2) != LUA_TNIL) {
                    const char *name = lua_tostring(from, -1);
                    if (strcmp(name, "mcp.pool") == 0) {
-                        _copy_pool(from, to);
+                        _copy_pool(from, to, thr);
                        found = true;
                    }
                }
@@ -323,7 +359,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
                // lua_settable(to, n) - n being the table
                // takes -2 key -1 value, pops both.
                // use lua_absindex(L, -1) and so to convert easier?
-                _copy_config_table(from, to); // push next value.
+                _copy_config_table(from, to, thr); // push next value.
                lua_settable(to, nt);
                lua_pop(from, 1); // drop value, keep key.
            }
@@ -385,7 +421,7 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
    // If the setjump/longjump combos are compatible a pcall for from and
    // atpanic for to might work best, since the config VM is/should be long
    // running and worker VM's should be rotated.
-    _copy_config_table(ctx->proxy_state, L);
+    _copy_config_table(ctx->proxy_state, L, thr);
    // copied value is in front of route function, now call it.
    if (lua_pcall(L, 1, 1, 0) != LUA_OK) {