summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-02-16 21:20:13 -0800
committerdormando <dormando@rydia.net>2022-02-17 10:12:08 -0800
commit774630ea7a8f5a60f28af076df838d2f12eadc37 (patch)
treeee74c92a83660e61800d528b7b691885661c8764 /proto_proxy.c
parent90b9e5c6fb63ae2b252ee32c9639acb94b1044c1 (diff)
downloadmemcached-774630ea7a8f5a60f28af076df838d2f12eadc37.tar.gz
proxy: await improvements
Adds MCP_AWAIT_* options as 4th argument. If waiting for a subset of requests instead of all requests, the 4th argument can specify what type of responses are considered valid. Also now includes _all_ observed results in the result table, when a specific number of "good" results are requested. The caller is supposed to filter for its intent. This allows observation of error codes if all requested results fail or it otherwise fails to meet its result conditions (via AWAIT_*)
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c130
1 files changed, 101 insertions, 29 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 7dc3e44..7afa587 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -406,6 +406,7 @@ struct _io_pending_proxy_t {
bool flushed; // whether we've fully written this request to a backend.
bool ascii_multiget; // passed on from mcp_r_t
bool is_await; // are we an await object?
+ bool await_first; // are we the main route for an await object?
};
// Note: does *be have to be a sub-struct? how stable are userdata pointers?
@@ -3402,18 +3403,6 @@ static int mcplib_attach(lua_State *L) {
return 0;
}
-static void proxy_register_defines(lua_State *L) {
-#define X(x) \
- lua_pushinteger(L, x); \
- lua_setfield(L, -2, #x);
-
- X(P_OK);
- X(CMD_ANY);
- X(CMD_ANY_STORAGE);
- CMD_FIELDS
-#undef X
-}
-
/*** REQUEST PARSER AND OBJECT ***/
#define PARSER_MAXLEN USHRT_MAX-1
@@ -4252,6 +4241,13 @@ static int mcplib_stat(lua_State *L) {
/*** START lua await() object interface ***/
+enum mcp_await_e {
+ AWAIT_GOOD = 0, // looks for OK + NOT MISS
+ AWAIT_ANY, // any response, including errors,
+ AWAIT_OK, // any non-error response
+ AWAIT_FIRST, // return the result from the first pool
+};
+
typedef struct mcp_await_s {
int pending;
int wait_for;
@@ -4259,11 +4255,17 @@ typedef struct mcp_await_s {
int argtable_ref; // need to hold refs to any potential hash selectors
int restable_ref; // table of result objects
int coro_ref; // reference to parent coroutine
+ enum mcp_await_e type;
bool completed; // have we completed the parent coroutine or not
mcp_request_t *rq;
mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
} mcp_await_t;
+// TODO (v2): mcplib_await_gc()
+// - needs to handle cases where an await is created, but a rare error happens
+// before it completes and the coroutine is killed. must check and free its
+// references.
+
// local restable = mcp.await(request, pools, num_wait)
// NOTE: need to hold onto the pool objects since those hold backend
// references. Here we just keep a reference to the argument table.
@@ -4272,6 +4274,7 @@ static int mcplib_await(lua_State *L) {
luaL_checktype(L, 2, LUA_TTABLE);
int n = luaL_len(L, 2); // length of hash selector table
int wait_for = 0; // 0 means wait for all responses
+ enum mcp_await_e type = AWAIT_GOOD;
if (n <= 0) {
proxy_lua_error(L, "mcp.await arguments must have at least one pool");
@@ -4284,25 +4287,47 @@ static int mcplib_await(lua_State *L) {
}
}
+ if (lua_isnumber(L, 4)) {
+ type = lua_tointeger(L, 4);
+ switch (type) {
+ case AWAIT_GOOD:
+ case AWAIT_ANY:
+ case AWAIT_OK:
+ case AWAIT_FIRST:
+ break;
+ default:
+ proxy_lua_error(L, "invalid type argument tp mcp.await");
+ }
+ }
+
+ // FIRST is only looking for one valid request.
+ if (type == AWAIT_FIRST) {
+ wait_for = 1;
+ }
+
// TODO (v2): quickly loop table once and ensure they're all pools?
+ // TODO (v2) in case of newuserdatauv throwing an error, we need to grab
+ // these references after allocating *aw else can leak memory.
int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.
// stack will be only the await object now
+ // allocate before grabbing references so an error won't cause leaks.
mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0);
memset(aw, 0, sizeof(mcp_await_t));
+
aw->wait_for = wait_for;
aw->pending = n;
aw->argtable_ref = argtable_ref;
aw->rq = rq;
aw->req_ref = req_ref;
+ aw->type = type;
P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
- //dump_stack(L);
return lua_yield(L, 1);
}
-static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref) {
+static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
mcp_backend_t *be = rq->be;
@@ -4310,14 +4335,9 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// Then we push a response object, which we'll re-use later.
// reserve one uservalue for a lua-supplied response.
mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
- if (r == NULL) {
- proxy_lua_error(Lc, "out of memory allocating response");
- return;
- }
memset(r, 0, sizeof(mcp_resp_t));
- r->buf = NULL;
- r->blen = 0;
r->start = rq->start;
+
int x;
int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
for (x = 0; x < end; x++) {
@@ -4361,6 +4381,7 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// await specific
p->is_await = true;
p->await_ref = await_ref;
+ p->await_first = await_first;
// The direct backend object. await object is holding reference
p->backend = be;
@@ -4375,6 +4396,10 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
return;
}
+// TODO (v2): need to get this code running under pcall().
+// It looks like a bulk of this code can move into mcplib_await(),
+// and then here post-yield we can add the conn and coro_ref to the right
+// places. Else these errors currently crash the daemon.
static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
P_DEBUG("%s: start\n", __func__);
mcp_await_t *aw = lua_touserdata(L, -1);
@@ -4393,6 +4418,7 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
// prepare the request key
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
+ bool await_first = true;
// loop arg table and run each hash selector
lua_pushnil(L); // -> 3
while (lua_next(L, 1) != 0) {
@@ -4408,7 +4434,8 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
// mcp_queue call. Could be a local variable and an argument too.
rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len);
- mcp_queue_await_io(c, L, rq, await_ref);
+ mcp_queue_await_io(c, L, rq, await_ref, await_first);
+ await_first = false;
// pop value, keep key.
lua_pop(L, 1);
@@ -4423,17 +4450,20 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
q->count++;
P_DEBUG("%s\n", __func__);
- //dump_stack(L); // should be empty
return 0;
}
-//lua_rawseti(L, -2, x++);
+// NOTE: This is unprotected lua/C code. There are no lua-style errors thrown
+// purposefully as of this writing, but it's still not safe. Either the code
+// can be restructured to use less lua (which I think is better long term
+// anyway) or it can be pushed behind a cfunc pcall so we don't crash the
+// daemon if something bad happens.
static int mcplib_await_return(io_pending_proxy_t *p) {
mcp_await_t *aw;
lua_State *L = p->thread->L; // use the main VM coroutine for work
bool cleanup = false;
- bool valid = false;
+ bool valid = false; // is response valid to add to the result table.
bool completing = false;
// TODO (v2): just push the await ptr into *p?
@@ -4450,18 +4480,44 @@ static int mcplib_await_return(io_pending_proxy_t *p) {
// if success and wait_for is *now* 0, we complete.
// add successful response to response table
// Also, if no wait_for, add response to response table
+ // TODO (v2): for GOOD or OK cases, it might be better to return the
+ // last object as valid if there are otherwise zero valids?
+ // Think we just have to count valids...
if (!aw->completed) {
+ valid = true; // always collect results unless we are completed.
if (aw->wait_for > 0) {
- if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
- valid = true;
+ bool is_good = false;
+ switch (aw->type) {
+ case AWAIT_GOOD:
+ if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
+ is_good = true;
+ }
+ break;
+ case AWAIT_ANY:
+ is_good = true;
+ break;
+ case AWAIT_OK:
+ if (p->client_resp->status == MCMC_OK) {
+ is_good = true;
+ }
+ break;
+ case AWAIT_FIRST:
+ if (p->await_first) {
+ is_good = true;
+ } else {
+ // user only wants the first pool's result.
+ valid = false;
+ }
+ break;
+ }
+
+ if (is_good) {
+ aw->wait_for--;
}
- aw->wait_for--;
if (aw->wait_for == 0) {
completing = true;
}
- } else {
- valid = true;
}
}
@@ -4546,6 +4602,22 @@ static int mcplib_open_hash_xxhash(lua_State *L) {
/*** END xxhash module ***/
+static void proxy_register_defines(lua_State *L) {
+#define X(x) \
+ lua_pushinteger(L, x); \
+ lua_setfield(L, -2, #x);
+
+ X(P_OK);
+ X(CMD_ANY);
+ X(CMD_ANY_STORAGE);
+ X(AWAIT_GOOD);
+ X(AWAIT_ANY);
+ X(AWAIT_OK);
+ X(AWAIT_FIRST);
+ CMD_FIELDS
+#undef X
+}
+
// Creates and returns the top level "mcp" module
int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
lua_State *L = ctx;