summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-02-11 10:57:52 -0800
committerdormando <dormando@rydia.net>2022-02-11 16:48:33 -0800
commit0b35fb967a5088109d5237b00c8ef168a56a6f9a (patch)
tree91e5d3f750bfbde6890b8251d7bd62020423df2b /proto_proxy.c
parent58351cdc1d5da2be9a2b4402c987c3cc9478ac7a (diff)
downloadmemcached-0b35fb967a5088109d5237b00c8ef168a56a6f9a.tar.gz
proxy: track in-flight requests
I wanted to do this via lua with some on-close hooks on the coroutine but this might work for now. not 100% sure I caught all of the incr/decr cases properly. Was trying to avoid hitting the counters too hard as well.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c54
1 files changed, 21 insertions, 33 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index b463ced..22703b0 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -53,6 +53,11 @@
c->thread->stats.stat += amount; \
pthread_mutex_unlock(&c->thread->stats.mutex); \
}
+#define WSTAT_DECR(c, stat, amount) { \
+ pthread_mutex_lock(&c->thread->stats.mutex); \
+ c->thread->stats.stat -= amount; \
+ pthread_mutex_unlock(&c->thread->stats.mutex); \
+}
#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
#define STAT_INCR(ctx, stat, amount) { \
@@ -1212,31 +1217,7 @@ void proxy_submit_cb(io_queue_t *q) {
}
void proxy_complete_cb(io_queue_t *q) {
- /*
- io_pending_proxy_t *p = q->stack_ctx;
- q->stack_ctx = NULL;
-
- while (p) {
- io_pending_proxy_t *next = p->next;
- mc_resp *resp = p->resp;
- lua_State *Lc = p->coro;
-
- // in order to resume we need to remove the objects that were
- // originally returned
- // what's currently on the top of the stack is what we want to keep.
- lua_rotate(Lc, 1, 1);
- // We kept the original results from the yield so lua would not
- // collect them in the meantime. We can drop those now.
- lua_settop(Lc, 1);
-
- proxy_run_coroutine(Lc, resp, p, NULL);
-
- // don't need to flatten main thread here, since the coro is gone.
-
- p = next;
- }
- return;
- */
+ // empty/unused.
}
// called from worker thread after an individual IO has been returned back to
@@ -1259,7 +1240,7 @@ void proxy_return_cb(io_pending_t *pending) {
// p can be freed/changed from the call below, so fetch the queue now.
io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
conn *c = p->c;
- proxy_run_coroutine(Lc, p->resp, p, NULL);
+ proxy_run_coroutine(Lc, p->resp, p, c);
q->count--;
if (q->count == 0) {
@@ -1347,6 +1328,7 @@ void proxy_cleanup_conn(conn *c) {
lua_State *L = thr->L;
luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
c->proxy_coro_ref = 0;
+ WSTAT_DECR(c, proxy_req_active, 1);
}
// we buffered a SET of some kind.
@@ -1934,6 +1916,7 @@ static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t
size_t rlen = 0;
if (cores == LUA_OK) {
+ WSTAT_DECR(c, proxy_req_active, 1);
int type = lua_type(Lc, -1);
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response");
@@ -2007,6 +1990,7 @@ static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t
mcp_queue_io(c, resp, coro_ref, Lc);
}
} else {
+ WSTAT_DECR(c, proxy_req_active, 1);
P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1));
proxy_out_errstring(resp, "lua failure");
@@ -2590,13 +2574,6 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
return;
}
- // Count requests handled by proxy vs local.
- struct proxy_int_stats *istats = c->thread->proxy_int_stats;
- WSTAT_L(c->thread);
- istats->counters[pr.command]++;
- c->thread->stats.proxy_conn_requests++;
- WSTAT_UL(c->thread);
-
// If ascii multiget, we turn this into a self-calling loop :(
// create new request with next key, call this func again, then advance
// original string.
@@ -2671,6 +2648,16 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
return;
}
+ // Count requests handled by proxy vs local.
+ // Also batch the counts down this far so we can lock once for the active
+ // counter instead of twice.
+ struct proxy_int_stats *istats = c->thread->proxy_int_stats;
+ WSTAT_L(c->thread);
+ istats->counters[pr.command]++;
+ c->thread->stats.proxy_conn_requests++;
+ c->thread->stats.proxy_req_active++;
+ WSTAT_UL(c->thread);
+
// start a coroutine.
// TODO (v2): This can pull a thread from a cache.
lua_newthread(L);
@@ -2691,6 +2678,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
if (c->item == NULL) {
lua_settop(L, 0);
proxy_out_errstring(c->resp, "out of memory");
+ WSTAT_DECR(c, proxy_req_active, 1);
return;
}
c->item_malloced = true;