summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-03-02 19:30:26 -0800
committerdormando <dormando@rydia.net>2022-03-02 19:32:20 -0800
commit1d825ef0a539e03db69984a89a6d47ee5d4d27ee (patch)
tree8f361189f9fee0a86a33a0c5f820b99cba4b8f76
parent31ccfc19fcbe15a5f40c559483c4c95082781ece (diff)
downloadmemcached-1d825ef0a539e03db69984a89a6d47ee5d4d27ee.tar.gz
proxy: allow await() to be called recursively
previously mcp.await() only worked if it was called before any other dispatches. also fixes a bug if the supplied pool table was key=value instead of an array-type table.
-rw-r--r--proto_proxy.c28
-rw-r--r--proxy.h2
-rw-r--r--proxy_await.c22
-rw-r--r--t/startfile.lua10
4 files changed, 40 insertions, 22 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index f276fe1..6c028f4 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -228,10 +228,15 @@ void proxy_submit_cb(io_queue_t *q) {
while (p) {
// insert into tail so head is oldest request.
STAILQ_INSERT_TAIL(&head, p, io_next);
- if (!p->is_await) {
+ if (p->is_await) {
+ // need to not count await objects multiple times.
+ if (p->await_first) {
+ q->count++;
+ }
// funny workaround: awaiting IOP's don't count toward
// resuming a connection, only the completion of the await
// condition.
+ } else {
q->count++;
}
@@ -557,13 +562,20 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// TODO (v2): try harder to validate; but we have so few yield cases
// that I'm going to shortcut this here. A single yielded result
// means it's probably an await(), so attempt to process this.
- // FIXME (v2): if p, do we need to free it up from the resp?
- // resp should not have an IOP I think...
- assert(p == NULL);
- // coroutine object sitting on the _main_ VM right now, so we grab
- // the reference from there, which also pops it.
- int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
- mcplib_await_run(c, Lc, coro_ref);
+ if (p != NULL) {
+ int coro_ref = p->coro_ref;
+ mc_resp *resp = p->resp;
+ assert((void *)p == (void *)resp->io_pending);
+ resp->io_pending = NULL;
+ c = p->c;
+ do_cache_free(c->thread->io_cache, p);
+ mcplib_await_run(c, resp, Lc, coro_ref);
+ } else {
+ // coroutine object sitting on the _main_ VM right now, so we grab
+ // the reference from there, which also pops it.
+ int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
+ mcplib_await_run(c, c->resp, Lc, coro_ref);
+ }
} else {
// need to remove and free the io_pending, since c->resp owns it.
// so we call mcp_queue_io() again and let it override the
diff --git a/proxy.h b/proxy.h
index fea6879..86b4aa9 100644
--- a/proxy.h
+++ b/proxy.h
@@ -440,7 +440,7 @@ enum mcp_await_e {
AWAIT_FIRST, // return the result from the first pool
};
int mcplib_await(lua_State *L);
-int mcplib_await_run(conn *c, lua_State *L, int coro_ref);
+int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref);
int mcplib_await_return(io_pending_proxy_t *p);
// user stats interface
diff --git a/proxy_await.c b/proxy_await.c
index c504b9a..8c277ea 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -26,10 +26,17 @@ typedef struct mcp_await_s {
int mcplib_await(lua_State *L) {
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
luaL_checktype(L, 2, LUA_TTABLE);
- int n = luaL_len(L, 2); // length of hash selector table
+ int n = 0; // length of table of pools
int wait_for = 0; // 0 means wait for all responses
enum mcp_await_e type = AWAIT_GOOD;
+ lua_pushnil(L); // init table key
+ while (lua_next(L, 2) != 0) {
+ luaL_checkudata(L, -1, "mcp.pool_proxy");
+ lua_pop(L, 1); // remove value, keep key.
+ n++;
+ }
+
if (n <= 0) {
proxy_lua_error(L, "mcp.await arguments must have at least one pool");
}
@@ -76,7 +83,7 @@ int mcplib_await(lua_State *L) {
aw->rq = rq;
aw->req_ref = req_ref;
aw->type = type;
- P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
+ P_DEBUG("%s: about to yield [len: %d]\n", __func__, n);
return lua_yield(L, 1);
}
@@ -175,7 +182,7 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// It looks like a bulk of this code can move into mcplib_await(),
// and then here post-yield we can add the conn and coro_ref to the right
// places. Else these errors currently crash the daemon.
-int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
+int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
P_DEBUG("%s: start\n", __func__);
mcp_await_t *aw = lua_touserdata(L, -1);
int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped.
@@ -217,14 +224,9 @@ int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
}
lua_pop(L, 1); // remove table key.
- aw->resp = c->resp; // cuddle the current mc_resp to fill later
-
- // we count the await as the "response pending" since it covers a single
- // response object. the sub-IO's don't count toward the redispatch of *c
- io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
- q->count++;
+ aw->resp = resp; // cuddle the current mc_resp to fill later
- P_DEBUG("%s\n", __func__);
+ P_DEBUG("%s: end\n", __func__);
return 0;
}
diff --git a/t/startfile.lua b/t/startfile.lua
index 37672e0..25c6699 100644
--- a/t/startfile.lua
+++ b/t/startfile.lua
@@ -131,12 +131,16 @@ function failover_factory(zones, local_zone)
if res:hit() == false then
-- example for mcp.log... Don't do this though :)
mcp.log("failed to find " .. r:key() .. " in zone: " .. local_zone)
- for _, zone in pairs(far_zones) do
- res = zone(r)
+ --for _, zone in pairs(far_zones) do
+ -- res = zone(r)
+ local restable = mcp.await(r, far_zones, 1)
+ for _, res in pairs(restable) do
if res:hit() then
- break
+ --break
+ return res
end
end
+ return restable[1]
end
return res -- send result back to client
end