summaryrefslogtreecommitdiff
path: root/proxy_lua.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-11-22 19:27:28 -0800
committerdormando <dormando@rydia.net>2022-12-12 16:07:09 -0800
commitd401611ba88db17c38fedf97d336f8085ce24bab (patch)
tree92ab4f8e947a59eb12bf7a87219667fb5c054116 /proxy_lua.c
parent3cbd069ed883d1405c068d0bc104a2a0b2ebeecb (diff)
downloadmemcached-d401611ba88db17c38fedf97d336f8085ce24bab.tar.gz
proxy: fix lifecycle of backend connections
A backend's connection object is technically owned by the IO thread after it has been created. An error in how this was done lead to invalid backends being infinitely retried despite the underlying object being collected. This change adds an extra indirection to backend objects: a backend_wrap object, which just turns the backend connection into an arbitrary pointer instead of lua memory owned by the config VM. - When backend connections are created, this pointer is shipped to the IO thread to have its connection instantiated. - When the wrap object is garbage collected (ie; no longer referenced by any pool object), the be conn. pointer is again shipped to the IO thread, which then removes any pending events, closes the sock, and frees data.
Diffstat (limited to 'proxy_lua.c')
-rw-r--r--proxy_lua.c171
1 files changed, 129 insertions, 42 deletions
diff --git a/proxy_lua.c b/proxy_lua.c
index 2172733..7fa07bb 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -90,67 +90,130 @@ static int mcplib_response_gc(lua_State *L) {
// To free a backend: All proxies for a pool are collected, then the central
// pool is collected, which releases backend references, which allows backend
// to be collected.
-static int mcplib_backend_gc(lua_State *L) {
- mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
-
- assert(STAILQ_EMPTY(&be->io_head));
+static int mcplib_backend_wrap_gc(lua_State *L) {
+ mcp_backend_wrap_t *bew = luaL_checkudata(L, -1, "mcp.backendwrap");
+ // FIXME: remove global.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
- mcmc_disconnect(be->client);
+ if (bew->be != NULL) {
+ mcp_backend_t *be = bew->be;
+ // TODO (v3): technically a race where a backend could be created,
+ // queued, but not picked up before being gc'ed again. In practice
+ // this is impossible but at some point we should close the loop here.
+ // Since we're running in the config thread it could just busy poll
+ // until the connection was picked up.
+ assert(be->transferred);
+ proxy_event_thread_t *e = ctx->proxy_threads;
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
+ pthread_mutex_unlock(&e->mutex);
+
+ // Signal to check queue.
+#ifdef USE_EVENTFD
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
+#else
+ if (write(e->be_notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
+#endif
+ }
- // FIXME (v2): can we ensure a backend always has be->event_thread set, so
- // we can use be->event_thread->ctx?
- proxy_ctx_t *ctx = settings.proxy_ctx;
STAT_DECR(ctx, backend_total, 1);
- free(be->client);
return 0;
}
+static int mcplib_backend_gc(lua_State *L) {
+ return 0; // no-op.
+}
+
+// backend label object; given to pools which then find or create backend
+// objects as necessary.
static int mcplib_backend(lua_State *L) {
- luaL_checkstring(L, -3); // label for indexing backends.
+ size_t llen = 0;
size_t nlen = 0;
- const char *name = luaL_checklstring(L, -2, &nlen);
- const char *port = luaL_checkstring(L, -1);
- proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+ size_t plen = 0;
+ const char *label = luaL_checklstring(L, 1, &llen);
+ const char *name = luaL_checklstring(L, 2, &nlen);
+ const char *port = luaL_checklstring(L, 3, &plen);
+
+ if (llen > MAX_LABELLEN-1) {
+ proxy_lua_error(L, "backend label too long");
+ return 0;
+ }
if (nlen > MAX_NAMELEN-1) {
proxy_lua_error(L, "backend name too long");
return 0;
}
+ if (plen > MAX_PORTLEN-1) {
+ proxy_lua_error(L, "backend port too long");
+ return 0;
+ }
+
+ mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
+ memset(be, 0, sizeof(*be));
+ memcpy(be->label, label, llen);
+ be->label[llen] = '\0';
+ memcpy(be->name, name, nlen);
+ be->name[nlen] = '\0';
+ memcpy(be->port, port, plen);
+ be->port[plen] = '\0';
+ be->llen = llen;
+ luaL_getmetatable(L, "mcp.backend");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ return 1; // return be object.
+}
+
+static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
- lua_pushvalue(L, 1);
+ // Note: The upvalue won't be found unless we're running from a function with it
+ // set as an upvalue.
+ lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
- mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend");
- if (strncmp(be_orig->name, name, MAX_NAMELEN) == 0
- && strncmp(be_orig->port, port, MAX_PORTLEN) == 0) {
+ mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
+ if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0
+ && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0) {
// backend is the same, return it.
- return 1;
+ return be_orig;
} else {
// backend not the same, pop from stack and make new one.
lua_pop(L, 1);
}
} else {
- lua_pop(L, 1);
+ lua_pop(L, 1); // pop the nil.
}
- // This might shift to internal objects?
- mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0);
+ return NULL;
+}
+
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+ // FIXME: remove global.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
- // FIXME (v2): remove some of the excess zero'ing below?
- memset(be, 0, sizeof(mcp_backend_t));
- strncpy(be->name, name, MAX_NAMELEN);
- strncpy(be->port, port, MAX_PORTLEN);
- be->depth = 0;
- be->rbufused = 0;
- be->failed_count = 0;
+ mcp_backend_wrap_t *bew = lua_newuserdatauv(L, sizeof(mcp_backend_wrap_t), 0);
+ luaL_getmetatable(L, "mcp.backendwrap");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ mcp_backend_t *be = calloc(1, sizeof(mcp_backend_t));
+ if (be == NULL) {
+ proxy_lua_error(L, "out of memory allocating backend connection");
+ return NULL;
+ }
+ bew->be = be;
+
+ strncpy(be->name, bel->name, MAX_NAMELEN+1);
+ strncpy(be->port, bel->port, MAX_PORTLEN+1);
STAILQ_INIT(&be->io_head);
be->state = mcp_backend_read;
- be->connecting = false;
- be->can_write = false;
- be->stacked = false;
- be->bad = false;
// this leaves a permanent buffer on the backend, which is fine
// unless you have billions of backends.
@@ -158,7 +221,7 @@ static int mcplib_backend(lua_State *L) {
be->rbuf = malloc(READ_BUFFER_SIZE);
if (be->rbuf == NULL) {
proxy_lua_error(L, "out of memory allocating backend");
- return 0;
+ return NULL;
}
// initialize libevent.
@@ -168,7 +231,7 @@ static int mcplib_backend(lua_State *L) {
be->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
if (be->client == NULL) {
proxy_lua_error(L, "out of memory allocating backend");
- return 0;
+ return NULL;
}
// TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
// This is a trivial fix if we ensure a backend's owning event thread is
@@ -203,18 +266,16 @@ static int mcplib_backend(lua_State *L) {
}
#endif
- luaL_getmetatable(L, "mcp.backend");
- lua_setmetatable(L, -2); // set metatable to userdata.
-
- lua_pushvalue(L, 1); // put the label at the top for settable later.
+ // Add this new backend connection to the object cache.
+ lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
- // set our new backend object into the reference table.
+ // set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
// stack is back to having backend on the top.
STAT_INCR(ctx, backend_total, 1);
- return 1;
+ return bew;
}
static int mcplib_pool_gc(lua_State *L) {
@@ -360,14 +421,30 @@ static int mcplib_pool(lua_State *L) {
lua_pushvalue(L, -1); // dupe self for reference.
p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ // TODO (v2): move to after function check so we can find the right
+ // backend label to look up.
// remember lua arrays are 1 indexed.
for (int x = 1; x <= n; x++) {
mcp_pool_be_t *s = &p->pool[x-1];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
- s->be = luaL_checkudata(L, -1, "mcp.backend");
+ mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
+
+ // check label for pre-existing backend conn/wrapper
+ mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
+ if (bew == NULL) {
+ bew = _mcplib_make_backendconn(L, bel);
+ }
+ s->be = bew->be; // unwrap the backend connection for direct ref.
+
+ // If found from cache or made above, the backend wrapper is on the
+ // top of the stack, so we can now take its reference.
+ // The wrapper abstraction allows the be memory to be owned by its
+ // destination thread (IO thread/etc).
+
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
+ lua_pop(L, 1); // pop the mcp.backend label object.
}
// Allow passing an ignored nil as a second argument. Makes the lua easier
@@ -876,11 +953,15 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
lua_State *L = state;
const struct luaL_Reg mcplib_backend_m[] = {
- {"set", NULL},
{"__gc", mcplib_backend_gc},
{NULL, NULL}
};
+ const struct luaL_Reg mcplib_backend_wrap_m[] = {
+ {"__gc", mcplib_backend_wrap_gc},
+ {NULL, NULL}
+ };
+
const struct luaL_Reg mcplib_request_m[] = {
{"command", mcplib_request_command},
{"key", mcplib_request_key},
@@ -942,6 +1023,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
lua_pop(L, 1);
+ luaL_newmetatable(L, "mcp.backendwrap");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_backend_wrap_m, 0); // register methods
+ lua_pop(L, 1);
+
luaL_newmetatable(L, "mcp.request");
lua_pushvalue(L, -1); // duplicate metatable.
lua_setfield(L, -2, "__index"); // mt.__index = mt