author     dormando <dormando@rydia.net>   2022-11-22 19:27:28 -0800
committer  dormando <dormando@rydia.net>   2022-12-12 16:07:09 -0800
commit     d401611ba88db17c38fedf97d336f8085ce24bab (patch)
tree       92ab4f8e947a59eb12bf7a87219667fb5c054116
parent     3cbd069ed883d1405c068d0bc104a2a0b2ebeecb (diff)
download   memcached-d401611ba88db17c38fedf97d336f8085ce24bab.tar.gz
proxy: fix lifecycle of backend connections
A backend's connection object is technically owned by the IO thread after it has been created. An error in how this was handled led to invalid backends being infinitely retried despite the underlying object having been collected.

This change adds an extra indirection to backend objects: a backend_wrap object, which turns the backend connection into an arbitrary pointer instead of lua memory owned by the config VM.

- When backend connections are created, this pointer is shipped to the IO thread to have its connection instantiated.
- When the wrap object is garbage collected (i.e., no longer referenced by any pool object), the backend conn pointer is again shipped to the IO thread, which then removes any pending events, closes the sock, and frees data.
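For illustration only (not part of the patch): a stripped-down, standalone model of the two-trip ownership handoff described above, using the same STAILQ discipline as the real code. All names here (model_backend, io_drain, etc.) are hypothetical; the real patch also wraps the queue in a mutex and wakes the IO thread via an eventfd/pipe, which this sketch omits.

    /* Toy model of the lifecycle above: the config side only ever queues
     * pointers; the IO side connects on first sight and frees on second. */
    #include <assert.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/queue.h>

    typedef struct model_backend {
        bool transferred;                    /* set once the IO side owns it */
        STAILQ_ENTRY(model_backend) next;
    } model_backend;

    typedef STAILQ_HEAD(model_beq, model_backend) model_beq_t;

    /* config side: first trip ships a freshly allocated conn to the IO queue */
    static model_backend *backend_create(model_beq_t *q) {
        model_backend *be = calloc(1, sizeof(*be));
        if (be == NULL) abort();
        STAILQ_INSERT_TAIL(q, be, next);
        return be;
    }

    /* config side: the wrapper's __gc re-queues the same pointer for teardown */
    static void backend_release(model_beq_t *q, model_backend *be) {
        assert(be->transferred); /* must have been picked up at least once */
        STAILQ_INSERT_TAIL(q, be, next);
    }

    /* IO side: drain the queue; untransferred objects get "connected",
     * transferred ones are being handed back for cleanup and freeing. */
    static void io_drain(model_beq_t *q) {
        while (!STAILQ_EMPTY(q)) {
            model_backend *be = STAILQ_FIRST(q);
            STAILQ_REMOVE_HEAD(q, next);
            if (be->transferred) {
                printf("cleanup + free %p\n", (void *)be);
                free(be);
            } else {
                be->transferred = true;
                printf("connect %p\n", (void *)be);
            }
        }
    }

    int main(void) {
        model_beq_t q = STAILQ_HEAD_INITIALIZER(q);
        model_backend *be = backend_create(&q); /* mcp.backend used by a pool */
        io_drain(&q);                           /* IO thread connects it */
        backend_release(&q, be);                /* wrapper garbage collected */
        io_drain(&q);                           /* IO thread tears it down */
        return 0;
    }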
-rw-r--r--  proxy.h          26
-rw-r--r--  proxy_config.c    5
-rw-r--r--  proxy_lua.c     171
-rw-r--r--  proxy_network.c  56
4 files changed, 202 insertions, 56 deletions
diff --git a/proxy.h b/proxy.h
index f12ec1e..073401b 100644
--- a/proxy.h
+++ b/proxy.h
@@ -242,6 +242,8 @@ enum mcp_backend_states {
mcp_backend_next, // advance to the next IO
};
+typedef struct mcp_backend_wrap_s mcp_backend_wrap_t;
+typedef struct mcp_backend_label_s mcp_backend_label_t;
typedef struct mcp_backend_s mcp_backend_t;
typedef struct mcp_request_s mcp_request_t;
typedef struct mcp_parser_s mcp_parser_t;
@@ -287,6 +289,7 @@ struct mcp_request_s {
};
typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
+#define MAX_LABELLEN 512
#define MAX_NAMELEN 255
#define MAX_PORTLEN 6
// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
@@ -297,10 +300,28 @@ typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
#else
#define BE_IOV_MAX IOV_MAX
#endif
+// lua descriptor object: passed to pools, which create wrappers.
+struct mcp_backend_label_s {
+ char name[MAX_NAMELEN+1];
+ char port[MAX_PORTLEN+1];
+ char label[MAX_LABELLEN+1];
+ size_t llen; // cache label length for small speedup in pool creation.
+};
+
+// lua object wrapper meant to own a malloc'ed conn structure
+// when this object is created, it ships its connection to the real owner
+// (worker, IO thread, etc)
+// when this object is garbage collected, it ships a notice to the owner
+// thread to stop using and free the backend conn memory.
+struct mcp_backend_wrap_s {
+ mcp_backend_t *be;
+};
+
+// FIXME: inline the mcmc client data.
+// TODO: event_thread -> something? union of owner type?
struct mcp_backend_s {
int depth;
int failed_count; // number of fails (timeouts) in a row
- pthread_mutex_t mutex; // covers stack.
proxy_event_thread_t *event_thread; // event thread owning this backend.
void *client; // mcmc client
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
@@ -317,6 +338,7 @@ struct mcp_backend_s {
#endif
enum mcp_backend_states state; // readback state machine
int connect_flags; // flags to pass to mcmc_connect
+ bool transferred; // if beconn has been shipped to owner thread.
bool connecting; // in the process of an asynch connection.
bool validating; // in process of validating a new backend connection.
bool can_write; // recently got a WANT_WRITE or are connecting.
@@ -409,7 +431,7 @@ struct _io_pending_proxy_t {
// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
// - says no.
typedef struct {
- int ref; // luaL_ref reference.
+ int ref; // luaL_ref reference of backend_wrap_t obj.
mcp_backend_t *be;
} mcp_pool_be_t;
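Side note (not from the patch): the ref/pointer pair in mcp_pool_be_t relies on a standard Lua C API property: a luaL_ref() into the registry pins the wrapper userdata (and therefore the raw be pointer it guards) until luaL_unref() releases it. A minimal illustration of that pinning (Lua 5.4 API), with made-up names and no error handling:

    /* Pin a userdata via the registry so a raw pointer to it stays valid,
     * then release it; shape only, not the proxy's actual pool code. */
    #include <lua.h>
    #include <lauxlib.h>

    int main(void) {
        lua_State *L = luaL_newstate();

        void *ud = lua_newuserdatauv(L, 64, 0);   /* stand-in for the wrapper */
        int ref = luaL_ref(L, LUA_REGISTRYINDEX); /* pops the value, pins it */
        (void)ud; /* ud stays valid here: the registry ref prevents collection */

        luaL_unref(L, LUA_REGISTRYINDEX, ref); /* pool gc path: release the pin */
        lua_gc(L, LUA_GCCOLLECT);              /* now collectable; don't use ud */

        lua_close(L);
        return 0;
    }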
diff --git a/proxy_config.c b/proxy_config.c
index 47414fa..6bb54be 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -78,6 +78,11 @@ static void *_proxy_manager_thread(void *arg) {
luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref);
}
pthread_mutex_unlock(&ctx->config_lock);
+ // force lua garbage collection so any resources close out quickly.
+ lua_gc(L, LUA_GCCOLLECT);
+ // twice because objects with garbage collector handlers are only
+ // marked on the first collection cycle.
+ lua_gc(L, LUA_GCCOLLECT);
// done.
pthread_mutex_lock(&ctx->manager_lock);
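The double collection above follows from how Lua finalizes userdata: an object with a __gc metamethod is resurrected for its finalizer on one full cycle and only fully reclaimed on a later one. Below is a self-contained sketch of that pattern (assuming Lua 5.4, which the two-argument lua_gc() call implies); the metatable registration mirrors the mcp.backendwrap setup in proxy_register_libs() further down, but demo_gc and "demo.obj" are made up for the example.

    /* Register a metatable with a __gc handler, drop a userdata, and run
     * two full collections, mirroring the change above. */
    #include <lua.h>
    #include <lauxlib.h>
    #include <lualib.h>
    #include <stdio.h>

    static int demo_gc(lua_State *L) {
        (void)L;
        printf("__gc ran\n");
        return 0;
    }

    int main(void) {
        lua_State *L = luaL_newstate();
        luaL_openlibs(L);

        /* same registration shape as mcp.backendwrap below */
        luaL_newmetatable(L, "demo.obj");
        lua_pushvalue(L, -1);
        lua_setfield(L, -2, "__index");   /* mt.__index = mt */
        lua_pushcfunction(L, demo_gc);
        lua_setfield(L, -2, "__gc");
        lua_pop(L, 1);

        /* create a userdata, attach the metatable, then drop the reference */
        lua_newuserdatauv(L, 16, 0);
        luaL_setmetatable(L, "demo.obj");
        lua_pop(L, 1);

        lua_gc(L, LUA_GCCOLLECT);  /* finalization pass */
        lua_gc(L, LUA_GCCOLLECT);  /* reclaim pass */

        lua_close(L);
        return 0;
    }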
diff --git a/proxy_lua.c b/proxy_lua.c
index 2172733..7fa07bb 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -90,67 +90,130 @@ static int mcplib_response_gc(lua_State *L) {
// To free a backend: All proxies for a pool are collected, then the central
// pool is collected, which releases backend references, which allows backend
// to be collected.
-static int mcplib_backend_gc(lua_State *L) {
- mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
-
- assert(STAILQ_EMPTY(&be->io_head));
+static int mcplib_backend_wrap_gc(lua_State *L) {
+ mcp_backend_wrap_t *bew = luaL_checkudata(L, -1, "mcp.backendwrap");
+ // FIXME: remove global.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
- mcmc_disconnect(be->client);
+ if (bew->be != NULL) {
+ mcp_backend_t *be = bew->be;
+ // TODO (v3): technically a race where a backend could be created,
+ // queued, but not picked up before being gc'ed again. In practice
+ // this is impossible but at some point we should close the loop here.
+ // Since we're running in the config thread it could just busy poll
+ // until the connection was picked up.
+ assert(be->transferred);
+ proxy_event_thread_t *e = ctx->proxy_threads;
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
+ pthread_mutex_unlock(&e->mutex);
+
+ // Signal to check queue.
+#ifdef USE_EVENTFD
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
+#else
+ if (write(e->be_notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
+#endif
+ }
- // FIXME (v2): can we ensure a backend always has be->event_thread set, so
- // we can use be->event_thread->ctx?
- proxy_ctx_t *ctx = settings.proxy_ctx;
STAT_DECR(ctx, backend_total, 1);
- free(be->client);
return 0;
}
+static int mcplib_backend_gc(lua_State *L) {
+ return 0; // no-op.
+}
+
+// backend label object; given to pools which then find or create backend
+// objects as necessary.
static int mcplib_backend(lua_State *L) {
- luaL_checkstring(L, -3); // label for indexing backends.
+ size_t llen = 0;
size_t nlen = 0;
- const char *name = luaL_checklstring(L, -2, &nlen);
- const char *port = luaL_checkstring(L, -1);
- proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+ size_t plen = 0;
+ const char *label = luaL_checklstring(L, 1, &llen);
+ const char *name = luaL_checklstring(L, 2, &nlen);
+ const char *port = luaL_checklstring(L, 3, &plen);
+
+ if (llen > MAX_LABELLEN-1) {
+ proxy_lua_error(L, "backend label too long");
+ return 0;
+ }
if (nlen > MAX_NAMELEN-1) {
proxy_lua_error(L, "backend name too long");
return 0;
}
+ if (plen > MAX_PORTLEN-1) {
+ proxy_lua_error(L, "backend port too long");
+ return 0;
+ }
+
+ mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
+ memset(be, 0, sizeof(*be));
+ memcpy(be->label, label, llen);
+ be->label[llen] = '\0';
+ memcpy(be->name, name, nlen);
+ be->name[nlen] = '\0';
+ memcpy(be->port, port, plen);
+ be->port[plen] = '\0';
+ be->llen = llen;
+ luaL_getmetatable(L, "mcp.backend");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ return 1; // return be object.
+}
+
+static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
- lua_pushvalue(L, 1);
+ // Note: The upvalue won't be found unless we're running from a function with it
+ // set as an upvalue.
+ lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
- mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend");
- if (strncmp(be_orig->name, name, MAX_NAMELEN) == 0
- && strncmp(be_orig->port, port, MAX_PORTLEN) == 0) {
+ mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
+ if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0
+ && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0) {
// backend is the same, return it.
- return 1;
+ return be_orig;
} else {
// backend not the same, pop from stack and make new one.
lua_pop(L, 1);
}
} else {
- lua_pop(L, 1);
+ lua_pop(L, 1); // pop the nil.
}
- // This might shift to internal objects?
- mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0);
+ return NULL;
+}
+
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+ // FIXME: remove global.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
- // FIXME (v2): remove some of the excess zero'ing below?
- memset(be, 0, sizeof(mcp_backend_t));
- strncpy(be->name, name, MAX_NAMELEN);
- strncpy(be->port, port, MAX_PORTLEN);
- be->depth = 0;
- be->rbufused = 0;
- be->failed_count = 0;
+ mcp_backend_wrap_t *bew = lua_newuserdatauv(L, sizeof(mcp_backend_wrap_t), 0);
+ luaL_getmetatable(L, "mcp.backendwrap");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ mcp_backend_t *be = calloc(1, sizeof(mcp_backend_t));
+ if (be == NULL) {
+ proxy_lua_error(L, "out of memory allocating backend connection");
+ return NULL;
+ }
+ bew->be = be;
+
+ strncpy(be->name, bel->name, MAX_NAMELEN+1);
+ strncpy(be->port, bel->port, MAX_PORTLEN+1);
STAILQ_INIT(&be->io_head);
be->state = mcp_backend_read;
- be->connecting = false;
- be->can_write = false;
- be->stacked = false;
- be->bad = false;
// this leaves a permanent buffer on the backend, which is fine
// unless you have billions of backends.
@@ -158,7 +221,7 @@ static int mcplib_backend(lua_State *L) {
be->rbuf = malloc(READ_BUFFER_SIZE);
if (be->rbuf == NULL) {
proxy_lua_error(L, "out of memory allocating backend");
- return 0;
+ return NULL;
}
// initialize libevent.
@@ -168,7 +231,7 @@ static int mcplib_backend(lua_State *L) {
be->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
if (be->client == NULL) {
proxy_lua_error(L, "out of memory allocating backend");
- return 0;
+ return NULL;
}
// TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
// This is a trivial fix if we ensure a backend's owning event thread is
@@ -203,18 +266,16 @@ static int mcplib_backend(lua_State *L) {
}
#endif
- luaL_getmetatable(L, "mcp.backend");
- lua_setmetatable(L, -2); // set metatable to userdata.
-
- lua_pushvalue(L, 1); // put the label at the top for settable later.
+ // Add this new backend connection to the object cache.
+ lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
- // set our new backend object into the reference table.
+ // set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
// stack is back to having backend on the top.
STAT_INCR(ctx, backend_total, 1);
- return 1;
+ return bew;
}
static int mcplib_pool_gc(lua_State *L) {
@@ -360,14 +421,30 @@ static int mcplib_pool(lua_State *L) {
lua_pushvalue(L, -1); // dupe self for reference.
p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ // TODO (v2): move to after function check so we can find the right
+ // backend label to look up.
// remember lua arrays are 1 indexed.
for (int x = 1; x <= n; x++) {
mcp_pool_be_t *s = &p->pool[x-1];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
- s->be = luaL_checkudata(L, -1, "mcp.backend");
+ mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
+
+ // check label for pre-existing backend conn/wrapper
+ mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
+ if (bew == NULL) {
+ bew = _mcplib_make_backendconn(L, bel);
+ }
+ s->be = bew->be; // unwrap the backend connection for direct ref.
+
+ // If found from cache or made above, the backend wrapper is on the
+ // top of the stack, so we can now take its reference.
+ // The wrapper abstraction allows the be memory to be owned by its
+ // destination thread (IO thread/etc).
+
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
+ lua_pop(L, 1); // pop the mcp.backend label object.
}
// Allow passing an ignored nil as a second argument. Makes the lua easier
@@ -876,11 +953,15 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
lua_State *L = state;
const struct luaL_Reg mcplib_backend_m[] = {
- {"set", NULL},
{"__gc", mcplib_backend_gc},
{NULL, NULL}
};
+ const struct luaL_Reg mcplib_backend_wrap_m[] = {
+ {"__gc", mcplib_backend_wrap_gc},
+ {NULL, NULL}
+ };
+
const struct luaL_Reg mcplib_request_m[] = {
{"command", mcplib_request_command},
{"key", mcplib_request_key},
@@ -942,6 +1023,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
lua_pop(L, 1);
+ luaL_newmetatable(L, "mcp.backendwrap");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_backend_wrap_m, 0); // register methods
+ lua_pop(L, 1);
+
luaL_newmetatable(L, "mcp.request");
lua_pushvalue(L, -1); // duplicate metatable.
lua_setfield(L, -2, "__index"); // mt.__index = mt
diff --git a/proxy_network.c b/proxy_network.c
index adebeee..000666d 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -449,6 +449,28 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
STAT_UL(ctx);
}
+static void _cleanup_backend(mcp_backend_t *be) {
+ // remove any pending events.
+ int pending = 0;
+ if (event_initialized(&be->event)) {
+ pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
+ }
+ if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
+ event_del(&be->event); // an error to call event_del() without event.
+ }
+
+ // - assert on empty queue
+ assert(STAILQ_EMPTY(&be->io_head));
+
+ mcmc_disconnect(be->client);
+ // - free be->client
+ free(be->client);
+ // - free be->rbuf
+ free(be->rbuf);
+ // - free *be
+ free(be);
+}
+
// event handler for injecting backends for processing
// currently just for initiating connections the first time.
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
@@ -484,19 +506,29 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
// Either that or remove the STAILQ code and just using an array of
// ptr's.
mcp_backend_t *be = NULL;
- STAILQ_FOREACH(be, &head, beconn_next) {
- be->event_thread = t;
- int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
- // if we're already connected for some reason, still push it
- // through the connection handler to keep the code unified. It
- // will auto-wake because the socket is writeable.
- be->connecting = true;
- be->can_write = false;
- _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ // be can be freed by the loop, so can't use STAILQ_FOREACH.
+ while (!STAILQ_EMPTY(&head)) {
+ be = STAILQ_FIRST(&head);
+ STAILQ_REMOVE_HEAD(&head, beconn_next);
+ if (be->transferred) {
+ // If this object was already transferred here, we're being
+ // signalled to clean it up and free.
+ _cleanup_backend(be);
} else {
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed(be);
+ be->transferred = true;
+ be->event_thread = t;
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
+ // if we're already connected for some reason, still push it
+ // through the connection handler to keep the code unified. It
+ // will auto-wake because the socket is writeable.
+ be->connecting = true;
+ be->can_write = false;
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ }
}
}
}
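For reference, a minimal standalone demo of the wakeup primitive that mcplib_backend_wrap_gc() and the existing connect path both use to nudge proxy_event_beconn(): an eventfd on Linux (the USE_EVENTFD branch), otherwise a notify pipe. This is not taken from the patch; it only shows the write/read pairing the proxy relies on.

    /* eventfd/pipe wakeup demo: one write signals the event loop, one read
     * drains the signal. Mirrors the USE_EVENTFD split in the code above. */
    #include <stdint.h>
    #include <stdio.h>
    #include <unistd.h>
    #ifdef __linux__
    #include <sys/eventfd.h>
    #endif

    int main(void) {
    #ifdef __linux__
        int efd = eventfd(0, 0);
        if (efd < 0) return 1;
        uint64_t u = 1;
        if (write(efd, &u, sizeof(u)) != sizeof(u)) return 1;      /* signal */
        uint64_t got = 0;
        if (read(efd, &got, sizeof(got)) != sizeof(got)) return 1; /* wake */
        printf("eventfd delivered %llu wakeup(s)\n", (unsigned long long)got);
        close(efd);
    #else
        int fds[2];
        if (pipe(fds) != 0) return 1;
        if (write(fds[1], "w", 1) != 1) return 1;  /* signal */
        char c;
        if (read(fds[0], &c, 1) != 1) return 1;    /* wake */
        printf("pipe delivered wakeup '%c'\n", c);
        close(fds[0]);
        close(fds[1]);
    #endif
        return 0;
    }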