summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-02-04 15:36:15 -0800
committerdormando <dormando@rydia.net>2022-02-04 16:56:46 -0800
commitb6fd865985dd8285bd963dfd429d7f475d54d77f (patch)
treeb15989aab41b9aa98d7e83d52dd82e5c3ff79702 /proto_proxy.c
parent8da2e098e3e649712170275a7629474311118752 (diff)
downloadmemcached-b6fd865985dd8285bd963dfd429d7f475d54d77f.tar.gz
proxy: more misc fixes
- fixes potential memory leaks if an error is generated while creating a pool object. - misc comment updates and error handling. - avoid crash if attempting to route commands that don't have a key.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c69
1 files changed, 41 insertions, 28 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index a489a33..525fdb5 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -542,11 +542,8 @@ static void *_proxy_manager_thread(void *arg) {
lua_State *L = ctx->proxy_state;
mcp_pool_t *p;
STAILQ_FOREACH(p, &head, next) {
- // walk the hash selector backends and unref.
- for (int x = 0; x < p->pool_size; x++) {
- luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref);
- }
- // unref the phc ref.
+ // we let the pool object _gc() handle backend references.
+
luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref);
// need to... unref self.
// NOTE: double check if we really need to self-reference.
@@ -554,7 +551,6 @@ static void *_proxy_manager_thread(void *arg) {
// before lua garbage collects the object. other things hold a
// reference to the object though.
luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref);
- // that's it? let it float?
}
pthread_mutex_unlock(&ctx->config_lock);
@@ -2675,7 +2671,11 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
lua_setmetatable(Lc, -2);
io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
- // FIXME: can this fail? Yes, I believe it can. need handling.
+ if (p == NULL) {
+ WSTAT_INCR(c, proxy_conn_oom, 1);
+ proxy_lua_error(Lc, "out of memory allocating from IO cache");
+ return;
+ }
// this is a re-cast structure, so assert that we never outsize it.
assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
@@ -2779,14 +2779,16 @@ static int mcplib_response_gc(lua_State *L) {
return 0;
}
+// NOTE: backends are global objects owned by pool objects.
+// Each pool has a "proxy pool object" distributed to each worker VM.
+// proxy pool objects are held at the same time as any request exists on a
+// backend, in the coroutine stack during yield()
+// To free a backend: All proxies for a pool are collected, then the central
+// pool is collected, which releases backend references, which allows backend
+// to be collected.
static int mcplib_backend_gc(lua_State *L) {
mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
- // TODO: need to validate it's impossible to cause a backend to be garbage
- // collected while outstanding requests exist.
- // might need some kind of failsafe here to leak memory and warn instead
- // of killing the object and crashing? or is that too late since we're in
- // __gc?
assert(STAILQ_EMPTY(&be->io_head));
mcmc_disconnect(be->client);
@@ -2797,7 +2799,7 @@ static int mcplib_backend_gc(lua_State *L) {
static int mcplib_backend(lua_State *L) {
luaL_checkstring(L, -4); // label for indexing backends.
- const char *ip = luaL_checkstring(L, -3); // FIXME: checklstring?
+ const char *ip = luaL_checkstring(L, -3);
const char *port = luaL_checkstring(L, -2);
double weight = luaL_checknumber(L, -1);
@@ -2892,6 +2894,12 @@ static int mcplib_pool_gc(lua_State *L) {
assert(p->refcount == 0);
pthread_mutex_destroy(&p->lock);
+ for (int x = 0; x < p->pool_size; x++) {
+ if (p->pool[x].ref) {
+ luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref);
+ }
+ }
+
return 0;
}
@@ -2902,9 +2910,10 @@ static int mcplib_pool(lua_State *L) {
luaL_checktype(L, 1, LUA_TTABLE);
int n = luaL_len(L, 1); // get length of array table
- mcp_pool_t *p = lua_newuserdatauv(L, sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n, 0);
- // FIXME: zero the memory? then __gc will fix up server references on
- // errors.
+ size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n;
+ mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
+ // Zero the memory before use, so we can realibly use __gc to clean up
+ memset(p, 0, plen);
p->pool_size = n;
p->refcount = 0;
pthread_mutex_init(&p->lock, NULL);
@@ -2915,14 +2924,12 @@ static int mcplib_pool(lua_State *L) {
lua_pushvalue(L, -1); // dupe self for reference.
p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- // TODO: ensure to increment refcounts for servers.
// remember lua arrays are 1 indexed.
for (int x = 1; x <= n; x++) {
mcp_pool_be_t *s = &p->pool[x-1];
lua_geti(L, 1, x); // get next server into the stack.
- // TODO: do we leak memory if we bail here?
- // the stack should clear, then release the userdata + etc?
- // - yes it should leak memory for the registry indexed items.
+ // If we bail here, the pool _gc() should handle releasing any backend
+ // references we made so far.
s->be = luaL_checkudata(L, -1, "mcp.backend");
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
}
@@ -2964,7 +2971,7 @@ static int mcplib_pool(lua_State *L) {
lua_setfield(L, -2, "addr");
lua_pushstring(L, be->port);
lua_setfield(L, -2, "port");
- // TODO: weight/etc?
+ // TODO (v2): weight/etc?
// set the backend table into the new pool table.
lua_rawseti(L, -2, x);
@@ -2984,10 +2991,12 @@ static int mcplib_pool(lua_State *L) {
return 0;
}
- // TODO: validate response arguments.
// -1 is lightuserdata ptr to the struct (which must be owned by the
// userdata), which is later used for internal calls.
struct proxy_hash_caller *phc;
+
+ luaL_checktype(L, -1, LUA_TUSERDATA);
+ luaL_checktype(L, -2, LUA_TUSERDATA);
phc = lua_touserdata(L, -1);
memcpy(&p->phc, phc, sizeof(*phc));
lua_pop(L, 1);
@@ -3035,7 +3044,10 @@ static int mcplib_pool_proxy_call(lua_State *L) {
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
// we have a fast path to the key/length.
- // FIXME: indicator for if request actually has a key token or not.
+ if (!rq->pr.keytoken) {
+ proxy_lua_error(L, "cannot route commands without key");
+ return 0;
+ }
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx);
@@ -3057,7 +3069,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
rq->be = p->pool[lookup-1].be;
}
- // now yield request, hash selector up.
+ // now yield request, pool up.
return lua_yield(L, 2);
}
@@ -3427,6 +3439,7 @@ static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen)
} else {
cl = cmdlen - 2; // FIXME: ensure cmdlen can never be < 2?
}
+ pr->keytoken = 0;
pr->has_space = false;
pr->parsed = cl + 1;
pr->request = command;
@@ -4025,8 +4038,8 @@ typedef struct mcp_await_s {
mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
} mcp_await_t;
-// local restable = mcp.await(request, hashselectors, num_wait)
-// NOTE: need to hold onto the hash selector objects since those hold backend
+// local restable = mcp.await(request, pools, num_wait)
+// NOTE: need to hold onto the pool objects since those hold backend
// references. Here we just keep a reference to the argument table.
static int mcplib_await(lua_State *L) {
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
@@ -4041,9 +4054,9 @@ static int mcplib_await(lua_State *L) {
wait_for = n;
}
}
- // TODO: bail if selector table was 0 len? else bad things can happen.
+ // TODO: bail if pool table was 0 len? else bad things can happen.
- // TODO: quickly loop table once and ensure they're all hash selectors?
+ // TODO: quickly loop table once and ensure they're all pools?
int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.