summaryrefslogtreecommitdiff
path: root/proxy_lua.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-13 15:22:26 -0800
committerdormando <dormando@rydia.net>2023-02-24 17:43:54 -0800
commit6442017c545a2a5ad076697b8695cd64bd32b542 (patch)
treea95c5443a72f5cfbe5b1c32fe5cf552da3201b0c /proxy_lua.c
parent833a7234bbaed264a9973141850a23df4eb1b939 (diff)
downloadmemcached-6442017c545a2a5ad076697b8695cd64bd32b542.tar.gz
proxy: allow workers to run IO optionally
`mcp.pool(p, { dist = etc, iothread = true }` By default the IO thread is not used; instead a backend connection is created for each worker thread. This can be overridden by setting `iothread = true` when creating a pool. `mcp.pool(p, { dist = etc, beprefix = "etc" }` If a `beprefix` is added to pool arguments, it will create unique backend connections for this pool. This allows you to create multiple sockets per backend by making multiple pools with unique prefixes. There are legitimate use cases for sharing backend connections across different pools, which is why that is the default behavior.
Diffstat (limited to 'proxy_lua.c')
-rw-r--r--proxy_lua.c147
1 files changed, 112 insertions, 35 deletions
diff --git a/proxy_lua.c b/proxy_lua.c
index e6b50ae..aeaf1e5 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -113,7 +113,7 @@ static int mcplib_backend_wrap_gc(lua_State *L) {
// Since we're running in the config thread it could just busy poll
// until the connection was picked up.
assert(be->transferred);
- proxy_event_thread_t *e = ctx->proxy_threads;
+ proxy_event_thread_t *e = be->event_thread;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -282,11 +282,11 @@ static int mcplib_backend(lua_State *L) {
return 1; // return be object.
}
+// Called with the cache label at top of the stack.
static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
// Note: The upvalue won't be found unless we're running from a function with it
// set as an upvalue.
- lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
@@ -306,7 +306,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_
return NULL;
}
-static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel,
+ proxy_event_thread_t *e) {
// FIXME: remove global.
proxy_ctx_t *ctx = settings.proxy_ctx;
@@ -361,7 +362,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
STAT_UL(ctx);
be->connect_flags = flags;
- proxy_event_thread_t *e = ctx->proxy_threads;
+ be->event_thread = e;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -380,8 +381,8 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
}
#endif
+ lua_pushvalue(L, 4); // push the label string back to the top.
// Add this new backend connection to the object cache.
- lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
// set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
@@ -514,43 +515,43 @@ static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
// UD now popped from stack.
}
-// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
-static int mcplib_pool(lua_State *L) {
- int argc = lua_gettop(L);
- luaL_checktype(L, 1, LUA_TTABLE);
- int n = luaL_len(L, 1); // get length of array table
-
- size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n;
- mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
- // Zero the memory before use, so we can realibly use __gc to clean up
- memset(p, 0, plen);
- p->pool_size = n;
- // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
- p->key_hasher = XXH3_64bits_withSeed;
- pthread_mutex_init(&p->lock, NULL);
- p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
-
- luaL_setmetatable(L, "mcp.pool");
-
- lua_pushvalue(L, -1); // dupe self for reference.
- p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- // TODO (v2): move to after function check so we can find the right
- // backend label to look up.
+// in the proxy object, we can alias a ptr to the pool to where it needs to be
+// based on worker number or io_thread right?
+static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) {
// remember lua arrays are 1 indexed.
- for (int x = 1; x <= n; x++) {
- mcp_pool_be_t *s = &p->pool[x-1];
+ for (int x = 1; x <= p->pool_size; x++) {
+ mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
// check label for pre-existing backend conn/wrapper
+ // TODO (v2): there're native ways of "from C make lua strings"
+ int toconcat = 1;
+ if (p->beprefix[0] != '\0') {
+ lua_pushstring(L, p->beprefix);
+ toconcat++;
+ }
+ if (p->use_iothread) {
+ lua_pushstring(L, ":io:");
+ toconcat++;
+ } else {
+ lua_pushstring(L, ":w");
+ lua_pushinteger(L, offset);
+ lua_pushstring(L, ":");
+ toconcat += 3;
+ }
+ lua_pushlstring(L, bel->label, bel->llen);
+ lua_concat(L, toconcat);
+
+ lua_pushvalue(L, -1); // copy the label string for the create method.
mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
if (bew == NULL) {
- bew = _mcplib_make_backendconn(L, bel);
+ bew = _mcplib_make_backendconn(L, bel, t);
}
s->be = bew->be; // unwrap the backend connection for direct ref.
+ bew->be->use_io_thread = p->use_iothread;
// If found from cache or made above, the backend wrapper is on the
// top of the stack, so we can now take its reference.
@@ -559,11 +560,51 @@ static int mcplib_pool(lua_State *L) {
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
lua_pop(L, 1); // pop the mcp.backend label object.
+ lua_pop(L, 1); // drop extra label copy.
+ }
+}
+
+// call with table of backends in 1
+static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) {
+ if (p->use_iothread) {
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread);
+ } else {
+ // TODO (v3) globals.
+ for (int n = 0; n < settings.num_threads; n++) {
+ LIBEVENT_THREAD *t = get_worker_thread(n);
+ _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread);
+ }
}
+}
+
+// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
+static int mcplib_pool(lua_State *L) {
+ int argc = lua_gettop(L);
+ luaL_checktype(L, 1, LUA_TTABLE);
+ int n = luaL_len(L, 1); // get length of array table
+ int workers = settings.num_threads; // TODO (v3): globals usage.
+
+ size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers);
+ mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
+ // Zero the memory before use, so we can realibly use __gc to clean up
+ memset(p, 0, plen);
+ p->pool_size = n;
+ p->use_iothread = true;
+ // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
+ p->key_hasher = XXH3_64bits_withSeed;
+ pthread_mutex_init(&p->lock, NULL);
+ p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+
+ luaL_setmetatable(L, "mcp.pool");
+
+ lua_pushvalue(L, -1); // dupe self for reference.
+ p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// Allow passing an ignored nil as a second argument. Makes the lua easier
int type = lua_type(L, 2);
if (argc == 1 || type == LUA_TNIL) {
+ _mcplib_pool_make_be(L, p);
lua_getglobal(L, "mcp");
// TODO (v2): decide on a mcp.default_dist and use that instead
if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
@@ -580,6 +621,31 @@ static int mcplib_pool(lua_State *L) {
// pool, then pass it along to the a constructor if necessary.
luaL_checktype(L, 2, LUA_TTABLE);
+ if (lua_getfield(L, 2, "iothread") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TBOOLEAN);
+ int use_iothread = lua_toboolean(L, -1);
+ if (use_iothread) {
+ p->use_iothread = true;
+ } else {
+ p->use_iothread = false;
+ }
+ lua_pop(L, 1); // remove value.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+
+ if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TSTRING);
+ size_t len = 0;
+ const char *bepfx = lua_tolstring(L, -1, &len);
+ memcpy(p->beprefix, bepfx, len);
+ p->beprefix[len+1] = '\0';
+ lua_pop(L, 1); // pop beprefix string.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+ _mcplib_pool_make_be(L, p);
+
// stack: backends, options, mcp.pool
if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
// overriding the distribution function.
@@ -587,6 +653,17 @@ static int mcplib_pool(lua_State *L) {
lua_pop(L, 1); // remove the dist table from stack.
} else {
lua_pop(L, 1); // pop the nil.
+
+ // use the default dist if not specified with an override table.
+ lua_getglobal(L, "mcp");
+ // TODO (v2): decide on a mcp.default_dist and use that instead
+ if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
+ _mcplib_pool_dist(L, p);
+ lua_pop(L, 1); // pop "dist_jump_hash" value.
+ } else {
+ lua_pop(L, 1);
+ }
+ lua_pop(L, 1); // pop "mcp"
}
if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
@@ -666,7 +743,8 @@ static int mcplib_pool_proxy_gc(lua_State *L) {
return 0;
}
-mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) {
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len) {
+ mcp_pool_t *p = pp->main;
if (p->key_filter) {
key = p->key_filter(p->key_filter_conf, key, len, &len);
P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
@@ -682,7 +760,7 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
proxy_lua_error(L, "key dist hasher tried to use out of bounds index");
}
- return p->pool[lookup].be;
+ return pp->pool[lookup].be;
}
// hashfunc(request) -> backend(request)
@@ -690,7 +768,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
static int mcplib_pool_proxy_call(lua_State *L) {
// internal args are the hash selector (self)
mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy");
- mcp_pool_t *p = pp->main;
// then request object.
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
@@ -701,7 +778,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
}
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
- rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len);
// now yield request, pool up.
lua_pushinteger(L, MCP_YIELD_POOL);