summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-03-04 21:50:28 -0800
committerdormando <dormando@rydia.net>2023-03-26 16:48:37 -0700
commit6c80728209acdb46629db8db3868d59d627ec33e (patch)
tree1628e77e962c0b0037ab225a444828d99ae3b8a5 /proto_proxy.c
parente0fa1fe46aeb9e405cb58234f6016b2c48a10777 (diff)
downloadmemcached-6c80728209acdb46629db8db3868d59d627ec33e.tar.gz
proxy: add request and buffer memory limits
Adds: mcp.active_req_limit(count) mcp.buffer_memory_limit(kilobytes) Divides by the number of worker threads and creates a per-worker-thread limit for the number of concurrent proxy requests, and how many bytes used specifically for value bytes. This does not represent total memory usage but will be close. Buffer memory for inbound set requests is not accounted for until after the object has been read from the socket; to be improved in a future update. This should be fine unless clients send just the SET request and then hang without sending further data. Limits should be live-adjustable via configuration reloads.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c91
1 files changed, 78 insertions, 13 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 952c897..b01064d 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -18,6 +18,18 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
+bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) {
+ bool oom = false;
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) {
+ oom = true;
+ } else {
+ t->proxy_buffer_memory_used += len;
+ }
+ pthread_mutex_unlock(&t->proxy_limit_lock);
+ return oom;
+}
+
// see also: process_extstore_stats()
void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
if (arg == NULL) {
@@ -37,12 +49,17 @@ void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
struct proxy_int_stats istats = {0};
+ uint64_t req_limit = 0;
+ uint64_t buffer_memory_limit = 0;
+ uint64_t buffer_memory_used = 0;
if (!arg) {
return;
}
proxy_ctx_t *ctx = arg;
STAT_L(ctx);
+ req_limit = ctx->active_req_limit;
+ buffer_memory_limit = ctx->buffer_memory_limit;
// prepare aggregated counters.
struct proxy_user_stats *us = &ctx->user_stats;
@@ -65,6 +82,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
}
}
WSTAT_UL(t);
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ buffer_memory_used += t->proxy_buffer_memory_used;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
}
// return all of the user generated stats
@@ -72,9 +92,24 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
}
+
STAT_UL(ctx);
+ if (buffer_memory_limit == UINT64_MAX) {
+ buffer_memory_limit = 0;
+ } else {
+ buffer_memory_limit *= settings.num_threads;
+ }
+ if (req_limit == UINT64_MAX) {
+ req_limit = 0;
+ } else {
+ req_limit *= settings.num_threads;
+ }
+
// return proxy counters
+ APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit);
+ APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit);
+ APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used);
APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
@@ -110,6 +145,9 @@ void *proxy_init(bool use_uring) {
pthread_cond_init(&ctx->manager_cond, NULL);
pthread_mutex_init(&ctx->stats_lock, NULL);
+ ctx->active_req_limit = UINT64_MAX;
+ ctx->buffer_memory_limit = UINT64_MAX;
+
// FIXME (v2): default defines.
ctx->tunables.tcp_keepalive = false;
ctx->tunables.backend_failure_limit = 3;
@@ -166,6 +204,7 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
fprintf(stderr, "Failed to allocate proxy thread stats\n");
exit(EXIT_FAILURE);
}
+ pthread_mutex_init(&thr->proxy_limit_lock, NULL);
thr->proxy_ctx = ctx;
// Initialize the lua state.
@@ -320,6 +359,16 @@ void proxy_return_cb(io_pending_t *pending) {
void proxy_finalize_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+ if (p->io_type == IO_PENDING_TYPE_EXTSTORE) {
+ if (p->hdr_it) {
+ // TODO: lock once, worst case this hashes/locks twice.
+ if (p->miss) {
+ item_unlink(p->hdr_it);
+ }
+ item_remove(p->hdr_it);
+ }
+ }
+
// release our coroutine reference.
// TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist
// after a lua_resetthread(Lc) call.
@@ -328,13 +377,6 @@ void proxy_finalize_cb(io_pending_t *pending) {
luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
}
- if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) {
- // TODO: lock once, worst case this hashes/locks twice.
- if (p->miss) {
- item_unlink(p->hdr_it);
- }
- item_remove(p->hdr_it);
- }
return;
}
@@ -441,6 +483,9 @@ void complete_nread_proxy(conn *c) {
c->item_malloced = false;
luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
c->proxy_coro_ref = 0;
+ pthread_mutex_lock(&thr->proxy_limit_lock);
+ thr->proxy_buffer_memory_used += rq->pr.vlen;
+ pthread_mutex_unlock(&thr->proxy_limit_lock);
proxy_run_coroutine(Lc, c->resp, NULL, c);
@@ -581,10 +626,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// associated io_pending's of its own later.
} else if (r->buf) {
// response set from C.
- // FIXME (v2): write_and_free() ? it's a bit wrong for here.
- resp->write_and_free = r->buf;
resp_add_iov(resp, r->buf, r->blen);
- r->buf = NULL;
} else if (lua_getiuservalue(Lc, 1, 1) != LUA_TNIL) {
// uservalue slot 1 is pre-created, so we get TNIL instead of
// TNONE when nothing was set into it.
@@ -689,6 +731,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
LIBEVENT_THREAD *thr = c->thread;
struct proxy_hook *hooks = thr->proxy_hooks;
lua_State *L = thr->L;
+ proxy_ctx_t *ctx = thr->proxy_ctx;
mcp_parser_t pr = {0};
// Avoid doing resp_start() here, instead do it a bit later or as-needed.
@@ -824,12 +867,24 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// Also batch the counts down this far so we can lock once for the active
// counter instead of twice.
struct proxy_int_stats *istats = c->thread->proxy_int_stats;
+ uint64_t active_reqs = 0;
WSTAT_L(c->thread);
istats->counters[pr.command]++;
c->thread->stats.proxy_conn_requests++;
c->thread->stats.proxy_req_active++;
+ active_reqs = c->thread->stats.proxy_req_active;
WSTAT_UL(c->thread);
+ if (active_reqs > ctx->active_req_limit) {
+ proxy_out_errstring(c->resp, "active request limit reached");
+ WSTAT_DECR(c->thread, proxy_req_active, 1);
+ if (pr.vlen != 0) {
+ c->sbytes = pr.vlen;
+ conn_set_state(c, conn_swallow);
+ }
+ return;
+ }
+
// start a coroutine.
// TODO (v2): This can pull a thread from a cache.
lua_newthread(L);
@@ -847,13 +902,21 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// TODO (v2): lift this to a post-processor?
if (rq->pr.vlen != 0) {
- // relying on temporary malloc's not succumbing as poorly to
- // fragmentation.
- c->item = malloc(rq->pr.vlen);
+ c->item = NULL;
+ // Need to add the used memory later due to needing an extra callback
+ // handler on error during nread.
+ bool oom = proxy_bufmem_checkadd(c->thread, 0);
+
+ // relying on temporary malloc's not having fragmentation
+ if (!oom) {
+ c->item = malloc(rq->pr.vlen);
+ }
if (c->item == NULL) {
lua_settop(L, 0);
proxy_out_errstring(c->resp, "out of memory");
WSTAT_DECR(c->thread, proxy_req_active, 1);
+ c->sbytes = rq->pr.vlen;
+ conn_set_state(c, conn_swallow);
return;
}
c->item_malloced = true;
@@ -892,6 +955,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
memset(r, 0, sizeof(mcp_resp_t));
r->buf = NULL;
r->blen = 0;
+ r->thread = c->thread;
+ assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't