summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-12 12:15:23 -0800
committerdormando <dormando@rydia.net>2023-01-12 13:50:55 -0800
commit6537cfe41ead9c81dd3e9c9f5d3929a050692cbd (patch)
tree021699f628b831dee45800ba913ec53ce88a8cf1
parent0e87c6dd86d8a50ddd56fbce445fb18e498e7f5f (diff)
downloadmemcached-6537cfe41ead9c81dd3e9c9f5d3929a050692cbd.tar.gz
proxy: clean logic around lua yielding
We were duck typing the response code for a coroutine yield before. It would also pile random logic for overriding IO's in certain cases. This now makes everything explicit and more clear.
-rw-r--r--proto_proxy.c82
-rw-r--r--proxy.h5
-rw-r--r--proxy_await.c3
-rw-r--r--proxy_lua.c3
4 files changed, 46 insertions, 47 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 01b368d..7cea462 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -14,7 +14,6 @@
#define PROCESS_NORMAL false
static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
-static void proxy_out_errstring(mc_resp *resp, const char *str);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
@@ -469,7 +468,7 @@ void proxy_lua_ferror(lua_State *L, const char *fmt, ...) {
}
// Need a custom function so we can prefix lua strings easily.
-static void proxy_out_errstring(mc_resp *resp, const char *str) {
+void proxy_out_errstring(mc_resp *resp, const char *str) {
size_t len;
const static char error_prefix[] = "SERVER_ERROR ";
const static int error_prefix_len = sizeof(error_prefix) - 1;
@@ -580,52 +579,45 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else {
proxy_out_errstring(resp, "bad response");
}
+
} else if (cores == LUA_YIELD) {
- if (nresults == 1) {
- // TODO (v2): try harder to validate; but we have so few yield cases
- // that I'm going to shortcut this here. A single yielded result
- // means it's probably an await(), so attempt to process this.
- if (p != NULL) {
- int coro_ref = p->coro_ref;
- mc_resp *resp = p->resp;
- assert((void *)p == (void *)resp->io_pending);
- resp->io_pending = NULL;
- c = p->c;
- do_cache_free(c->thread->io_cache, p);
- mcplib_await_run(c, resp, Lc, coro_ref);
- } else {
- // coroutine object sitting on the _main_ VM right now, so we grab
- // the reference from there, which also pops it.
- int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
- mcplib_await_run(c, c->resp, Lc, coro_ref);
- }
+ int coro_ref = 0;
+ int yield_type = lua_tointeger(Lc, -1);
+ assert(yield_type != 0);
+ lua_pop(Lc, 1);
+
+ // need to remove and free the io_pending, since c->resp owns it.
+ // so we call mcp_queue_io() again and let it override the
+ // mc_resp's io_pending object.
+ //
+ // p is not null only when being called from proxy_return_cb(),
+ // a pending IO is returning to resume.
+ if (p != NULL) {
+ coro_ref = p->coro_ref;
+ assert((void *)p == (void *)resp->io_pending);
+ resp->io_pending = NULL;
+ c = p->c;
+ // *p is now dead.
+ do_cache_free(c->thread->io_cache, p);
} else {
- // need to remove and free the io_pending, since c->resp owns it.
- // so we call mcp_queue_io() again and let it override the
- // mc_resp's io_pending object.
-
- int coro_ref = 0;
- mc_resp *resp;
- if (p != NULL) {
- coro_ref = p->coro_ref;
- resp = p->resp;
- c = p->c;
- do_cache_free(p->c->thread->io_cache, p);
- // *p is now dead.
- } else {
- // yielding from a top level call to the coroutine,
- // so we need to grab a reference to the coroutine thread.
- // TODO (v2): make this more explicit?
- // we only need to get the reference here, and error conditions
- // should instead drop it, but now it's not obvious to users that
- // we're reaching back into the main thread's stack.
- assert(c != NULL);
- coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
- resp = c->resp;
- }
- // TODO (v2): c only used for cache alloc? push the above into the func?
- mcp_queue_io(c, resp, coro_ref, Lc);
+ // coroutine object sitting on the _main_ VM right now, so we grab
+ // the reference from there, which also pops it.
+ assert(c != NULL);
+ coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
}
+
+ switch (yield_type) {
+ case MCP_YIELD_AWAIT:
+ mcplib_await_run(c, resp, Lc, coro_ref);
+ break;
+ case MCP_YIELD_POOL:
+ // TODO (v2): c only used for cache alloc?
+ mcp_queue_io(c, resp, coro_ref, Lc);
+ break;
+ default:
+ abort();
+ }
+
} else {
WSTAT_DECR(c, proxy_req_active, 1);
P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
diff --git a/proxy.h b/proxy.h
index dce74f5..29eed86 100644
--- a/proxy.h
+++ b/proxy.h
@@ -80,6 +80,10 @@
#define MCP_BACKEND_UPVALUE 3
#define MCP_CONTEXT_UPVALUE 4
+#define MCP_YIELD_POOL 1
+#define MCP_YIELD_AWAIT 2
+#define MCP_YIELD_LOCAL 3
+
// all possible commands.
#define CMD_FIELDS \
X(CMD_MG) \
@@ -507,6 +511,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len);
void proxy_lua_error(lua_State *L, const char *s);
void proxy_lua_ferror(lua_State *L, const char *fmt, ...);
+void proxy_out_errstring(mc_resp *resp, const char *str);
int _start_proxy_config_threads(proxy_ctx_t *ctx);
int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr);
diff --git a/proxy_await.c b/proxy_await.c
index 582c3d5..3dfd16b 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -89,7 +89,8 @@ int mcplib_await(lua_State *L) {
aw->type = type;
P_DEBUG("%s: about to yield [len: %d]\n", __func__, n);
- return lua_yield(L, 1);
+ lua_pushinteger(L, MCP_YIELD_AWAIT);
+ return lua_yield(L, 2);
}
static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
diff --git a/proxy_lua.c b/proxy_lua.c
index 8b1e4d8..91b0840 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -596,7 +596,8 @@ static int mcplib_pool_proxy_call(lua_State *L) {
rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
// now yield request, pool up.
- return lua_yield(L, 2);
+ lua_pushinteger(L, MCP_YIELD_POOL);
+ return lua_yield(L, 3);
}
static int mcplib_tcp_keepalive(lua_State *L) {