diff options
author | dormando <dormando@rydia.net> | 2023-01-12 12:15:23 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2023-01-12 13:50:55 -0800 |
commit | 6537cfe41ead9c81dd3e9c9f5d3929a050692cbd (patch) | |
tree | 021699f628b831dee45800ba913ec53ce88a8cf1 | |
parent | 0e87c6dd86d8a50ddd56fbce445fb18e498e7f5f (diff) | |
download | memcached-6537cfe41ead9c81dd3e9c9f5d3929a050692cbd.tar.gz |
proxy: clean logic around lua yielding
We were duck typing the response code for a coroutine yield before. It
would also pile random logic for overriding IO's in certain cases. This
now makes everything explicit and more clear.
-rw-r--r-- | proto_proxy.c | 82 | ||||
-rw-r--r-- | proxy.h | 5 | ||||
-rw-r--r-- | proxy_await.c | 3 | ||||
-rw-r--r-- | proxy_lua.c | 3 |
4 files changed, 46 insertions, 47 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 01b368d..7cea462 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -14,7 +14,6 @@ #define PROCESS_NORMAL false static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget); static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc); -static void proxy_out_errstring(mc_resp *resp, const char *str); /******** EXTERNAL FUNCTIONS ******/ // functions starting with _ are breakouts for the public functions. @@ -469,7 +468,7 @@ void proxy_lua_ferror(lua_State *L, const char *fmt, ...) { } // Need a custom function so we can prefix lua strings easily. -static void proxy_out_errstring(mc_resp *resp, const char *str) { +void proxy_out_errstring(mc_resp *resp, const char *str) { size_t len; const static char error_prefix[] = "SERVER_ERROR "; const static int error_prefix_len = sizeof(error_prefix) - 1; @@ -580,52 +579,45 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con } else { proxy_out_errstring(resp, "bad response"); } + } else if (cores == LUA_YIELD) { - if (nresults == 1) { - // TODO (v2): try harder to validate; but we have so few yield cases - // that I'm going to shortcut this here. A single yielded result - // means it's probably an await(), so attempt to process this. - if (p != NULL) { - int coro_ref = p->coro_ref; - mc_resp *resp = p->resp; - assert((void *)p == (void *)resp->io_pending); - resp->io_pending = NULL; - c = p->c; - do_cache_free(c->thread->io_cache, p); - mcplib_await_run(c, resp, Lc, coro_ref); - } else { - // coroutine object sitting on the _main_ VM right now, so we grab - // the reference from there, which also pops it. - int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); - mcplib_await_run(c, c->resp, Lc, coro_ref); - } + int coro_ref = 0; + int yield_type = lua_tointeger(Lc, -1); + assert(yield_type != 0); + lua_pop(Lc, 1); + + // need to remove and free the io_pending, since c->resp owns it. + // so we call mcp_queue_io() again and let it override the + // mc_resp's io_pending object. + // + // p is not null only when being called from proxy_return_cb(), + // a pending IO is returning to resume. + if (p != NULL) { + coro_ref = p->coro_ref; + assert((void *)p == (void *)resp->io_pending); + resp->io_pending = NULL; + c = p->c; + // *p is now dead. + do_cache_free(c->thread->io_cache, p); } else { - // need to remove and free the io_pending, since c->resp owns it. - // so we call mcp_queue_io() again and let it override the - // mc_resp's io_pending object. - - int coro_ref = 0; - mc_resp *resp; - if (p != NULL) { - coro_ref = p->coro_ref; - resp = p->resp; - c = p->c; - do_cache_free(p->c->thread->io_cache, p); - // *p is now dead. - } else { - // yielding from a top level call to the coroutine, - // so we need to grab a reference to the coroutine thread. - // TODO (v2): make this more explicit? - // we only need to get the reference here, and error conditions - // should instead drop it, but now it's not obvious to users that - // we're reaching back into the main thread's stack. - assert(c != NULL); - coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); - resp = c->resp; - } - // TODO (v2): c only used for cache alloc? push the above into the func? - mcp_queue_io(c, resp, coro_ref, Lc); + // coroutine object sitting on the _main_ VM right now, so we grab + // the reference from there, which also pops it. + assert(c != NULL); + coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); } + + switch (yield_type) { + case MCP_YIELD_AWAIT: + mcplib_await_run(c, resp, Lc, coro_ref); + break; + case MCP_YIELD_POOL: + // TODO (v2): c only used for cache alloc? + mcp_queue_io(c, resp, coro_ref, Lc); + break; + default: + abort(); + } + } else { WSTAT_DECR(c, proxy_req_active, 1); P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1)); @@ -80,6 +80,10 @@ #define MCP_BACKEND_UPVALUE 3 #define MCP_CONTEXT_UPVALUE 4 +#define MCP_YIELD_POOL 1 +#define MCP_YIELD_AWAIT 2 +#define MCP_YIELD_LOCAL 3 + // all possible commands. #define CMD_FIELDS \ X(CMD_MG) \ @@ -507,6 +511,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p); int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len); void proxy_lua_error(lua_State *L, const char *s); void proxy_lua_ferror(lua_State *L, const char *fmt, ...); +void proxy_out_errstring(mc_resp *resp, const char *str); int _start_proxy_config_threads(proxy_ctx_t *ctx); int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr); diff --git a/proxy_await.c b/proxy_await.c index 582c3d5..3dfd16b 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -89,7 +89,8 @@ int mcplib_await(lua_State *L) { aw->type = type; P_DEBUG("%s: about to yield [len: %d]\n", __func__, n); - return lua_yield(L, 1); + lua_pushinteger(L, MCP_YIELD_AWAIT); + return lua_yield(L, 2); } static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) { diff --git a/proxy_lua.c b/proxy_lua.c index 8b1e4d8..91b0840 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -596,7 +596,8 @@ static int mcplib_pool_proxy_call(lua_State *L) { rq->be = mcplib_pool_proxy_call_helper(L, p, key, len); // now yield request, pool up. - return lua_yield(L, 2); + lua_pushinteger(L, MCP_YIELD_POOL); + return lua_yield(L, 3); } static int mcplib_tcp_keepalive(lua_State *L) { |