From b6fd865985dd8285bd963dfd429d7f475d54d77f Mon Sep 17 00:00:00 2001 From: dormando Date: Fri, 4 Feb 2022 15:36:15 -0800 Subject: proxy: more misc fixes - fixes potential memory leaks if an error is generated while creating a pool object. - misc comment updates and error handling. - avoid crash if attempting to route commands that don't have a key. --- proto_proxy.c | 69 +++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 28 deletions(-) (limited to 'proto_proxy.c') diff --git a/proto_proxy.c b/proto_proxy.c index a489a33..525fdb5 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -542,11 +542,8 @@ static void *_proxy_manager_thread(void *arg) { lua_State *L = ctx->proxy_state; mcp_pool_t *p; STAILQ_FOREACH(p, &head, next) { - // walk the hash selector backends and unref. - for (int x = 0; x < p->pool_size; x++) { - luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref); - } - // unref the phc ref. + // we let the pool object _gc() handle backend references. + luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref); // need to... unref self. // NOTE: double check if we really need to self-reference. @@ -554,7 +551,6 @@ static void *_proxy_manager_thread(void *arg) { // before lua garbage collects the object. other things hold a // reference to the object though. luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref); - // that's it? let it float? } pthread_mutex_unlock(&ctx->config_lock); @@ -2675,7 +2671,11 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) { lua_setmetatable(Lc, -2); io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache); - // FIXME: can this fail? Yes, I believe it can. need handling. + if (p == NULL) { + WSTAT_INCR(c, proxy_conn_oom, 1); + proxy_lua_error(Lc, "out of memory allocating from IO cache"); + return; + } // this is a re-cast structure, so assert that we never outsize it. assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t)); @@ -2779,14 +2779,16 @@ static int mcplib_response_gc(lua_State *L) { return 0; } +// NOTE: backends are global objects owned by pool objects. +// Each pool has a "proxy pool object" distributed to each worker VM. +// proxy pool objects are held at the same time as any request exists on a +// backend, in the coroutine stack during yield() +// To free a backend: All proxies for a pool are collected, then the central +// pool is collected, which releases backend references, which allows backend +// to be collected. static int mcplib_backend_gc(lua_State *L) { mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend"); - // TODO: need to validate it's impossible to cause a backend to be garbage - // collected while outstanding requests exist. - // might need some kind of failsafe here to leak memory and warn instead - // of killing the object and crashing? or is that too late since we're in - // __gc? assert(STAILQ_EMPTY(&be->io_head)); mcmc_disconnect(be->client); @@ -2797,7 +2799,7 @@ static int mcplib_backend_gc(lua_State *L) { static int mcplib_backend(lua_State *L) { luaL_checkstring(L, -4); // label for indexing backends. - const char *ip = luaL_checkstring(L, -3); // FIXME: checklstring? + const char *ip = luaL_checkstring(L, -3); const char *port = luaL_checkstring(L, -2); double weight = luaL_checknumber(L, -1); @@ -2892,6 +2894,12 @@ static int mcplib_pool_gc(lua_State *L) { assert(p->refcount == 0); pthread_mutex_destroy(&p->lock); + for (int x = 0; x < p->pool_size; x++) { + if (p->pool[x].ref) { + luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref); + } + } + return 0; } @@ -2902,9 +2910,10 @@ static int mcplib_pool(lua_State *L) { luaL_checktype(L, 1, LUA_TTABLE); int n = luaL_len(L, 1); // get length of array table - mcp_pool_t *p = lua_newuserdatauv(L, sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n, 0); - // FIXME: zero the memory? then __gc will fix up server references on - // errors. + size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n; + mcp_pool_t *p = lua_newuserdatauv(L, plen, 0); + // Zero the memory before use, so we can realibly use __gc to clean up + memset(p, 0, plen); p->pool_size = n; p->refcount = 0; pthread_mutex_init(&p->lock, NULL); @@ -2915,14 +2924,12 @@ static int mcplib_pool(lua_State *L) { lua_pushvalue(L, -1); // dupe self for reference. p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); - // TODO: ensure to increment refcounts for servers. // remember lua arrays are 1 indexed. for (int x = 1; x <= n; x++) { mcp_pool_be_t *s = &p->pool[x-1]; lua_geti(L, 1, x); // get next server into the stack. - // TODO: do we leak memory if we bail here? - // the stack should clear, then release the userdata + etc? - // - yes it should leak memory for the registry indexed items. + // If we bail here, the pool _gc() should handle releasing any backend + // references we made so far. s->be = luaL_checkudata(L, -1, "mcp.backend"); s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object. } @@ -2964,7 +2971,7 @@ static int mcplib_pool(lua_State *L) { lua_setfield(L, -2, "addr"); lua_pushstring(L, be->port); lua_setfield(L, -2, "port"); - // TODO: weight/etc? + // TODO (v2): weight/etc? // set the backend table into the new pool table. lua_rawseti(L, -2, x); @@ -2984,10 +2991,12 @@ static int mcplib_pool(lua_State *L) { return 0; } - // TODO: validate response arguments. // -1 is lightuserdata ptr to the struct (which must be owned by the // userdata), which is later used for internal calls. struct proxy_hash_caller *phc; + + luaL_checktype(L, -1, LUA_TUSERDATA); + luaL_checktype(L, -2, LUA_TUSERDATA); phc = lua_touserdata(L, -1); memcpy(&p->phc, phc, sizeof(*phc)); lua_pop(L, 1); @@ -3035,7 +3044,10 @@ static int mcplib_pool_proxy_call(lua_State *L) { mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); // we have a fast path to the key/length. - // FIXME: indicator for if request actually has a key token or not. + if (!rq->pr.keytoken) { + proxy_lua_error(L, "cannot route commands without key"); + return 0; + } const char *key = MCP_PARSER_KEY(rq->pr); size_t len = rq->pr.klen; uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx); @@ -3057,7 +3069,7 @@ static int mcplib_pool_proxy_call(lua_State *L) { rq->be = p->pool[lookup-1].be; } - // now yield request, hash selector up. + // now yield request, pool up. return lua_yield(L, 2); } @@ -3427,6 +3439,7 @@ static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) } else { cl = cmdlen - 2; // FIXME: ensure cmdlen can never be < 2? } + pr->keytoken = 0; pr->has_space = false; pr->parsed = cl + 1; pr->request = command; @@ -4025,8 +4038,8 @@ typedef struct mcp_await_s { mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop) } mcp_await_t; -// local restable = mcp.await(request, hashselectors, num_wait) -// NOTE: need to hold onto the hash selector objects since those hold backend +// local restable = mcp.await(request, pools, num_wait) +// NOTE: need to hold onto the pool objects since those hold backend // references. Here we just keep a reference to the argument table. static int mcplib_await(lua_State *L) { mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); @@ -4041,9 +4054,9 @@ static int mcplib_await(lua_State *L) { wait_for = n; } } - // TODO: bail if selector table was 0 len? else bad things can happen. + // TODO: bail if pool table was 0 len? else bad things can happen. - // TODO: quickly loop table once and ensure they're all hash selectors? + // TODO: quickly loop table once and ensure they're all pools? int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object. -- cgit v1.2.1