From 0b35fb967a5088109d5237b00c8ef168a56a6f9a Mon Sep 17 00:00:00 2001 From: dormando Date: Fri, 11 Feb 2022 10:57:52 -0800 Subject: proxy: track in-flight requests I wanted to do this via lua with some on-close hooks on the coroutine but this might work for now. not 100% sure I caught all of the incr/decr cases properly. Was trying to avoid hitting the counters too hard as well. --- proto_proxy.c | 54 +++++++++++++++++++++--------------------------------- 1 file changed, 21 insertions(+), 33 deletions(-) (limited to 'proto_proxy.c') diff --git a/proto_proxy.c b/proto_proxy.c index b463ced..22703b0 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -53,6 +53,11 @@ c->thread->stats.stat += amount; \ pthread_mutex_unlock(&c->thread->stats.mutex); \ } +#define WSTAT_DECR(c, stat, amount) { \ + pthread_mutex_lock(&c->thread->stats.mutex); \ + c->thread->stats.stat -= amount; \ + pthread_mutex_unlock(&c->thread->stats.mutex); \ +} #define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock); #define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock); #define STAT_INCR(ctx, stat, amount) { \ @@ -1212,31 +1217,7 @@ void proxy_submit_cb(io_queue_t *q) { } void proxy_complete_cb(io_queue_t *q) { - /* - io_pending_proxy_t *p = q->stack_ctx; - q->stack_ctx = NULL; - - while (p) { - io_pending_proxy_t *next = p->next; - mc_resp *resp = p->resp; - lua_State *Lc = p->coro; - - // in order to resume we need to remove the objects that were - // originally returned - // what's currently on the top of the stack is what we want to keep. - lua_rotate(Lc, 1, 1); - // We kept the original results from the yield so lua would not - // collect them in the meantime. We can drop those now. - lua_settop(Lc, 1); - - proxy_run_coroutine(Lc, resp, p, NULL); - - // don't need to flatten main thread here, since the coro is gone. - - p = next; - } - return; - */ + // empty/unused. } // called from worker thread after an individual IO has been returned back to @@ -1259,7 +1240,7 @@ void proxy_return_cb(io_pending_t *pending) { // p can be freed/changed from the call below, so fetch the queue now. io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); conn *c = p->c; - proxy_run_coroutine(Lc, p->resp, p, NULL); + proxy_run_coroutine(Lc, p->resp, p, c); q->count--; if (q->count == 0) { @@ -1347,6 +1328,7 @@ void proxy_cleanup_conn(conn *c) { lua_State *L = thr->L; luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref); c->proxy_coro_ref = 0; + WSTAT_DECR(c, proxy_req_active, 1); } // we buffered a SET of some kind. @@ -1934,6 +1916,7 @@ static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t size_t rlen = 0; if (cores == LUA_OK) { + WSTAT_DECR(c, proxy_req_active, 1); int type = lua_type(Lc, -1); if (type == LUA_TUSERDATA) { mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response"); @@ -2007,6 +1990,7 @@ static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t mcp_queue_io(c, resp, coro_ref, Lc); } } else { + WSTAT_DECR(c, proxy_req_active, 1); P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1)); LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1)); proxy_out_errstring(resp, "lua failure"); @@ -2590,13 +2574,6 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu return; } - // Count requests handled by proxy vs local. - struct proxy_int_stats *istats = c->thread->proxy_int_stats; - WSTAT_L(c->thread); - istats->counters[pr.command]++; - c->thread->stats.proxy_conn_requests++; - WSTAT_UL(c->thread); - // If ascii multiget, we turn this into a self-calling loop :( // create new request with next key, call this func again, then advance // original string. @@ -2671,6 +2648,16 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu return; } + // Count requests handled by proxy vs local. + // Also batch the counts down this far so we can lock once for the active + // counter instead of twice. + struct proxy_int_stats *istats = c->thread->proxy_int_stats; + WSTAT_L(c->thread); + istats->counters[pr.command]++; + c->thread->stats.proxy_conn_requests++; + c->thread->stats.proxy_req_active++; + WSTAT_UL(c->thread); + // start a coroutine. // TODO (v2): This can pull a thread from a cache. lua_newthread(L); @@ -2691,6 +2678,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu if (c->item == NULL) { lua_settop(L, 0); proxy_out_errstring(c->resp, "out of memory"); + WSTAT_DECR(c, proxy_req_active, 1); return; } c->item_malloced = true; -- cgit v1.2.1