author     dormando <dormando@rydia.net>    2022-11-22 22:52:08 -0800
committer  dormando <dormando@rydia.net>    2022-12-01 22:07:32 -0800
commit     1ba5df8410e7ccc035390438e45b26c2d11ede5c (patch)
tree       ff0e1b06e15c411d8418a94e762cc7744fd12f07
parent     683bb98a55ba19f69c4e2a60b9104ed2edc971c3 (diff)
download   memcached-1ba5df8410e7ccc035390438e45b26c2d11ede5c.tar.gz
proxy: add mcp.AWAIT_BACKGROUND
mcp.await(request, pools, 0, mcp.AWAIT_BACKGROUND) will, instead of waiting on any request to return, simply return an empty table as soon as the background requests are dispatched.
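For illustration, a minimal Lua sketch in the style of t/startfile.lua's setinvalidate factory; self_zone, far_zones, and the untrimmed key handling are assumptions borrowed from that file rather than part of this patch:

function setinvalidate_sketch(self_zone, far_zones)
    return function(r)
        -- satisfy the client from the local zone first.
        local res = self_zone(r)
        -- derive a delete for the same key (key prefix not trimmed here).
        local dr = mcp.request("delete " .. r:key() .. "\r\n")
        -- AWAIT_BACKGROUND: returns an empty table as soon as the deletes
        -- are queued, so the client response is not delayed by far zones.
        mcp.await(dr, far_zones, 0, mcp.AWAIT_BACKGROUND)
        return res
    end
end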
-rw-r--r--   proxy.h          2
-rw-r--r--   proxy_await.c   56
-rw-r--r--   proxy_lua.c      1
-rw-r--r--   proxy_network.c 14
-rw-r--r--   t/startfile.lua  9
5 files changed, 73 insertions, 9 deletions
diff --git a/proxy.h b/proxy.h
index f7647d6..66b4748 100644
--- a/proxy.h
+++ b/proxy.h
@@ -401,6 +401,7 @@ struct _io_pending_proxy_t {
bool ascii_multiget; // passed on from mcp_r_t
bool is_await; // are we an await object?
bool await_first; // are we the main route for an await object?
+ bool await_background; // dummy IO for backgrounded awaits
};
// Note: does *be have to be a sub-struct? how stable are userdata pointers?
@@ -444,6 +445,7 @@ enum mcp_await_e {
AWAIT_OK, // any non-error response
AWAIT_FIRST, // return the result from the first pool
AWAIT_FASTGOOD, // returns on first hit or majority non-error
+ AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched
};
int mcplib_await(lua_State *L);
int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref);
diff --git a/proxy_await.c b/proxy_await.c
index 1486068..5e379db 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -50,6 +50,7 @@ int mcplib_await(lua_State *L) {
case AWAIT_OK:
case AWAIT_FIRST:
case AWAIT_FASTGOOD:
+ case AWAIT_BACKGROUND:
break;
default:
proxy_lua_error(L, "invalid type argument tp mcp.await");
@@ -176,6 +177,40 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
return;
}
+static void mcp_queue_await_dummy_io(conn *c, lua_State *Lc, int await_ref) {
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+
+ io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
+ if (p == NULL) {
+ WSTAT_INCR(c, proxy_conn_oom, 1);
+ proxy_lua_error(Lc, "out of memory allocating from IO cache");
+ return;
+ }
+
+ // this is a re-cast structure, so assert that we never outsize it.
+ assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
+ memset(p, 0, sizeof(io_pending_proxy_t));
+ // set up back references.
+ p->io_queue_type = IO_QUEUE_PROXY;
+ p->thread = c->thread;
+ p->c = c;
+ p->resp = NULL;
+
+ // await specific
+ p->is_await = true;
+ p->await_ref = await_ref;
+ p->await_background = true;
+
+ // Dummy IO has no backend, and no request attached.
+
+ // All we need to do is link into the batch chain.
+ p->next = q->stack_ctx;
+ q->stack_ctx = p;
+ P_DEBUG("%s: queued\n", __func__);
+
+ return;
+}
+
// TODO (v2): need to get this code running under pcall().
// It looks like a bulk of this code can move into mcplib_await(),
// and then here post-yield we can add the conn and coro_ref to the right
@@ -224,6 +259,12 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
}
P_DEBUG("%s: argtable len: %d\n", __func__, n);
+ if (aw->type == AWAIT_BACKGROUND) {
+ mcp_queue_await_dummy_io(c, L, await_ref);
+ aw->pending++;
+ aw->wait_for = 0;
+ }
+
lua_pop(L, 1); // remove table key.
aw->resp = resp; // cuddle the current mc_resp to fill later
@@ -262,7 +303,13 @@ int mcplib_await_return(io_pending_proxy_t *p) {
// TODO (v2): for GOOD or OK cases, it might be better to return the
// last object as valid if there are otherwise zero valids?
// Think we just have to count valids...
- if (!aw->completed) {
+ if (aw->type == AWAIT_BACKGROUND) {
+ // in the background case, we never want to collect responses.
+ if (p->await_background) {
+ // found the dummy IO, complete and return conn to worker.
+ completing = true;
+ }
+ } else if (!aw->completed) {
valid = true; // always collect results unless we are completed.
if (aw->wait_for > 0) {
bool is_good = false;
@@ -298,6 +345,9 @@ int mcplib_await_return(io_pending_proxy_t *p) {
}
}
break;
+ case AWAIT_BACKGROUND:
+ // In background mode we don't wait for any response.
+ break;
}
if (is_good) {
@@ -335,7 +385,9 @@ int mcplib_await_return(io_pending_proxy_t *p) {
}
// lose our internal mcpres reference regardless.
- luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref);
+ if (p->mcpres_ref) {
+ luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref);
+ }
// our await_ref is shared, so we don't need to release it.
if (completing) {
diff --git a/proxy_lua.c b/proxy_lua.c
index 495c0de..2172733 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -866,6 +866,7 @@ static void proxy_register_defines(lua_State *L) {
X(AWAIT_OK);
X(AWAIT_FIRST);
X(AWAIT_FASTGOOD);
+ X(AWAIT_BACKGROUND);
CMD_FIELDS
#undef X
}
diff --git a/proxy_network.c b/proxy_network.c
index c2e8555..9d885e6 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -63,15 +63,23 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
while (!STAILQ_EMPTY(&head)) {
io_pending_proxy_t *io = STAILQ_FIRST(&head);
io->flushed = false;
- mcp_backend_t *be = io->backend;
- // So the backend can retrieve its event base.
- be->event_thread = t;
// _no_ mutex on backends. they are owned by the event thread.
STAILQ_REMOVE_HEAD(&head, io_next);
// paranoia about moving items between lists.
io->io_next.stqe_next = NULL;
+ // Need to check on awaits before looking at backends, in case it
+ // doesn't have one.
+ // Here we're letting an await resume without waiting on the network.
+ if (io->await_background) {
+ return_io_pending((io_pending_t *)io);
+ continue;
+ }
+
+ mcp_backend_t *be = io->backend;
+ // So the backend can retrieve its event base.
+ be->event_thread = t;
if (be->bad) {
P_DEBUG("%s: fast failing request to bad backend\n", __func__);
io->client_resp->status = MCMC_ERR;
diff --git a/t/startfile.lua b/t/startfile.lua
index 9f16434..96a864b 100644
--- a/t/startfile.lua
+++ b/t/startfile.lua
@@ -212,10 +212,11 @@ function setinvalidate_factory(zones, local_zone)
-- example of new request from existing request
-- note this isn't trimming the key so it'll make a weird one.
-- local dr = new_req("set /bar/" .. r:key() .. " 0 0 " .. r:token(5) .. "\r\n", r)
- for _, zone in pairs(far_zones) do
- -- NOTE: can check/do things on the specific response here.
- zone(dr)
- end
+ -- AWAIT_BACKGROUND allows us to immediately resume processing, executing the
+ -- delete requests in the background.
+ mcp.await(dr, far_zones, 0, mcp.AWAIT_BACKGROUND)
+ --mcp.await(dr, far_zones, 0)
+ mcp.log_req(r, res, "setinvalidate") -- time against the original request, since we have no result.
end
-- use original response for client, not DELETE's response.
-- else client won't understand.
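As a usage note (a sketch, not part of the patch): the background mode only changes what the caller gets back, so code that needs to inspect results must keep using one of the waiting modes. Assuming a dr request and far_zones pool table as above, and the response object's ok() method:

-- waiting mode: blocks until the wait count is satisfied, returns results.
local restable = mcp.await(dr, far_zones, 1)
if restable[1] ~= nil and restable[1]:ok() then
    -- at least one far zone accepted the delete.
end

-- background mode: returns immediately with an empty table; responses are
-- never collected, so there is nothing to inspect.
local bg = mcp.await(dr, far_zones, 0, mcp.AWAIT_BACKGROUND)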