path: root/proxy_await.c
diff options
authordormando <>2022-02-18 15:19:09 -0800
committerdormando <>2022-02-18 16:13:52 -0800
commit34e0359d4de223d8cde4166f7d10ae352d7ebfdf (patch)
tree041a57edfb4bb3b58aa23498681295cb71789ee5 /proxy_await.c
parentd85c379d74d92f8e9bd7ccf1ca57520f485a24f0 (diff)
proxy: pull chunks into individual c files
now's a good time to at least shove functional subsections of code into their own files. Some further work to clearly separate the API's will help but looks not too terrible. Big bonus is getting the backend handling code away from the frontend handling code, which should make it easier to follow.
Diffstat (limited to 'proxy_await.c')
1 files changed, 343 insertions, 0 deletions
diff --git a/proxy_await.c b/proxy_await.c
new file mode 100644
index 0000000..f69e7df
--- /dev/null
+++ b/proxy_await.c
@@ -0,0 +1,343 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#include "proxy.h"
+typedef struct mcp_await_s {
+ int pending;
+ int wait_for;
+ int req_ref;
+ int argtable_ref; // need to hold refs to any potential hash selectors
+ int restable_ref; // table of result objects
+ int coro_ref; // reference to parent coroutine
+ enum mcp_await_e type;
+ bool completed; // have we completed the parent coroutine or not
+ mcp_request_t *rq;
+ mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
+} mcp_await_t;
+// TODO (v2): mcplib_await_gc()
+// - needs to handle cases where an await is created, but a rare error happens
+// before it completes and the coroutine is killed. must check and free its
+// references.
+// local restable = mcp.await(request, pools, num_wait)
+// NOTE: need to hold onto the pool objects since those hold backend
+// references. Here we just keep a reference to the argument table.
+int mcplib_await(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
+ luaL_checktype(L, 2, LUA_TTABLE);
+ int n = luaL_len(L, 2); // length of hash selector table
+ int wait_for = 0; // 0 means wait for all responses
+ enum mcp_await_e type = AWAIT_GOOD;
+ if (n <= 0) {
+ proxy_lua_error(L, "mcp.await arguments must have at least one pool");
+ }
+ if (lua_isnumber(L, 3)) {
+ wait_for = lua_tointeger(L, 3);
+ lua_pop(L, 1);
+ if (wait_for > n) {
+ wait_for = n;
+ }
+ }
+ if (lua_isnumber(L, 4)) {
+ type = lua_tointeger(L, 4);
+ switch (type) {
+ case AWAIT_GOOD:
+ case AWAIT_ANY:
+ case AWAIT_OK:
+ break;
+ default:
+ proxy_lua_error(L, "invalid type argument tp mcp.await");
+ }
+ }
+ // FIRST is only looking for one valid request.
+ if (type == AWAIT_FIRST) {
+ wait_for = 1;
+ }
+ // TODO (v2): quickly loop table once and ensure they're all pools?
+ // TODO (v2) in case of newuserdatauv throwing an error, we need to grab
+ // these references after allocating *aw else can leak memory.
+ int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
+ int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.
+ // stack will be only the await object now
+ // allocate before grabbing references so an error won't cause leaks.
+ mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0);
+ memset(aw, 0, sizeof(mcp_await_t));
+ aw->wait_for = wait_for;
+ aw->pending = n;
+ aw->argtable_ref = argtable_ref;
+ aw->rq = rq;
+ aw->req_ref = req_ref;
+ aw->type = type;
+ P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
+ return lua_yield(L, 1);
+static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+ mcp_backend_t *be = rq->be;
+ // Then we push a response object, which we'll re-use later.
+ // reserve one uservalue for a lua-supplied response.
+ mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
+ memset(r, 0, sizeof(mcp_resp_t));
+ r->start = rq->start;
+ int x;
+ int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
+ for (x = 0; x < end; x++) {
+ if (rq->pr.request[x] == ' ') {
+ break;
+ }
+ r->cmd[x] = rq->pr.request[x];
+ }
+ r->cmd[x] = '\0';
+ luaL_getmetatable(Lc, "mcp.response");
+ lua_setmetatable(Lc, -2);
+ io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
+ if (p == NULL) {
+ WSTAT_INCR(c, proxy_conn_oom, 1);
+ proxy_lua_error(Lc, "out of memory allocating from IO cache");
+ return;
+ }
+ // this is a re-cast structure, so assert that we never outsize it.
+ assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
+ memset(p, 0, sizeof(io_pending_proxy_t));
+ // set up back references.
+ p->io_queue_type = IO_QUEUE_PROXY;
+ p->thread = c->thread;
+ p->c = c;
+ p->resp = NULL;
+ p->client_resp = r;
+ p->flushed = false;
+ p->ascii_multiget = rq->ascii_multiget;
+ // io_p needs to hold onto its own response reference, because we may or
+ // may not include it in the final await() result.
+ p->mcpres_ref = luaL_ref(Lc, LUA_REGISTRYINDEX); // pops mcp.response
+ // avoiding coroutine reference for sub-IO
+ p->coro_ref = 0;
+ p->coro = NULL;
+ // await specific
+ p->is_await = true;
+ p->await_ref = await_ref;
+ p->await_first = await_first;
+ // The direct backend object. await object is holding reference
+ p->backend = be;
+ mcp_request_attach(Lc, rq, p);
+ // link into the batch chain.
+ p->next = q->stack_ctx;
+ q->stack_ctx = p;
+ P_DEBUG("%s: queued\n", __func__);
+ return;
+// TODO (v2): need to get this code running under pcall().
+// It looks like a bulk of this code can move into mcplib_await(),
+// and then here post-yield we can add the conn and coro_ref to the right
+// places. Else these errors currently crash the daemon.
+int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
+ P_DEBUG("%s: start\n", __func__);
+ mcp_await_t *aw = lua_touserdata(L, -1);
+ int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped.
+ assert(aw != NULL);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->argtable_ref); // -> 1
+ //dump_stack(L);
+ P_DEBUG("%s: argtable len: %d\n", __func__, (int)lua_rawlen(L, -1));
+ mcp_request_t *rq = aw->rq;
+ aw->coro_ref = coro_ref;
+ // create result table
+ lua_newtable(L); // -> 2
+ aw->restable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the result table
+ // prepare the request key
+ const char *key = MCP_PARSER_KEY(rq->pr);
+ size_t len = rq->pr.klen;
+ bool await_first = true;
+ // loop arg table and run each hash selector
+ lua_pushnil(L); // -> 3
+ while (lua_next(L, 1) != 0) {
+ P_DEBUG("%s: top of loop\n", __func__);
+ // (key, -2), (val, -1)
+ mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy");
+ if (pp == NULL) {
+ proxy_lua_error(L, "mcp.await must be supplied with a pool");
+ }
+ mcp_pool_t *p = pp->main;
+ // NOTE: rq->be is only held to help pass the backend into the IOP in
+ // mcp_queue call. Could be a local variable and an argument too.
+ rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ mcp_queue_await_io(c, L, rq, await_ref, await_first);
+ await_first = false;
+ // pop value, keep key.
+ lua_pop(L, 1);
+ }
+ lua_pop(L, 1); // remove table key.
+ aw->resp = c->resp; // cuddle the current mc_resp to fill later
+ // we count the await as the "response pending" since it covers a single
+ // response object. the sub-IO's don't count toward the redispatch of *c
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+ q->count++;
+ P_DEBUG("%s\n", __func__);
+ return 0;
+// NOTE: This is unprotected lua/C code. There are no lua-style errors thrown
+// purposefully as of this writing, but it's still not safe. Either the code
+// can be restructured to use less lua (which I think is better long term
+// anyway) or it can be pushed behind a cfunc pcall so we don't crash the
+// daemon if something bad happens.
+int mcplib_await_return(io_pending_proxy_t *p) {
+ mcp_await_t *aw;
+ lua_State *L = p->thread->L; // use the main VM coroutine for work
+ bool cleanup = false;
+ bool valid = false; // is response valid to add to the result table.
+ bool completing = false;
+ // TODO (v2): just push the await ptr into *p?
+ lua_rawgeti(L, LUA_REGISTRYINDEX, p->await_ref);
+ aw = lua_touserdata(L, -1);
+ lua_pop(L, 1); // remove AW object from stack
+ assert(aw != NULL);
+ P_DEBUG("%s: start [pending: %d]\n", __func__, aw->pending);
+ //dump_stack(L);
+ aw->pending--;
+ // Await not yet satisfied.
+ // If wait_for != 0 check for response success
+ // if success and wait_for is *now* 0, we complete.
+ // add successful response to response table
+ // Also, if no wait_for, add response to response table
+ // TODO (v2): for GOOD or OK cases, it might be better to return the
+ // last object as valid if there are otherwise zero valids?
+ // Think we just have to count valids...
+ if (!aw->completed) {
+ valid = true; // always collect results unless we are completed.
+ if (aw->wait_for > 0) {
+ bool is_good = false;
+ switch (aw->type) {
+ case AWAIT_GOOD:
+ if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
+ is_good = true;
+ }
+ break;
+ case AWAIT_ANY:
+ is_good = true;
+ break;
+ case AWAIT_OK:
+ if (p->client_resp->status == MCMC_OK) {
+ is_good = true;
+ }
+ break;
+ if (p->await_first) {
+ is_good = true;
+ } else {
+ // user only wants the first pool's result.
+ valid = false;
+ }
+ break;
+ }
+ if (is_good) {
+ aw->wait_for--;
+ }
+ if (aw->wait_for == 0) {
+ completing = true;
+ }
+ }
+ }
+ // note that post-completion, we stop gathering responses into the
+ // resposne table... because it's already been returned.
+ // So "valid" can only be true if also !completed
+ if (aw->pending == 0) {
+ if (!aw->completed) {
+ // were waiting for all responses.
+ completing = true;
+ }
+ cleanup = true;
+ P_DEBUG("%s: pending == 0\n", __func__);
+ }
+ // a valid response to add to the result table.
+ if (valid) {
+ P_DEBUG("%s: valid\n", __func__);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1
+ lua_rawgeti(L, LUA_REGISTRYINDEX, p->mcpres_ref); // -> 2
+ // couldn't find a table.insert() equivalent; so this is
+ // inserting into the length + 1 position manually.
+ //dump_stack(L);
+ lua_rawseti(L, 1, lua_rawlen(L, 1) + 1); // pops mcpres
+ lua_pop(L, 1); // pops restable
+ }
+ // lose our internal mcpres reference regardless.
+ luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref);
+ // our await_ref is shared, so we don't need to release it.
+ if (completing) {
+ P_DEBUG("%s: completing\n", __func__);
+ aw->completed = true;
+ // if we haven't completed yet, the connection reference is still
+ // valid. So now we pull it, reduce count, and readd if necessary.
+ // here is also the point where we resume the coroutine.
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->coro_ref);
+ lua_State *Lc = lua_tothread(L, -1);
+ lua_rawgeti(Lc, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1
+ proxy_run_coroutine(Lc, aw->resp, NULL, p->c);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->coro_ref);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->restable_ref);
+ io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
+ q->count--;
+ if (q->count == 0) {
+ // call re-add directly since we're already in the worker thread.
+ conn_worker_readd(p->c);
+ }
+ }
+ if (cleanup) {
+ P_DEBUG("%s: cleanup [completed: %d]\n", __func__, aw->completed);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref);
+ luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref);
+ }
+ // Just remove anything we could have left on the primary VM stack
+ lua_settop(L, 0);
+ // always return free this sub-IO object.
+ do_cache_free(p->thread->io_cache, p);
+ return 0;