summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c91
1 files changed, 78 insertions, 13 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 952c897..b01064d 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -18,6 +18,18 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
+bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) {
+ bool oom = false;
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) {
+ oom = true;
+ } else {
+ t->proxy_buffer_memory_used += len;
+ }
+ pthread_mutex_unlock(&t->proxy_limit_lock);
+ return oom;
+}
+
// see also: process_extstore_stats()
void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
if (arg == NULL) {
@@ -37,12 +49,17 @@ void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
struct proxy_int_stats istats = {0};
+ uint64_t req_limit = 0;
+ uint64_t buffer_memory_limit = 0;
+ uint64_t buffer_memory_used = 0;
if (!arg) {
return;
}
proxy_ctx_t *ctx = arg;
STAT_L(ctx);
+ req_limit = ctx->active_req_limit;
+ buffer_memory_limit = ctx->buffer_memory_limit;
// prepare aggregated counters.
struct proxy_user_stats *us = &ctx->user_stats;
@@ -65,6 +82,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
}
}
WSTAT_UL(t);
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ buffer_memory_used += t->proxy_buffer_memory_used;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
}
// return all of the user generated stats
@@ -72,9 +92,24 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
}
+
STAT_UL(ctx);
+ if (buffer_memory_limit == UINT64_MAX) {
+ buffer_memory_limit = 0;
+ } else {
+ buffer_memory_limit *= settings.num_threads;
+ }
+ if (req_limit == UINT64_MAX) {
+ req_limit = 0;
+ } else {
+ req_limit *= settings.num_threads;
+ }
+
// return proxy counters
+ APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit);
+ APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit);
+ APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used);
APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
@@ -110,6 +145,9 @@ void *proxy_init(bool use_uring) {
pthread_cond_init(&ctx->manager_cond, NULL);
pthread_mutex_init(&ctx->stats_lock, NULL);
+ ctx->active_req_limit = UINT64_MAX;
+ ctx->buffer_memory_limit = UINT64_MAX;
+
// FIXME (v2): default defines.
ctx->tunables.tcp_keepalive = false;
ctx->tunables.backend_failure_limit = 3;
@@ -166,6 +204,7 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
fprintf(stderr, "Failed to allocate proxy thread stats\n");
exit(EXIT_FAILURE);
}
+ pthread_mutex_init(&thr->proxy_limit_lock, NULL);
thr->proxy_ctx = ctx;
// Initialize the lua state.
@@ -320,6 +359,16 @@ void proxy_return_cb(io_pending_t *pending) {
void proxy_finalize_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+ if (p->io_type == IO_PENDING_TYPE_EXTSTORE) {
+ if (p->hdr_it) {
+ // TODO: lock once, worst case this hashes/locks twice.
+ if (p->miss) {
+ item_unlink(p->hdr_it);
+ }
+ item_remove(p->hdr_it);
+ }
+ }
+
// release our coroutine reference.
// TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist
// after a lua_resetthread(Lc) call.
@@ -328,13 +377,6 @@ void proxy_finalize_cb(io_pending_t *pending) {
luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
}
- if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) {
- // TODO: lock once, worst case this hashes/locks twice.
- if (p->miss) {
- item_unlink(p->hdr_it);
- }
- item_remove(p->hdr_it);
- }
return;
}
@@ -441,6 +483,9 @@ void complete_nread_proxy(conn *c) {
c->item_malloced = false;
luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
c->proxy_coro_ref = 0;
+ pthread_mutex_lock(&thr->proxy_limit_lock);
+ thr->proxy_buffer_memory_used += rq->pr.vlen;
+ pthread_mutex_unlock(&thr->proxy_limit_lock);
proxy_run_coroutine(Lc, c->resp, NULL, c);
@@ -581,10 +626,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// associated io_pending's of its own later.
} else if (r->buf) {
// response set from C.
- // FIXME (v2): write_and_free() ? it's a bit wrong for here.
- resp->write_and_free = r->buf;
resp_add_iov(resp, r->buf, r->blen);
- r->buf = NULL;
} else if (lua_getiuservalue(Lc, 1, 1) != LUA_TNIL) {
// uservalue slot 1 is pre-created, so we get TNIL instead of
// TNONE when nothing was set into it.
@@ -689,6 +731,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
LIBEVENT_THREAD *thr = c->thread;
struct proxy_hook *hooks = thr->proxy_hooks;
lua_State *L = thr->L;
+ proxy_ctx_t *ctx = thr->proxy_ctx;
mcp_parser_t pr = {0};
// Avoid doing resp_start() here, instead do it a bit later or as-needed.
@@ -824,12 +867,24 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// Also batch the counts down this far so we can lock once for the active
// counter instead of twice.
struct proxy_int_stats *istats = c->thread->proxy_int_stats;
+ uint64_t active_reqs = 0;
WSTAT_L(c->thread);
istats->counters[pr.command]++;
c->thread->stats.proxy_conn_requests++;
c->thread->stats.proxy_req_active++;
+ active_reqs = c->thread->stats.proxy_req_active;
WSTAT_UL(c->thread);
+ if (active_reqs > ctx->active_req_limit) {
+ proxy_out_errstring(c->resp, "active request limit reached");
+ WSTAT_DECR(c->thread, proxy_req_active, 1);
+ if (pr.vlen != 0) {
+ c->sbytes = pr.vlen;
+ conn_set_state(c, conn_swallow);
+ }
+ return;
+ }
+
// start a coroutine.
// TODO (v2): This can pull a thread from a cache.
lua_newthread(L);
@@ -847,13 +902,21 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// TODO (v2): lift this to a post-processor?
if (rq->pr.vlen != 0) {
- // relying on temporary malloc's not succumbing as poorly to
- // fragmentation.
- c->item = malloc(rq->pr.vlen);
+ c->item = NULL;
+ // Need to add the used memory later due to needing an extra callback
+ // handler on error during nread.
+ bool oom = proxy_bufmem_checkadd(c->thread, 0);
+
+ // relying on temporary malloc's not having fragmentation
+ if (!oom) {
+ c->item = malloc(rq->pr.vlen);
+ }
if (c->item == NULL) {
lua_settop(L, 0);
proxy_out_errstring(c->resp, "out of memory");
WSTAT_DECR(c->thread, proxy_req_active, 1);
+ c->sbytes = rq->pr.vlen;
+ conn_set_state(c, conn_swallow);
return;
}
c->item_malloced = true;
@@ -892,6 +955,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
memset(r, 0, sizeof(mcp_resp_t));
r->buf = NULL;
r->blen = 0;
+ r->thread = c->thread;
+ assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't