diff options
author | dormando <dormando@rydia.net> | 2022-03-02 19:30:26 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-03-02 19:32:20 -0800 |
commit | 1d825ef0a539e03db69984a89a6d47ee5d4d27ee (patch) | |
tree | 8f361189f9fee0a86a33a0c5f820b99cba4b8f76 | |
parent | 31ccfc19fcbe15a5f40c559483c4c95082781ece (diff) | |
download | memcached-1d825ef0a539e03db69984a89a6d47ee5d4d27ee.tar.gz |
proxy: allow await() to be called recursively
previously mcp.await() only worked if it was called before any other
dispatches.
also fixes a bug if the supplied pool table was key=value instead of an
array-type table.
-rw-r--r-- | proto_proxy.c | 28 | ||||
-rw-r--r-- | proxy.h | 2 | ||||
-rw-r--r-- | proxy_await.c | 22 | ||||
-rw-r--r-- | t/startfile.lua | 10 |
4 files changed, 40 insertions, 22 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index f276fe1..6c028f4 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -228,10 +228,15 @@ void proxy_submit_cb(io_queue_t *q) { while (p) { // insert into tail so head is oldest request. STAILQ_INSERT_TAIL(&head, p, io_next); - if (!p->is_await) { + if (p->is_await) { + // need to not count await objects multiple times. + if (p->await_first) { + q->count++; + } // funny workaround: awaiting IOP's don't count toward // resuming a connection, only the completion of the await // condition. + } else { q->count++; } @@ -557,13 +562,20 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con // TODO (v2): try harder to validate; but we have so few yield cases // that I'm going to shortcut this here. A single yielded result // means it's probably an await(), so attempt to process this. - // FIXME (v2): if p, do we need to free it up from the resp? - // resp should not have an IOP I think... - assert(p == NULL); - // coroutine object sitting on the _main_ VM right now, so we grab - // the reference from there, which also pops it. - int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); - mcplib_await_run(c, Lc, coro_ref); + if (p != NULL) { + int coro_ref = p->coro_ref; + mc_resp *resp = p->resp; + assert((void *)p == (void *)resp->io_pending); + resp->io_pending = NULL; + c = p->c; + do_cache_free(c->thread->io_cache, p); + mcplib_await_run(c, resp, Lc, coro_ref); + } else { + // coroutine object sitting on the _main_ VM right now, so we grab + // the reference from there, which also pops it. + int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); + mcplib_await_run(c, c->resp, Lc, coro_ref); + } } else { // need to remove and free the io_pending, since c->resp owns it. // so we call mcp_queue_io() again and let it override the @@ -440,7 +440,7 @@ enum mcp_await_e { AWAIT_FIRST, // return the result from the first pool }; int mcplib_await(lua_State *L); -int mcplib_await_run(conn *c, lua_State *L, int coro_ref); +int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref); int mcplib_await_return(io_pending_proxy_t *p); // user stats interface diff --git a/proxy_await.c b/proxy_await.c index c504b9a..8c277ea 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -26,10 +26,17 @@ typedef struct mcp_await_s { int mcplib_await(lua_State *L) { mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); luaL_checktype(L, 2, LUA_TTABLE); - int n = luaL_len(L, 2); // length of hash selector table + int n = 0; // length of table of pools int wait_for = 0; // 0 means wait for all responses enum mcp_await_e type = AWAIT_GOOD; + lua_pushnil(L); // init table key + while (lua_next(L, 2) != 0) { + luaL_checkudata(L, -1, "mcp.pool_proxy"); + lua_pop(L, 1); // remove value, keep key. + n++; + } + if (n <= 0) { proxy_lua_error(L, "mcp.await arguments must have at least one pool"); } @@ -76,7 +83,7 @@ int mcplib_await(lua_State *L) { aw->rq = rq; aw->req_ref = req_ref; aw->type = type; - P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n); + P_DEBUG("%s: about to yield [len: %d]\n", __func__, n); return lua_yield(L, 1); } @@ -175,7 +182,7 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw // It looks like a bulk of this code can move into mcplib_await(), // and then here post-yield we can add the conn and coro_ref to the right // places. Else these errors currently crash the daemon. -int mcplib_await_run(conn *c, lua_State *L, int coro_ref) { +int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) { P_DEBUG("%s: start\n", __func__); mcp_await_t *aw = lua_touserdata(L, -1); int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped. @@ -217,14 +224,9 @@ int mcplib_await_run(conn *c, lua_State *L, int coro_ref) { } lua_pop(L, 1); // remove table key. - aw->resp = c->resp; // cuddle the current mc_resp to fill later - - // we count the await as the "response pending" since it covers a single - // response object. the sub-IO's don't count toward the redispatch of *c - io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY); - q->count++; + aw->resp = resp; // cuddle the current mc_resp to fill later - P_DEBUG("%s\n", __func__); + P_DEBUG("%s: end\n", __func__); return 0; } diff --git a/t/startfile.lua b/t/startfile.lua index 37672e0..25c6699 100644 --- a/t/startfile.lua +++ b/t/startfile.lua @@ -131,12 +131,16 @@ function failover_factory(zones, local_zone) if res:hit() == false then -- example for mcp.log... Don't do this though :) mcp.log("failed to find " .. r:key() .. " in zone: " .. local_zone) - for _, zone in pairs(far_zones) do - res = zone(r) + --for _, zone in pairs(far_zones) do + -- res = zone(r) + local restable = mcp.await(r, far_zones, 1) + for _, res in pairs(restable) do if res:hit() then - break + --break + return res end end + return restable[1] end return res -- send result back to client end |