diff options
author | dormando <dormando@rydia.net> | 2022-11-22 22:52:08 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-12-01 22:07:32 -0800 |
commit | 1ba5df8410e7ccc035390438e45b26c2d11ede5c (patch) | |
tree | ff0e1b06e15c411d8418a94e762cc7744fd12f07 | |
parent | 683bb98a55ba19f69c4e2a60b9104ed2edc971c3 (diff) | |
download | memcached-1ba5df8410e7ccc035390438e45b26c2d11ede5c.tar.gz |
proxy: add mcp.AWAIT_BACKGROUND
mcp.await(request, pools, 0, mcp.AWAIT_BACKGROUND) will, instead of
waiting on any request to return, simply return an empty table as soon
as the background requests are dispatched.
-rw-r--r-- | proxy.h | 2 | ||||
-rw-r--r-- | proxy_await.c | 56 | ||||
-rw-r--r-- | proxy_lua.c | 1 | ||||
-rw-r--r-- | proxy_network.c | 14 | ||||
-rw-r--r-- | t/startfile.lua | 9 |
5 files changed, 73 insertions, 9 deletions
@@ -401,6 +401,7 @@ struct _io_pending_proxy_t { bool ascii_multiget; // passed on from mcp_r_t bool is_await; // are we an await object? bool await_first; // are we the main route for an await object? + bool await_background; // dummy IO for backgrounded awaits }; // Note: does *be have to be a sub-struct? how stable are userdata pointers? @@ -444,6 +445,7 @@ enum mcp_await_e { AWAIT_OK, // any non-error response AWAIT_FIRST, // return the result from the first pool AWAIT_FASTGOOD, // returns on first hit or majority non-error + AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched }; int mcplib_await(lua_State *L); int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref); diff --git a/proxy_await.c b/proxy_await.c index 1486068..5e379db 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -50,6 +50,7 @@ int mcplib_await(lua_State *L) { case AWAIT_OK: case AWAIT_FIRST: case AWAIT_FASTGOOD: + case AWAIT_BACKGROUND: break; default: proxy_lua_error(L, "invalid type argument tp mcp.await"); @@ -176,6 +177,40 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw return; } +static void mcp_queue_await_dummy_io(conn *c, lua_State *Lc, int await_ref) { + io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY); + + io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache); + if (p == NULL) { + WSTAT_INCR(c, proxy_conn_oom, 1); + proxy_lua_error(Lc, "out of memory allocating from IO cache"); + return; + } + + // this is a re-cast structure, so assert that we never outsize it. + assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t)); + memset(p, 0, sizeof(io_pending_proxy_t)); + // set up back references. + p->io_queue_type = IO_QUEUE_PROXY; + p->thread = c->thread; + p->c = c; + p->resp = NULL; + + // await specific + p->is_await = true; + p->await_ref = await_ref; + p->await_background = true; + + // Dummy IO has no backend, and no request attached. + + // All we need to do is link into the batch chain. + p->next = q->stack_ctx; + q->stack_ctx = p; + P_DEBUG("%s: queued\n", __func__); + + return; +} + // TODO (v2): need to get this code running under pcall(). // It looks like a bulk of this code can move into mcplib_await(), // and then here post-yield we can add the conn and coro_ref to the right @@ -224,6 +259,12 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) { } P_DEBUG("%s: argtable len: %d\n", __func__, n); + if (aw->type == AWAIT_BACKGROUND) { + mcp_queue_await_dummy_io(c, L, await_ref); + aw->pending++; + aw->wait_for = 0; + } + lua_pop(L, 1); // remove table key. aw->resp = resp; // cuddle the current mc_resp to fill later @@ -262,7 +303,13 @@ int mcplib_await_return(io_pending_proxy_t *p) { // TODO (v2): for GOOD or OK cases, it might be better to return the // last object as valid if there are otherwise zero valids? // Think we just have to count valids... - if (!aw->completed) { + if (aw->type == AWAIT_BACKGROUND) { + // in the background case, we never want to collect responses. + if (p->await_background) { + // found the dummy IO, complete and return conn to worker. + completing = true; + } + } else if (!aw->completed) { valid = true; // always collect results unless we are completed. if (aw->wait_for > 0) { bool is_good = false; @@ -298,6 +345,9 @@ int mcplib_await_return(io_pending_proxy_t *p) { } } break; + case AWAIT_BACKGROUND: + // In background mode we don't wait for any response. + break; } if (is_good) { @@ -335,7 +385,9 @@ int mcplib_await_return(io_pending_proxy_t *p) { } // lose our internal mcpres reference regardless. - luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref); + if (p->mcpres_ref) { + luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref); + } // our await_ref is shared, so we don't need to release it. if (completing) { diff --git a/proxy_lua.c b/proxy_lua.c index 495c0de..2172733 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -866,6 +866,7 @@ static void proxy_register_defines(lua_State *L) { X(AWAIT_OK); X(AWAIT_FIRST); X(AWAIT_FASTGOOD); + X(AWAIT_BACKGROUND); CMD_FIELDS #undef X } diff --git a/proxy_network.c b/proxy_network.c index c2e8555..9d885e6 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -63,15 +63,23 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { while (!STAILQ_EMPTY(&head)) { io_pending_proxy_t *io = STAILQ_FIRST(&head); io->flushed = false; - mcp_backend_t *be = io->backend; - // So the backend can retrieve its event base. - be->event_thread = t; // _no_ mutex on backends. they are owned by the event thread. STAILQ_REMOVE_HEAD(&head, io_next); // paranoia about moving items between lists. io->io_next.stqe_next = NULL; + // Need to check on await's before looking at backends, in case it + // doesn't have one. + // Here we're letting an await resume without waiting on the network. + if (io->await_background) { + return_io_pending((io_pending_t *)io); + continue; + } + + mcp_backend_t *be = io->backend; + // So the backend can retrieve its event base. + be->event_thread = t; if (be->bad) { P_DEBUG("%s: fast failing request to bad backend\n", __func__); io->client_resp->status = MCMC_ERR; diff --git a/t/startfile.lua b/t/startfile.lua index 9f16434..96a864b 100644 --- a/t/startfile.lua +++ b/t/startfile.lua @@ -212,10 +212,11 @@ function setinvalidate_factory(zones, local_zone) -- example of new request from existing request -- note this isn't trimming the key so it'll make a weird one. -- local dr = new_req("set /bar/" .. r:key() .. " 0 0 " .. r:token(5) .. "\r\n", r) - for _, zone in pairs(far_zones) do - -- NOTE: can check/do things on the specific response here. - zone(dr) - end + -- AWAIT_BACKGROUND allows us to immediately resume processing, executing the + -- delete requests in the background. + mcp.await(dr, far_zones, 0, mcp.AWAIT_BACKGROUND) + --mcp.await(dr, far_zones, 0) + mcp.log_req(r, res, "setinvalidate") -- time against the original request, since we have no result. end -- use original response for client, not DELETE's response. -- else client won't understand. |