diff options
Diffstat (limited to 'proto_proxy.c')
-rw-r--r-- | proto_proxy.c | 91 |
1 files changed, 78 insertions, 13 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 952c897..b01064d 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -18,6 +18,18 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc); /******** EXTERNAL FUNCTIONS ******/ // functions starting with _ are breakouts for the public functions. +bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) { + bool oom = false; + pthread_mutex_lock(&t->proxy_limit_lock); + if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) { + oom = true; + } else { + t->proxy_buffer_memory_used += len; + } + pthread_mutex_unlock(&t->proxy_limit_lock); + return oom; +} + // see also: process_extstore_stats() void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { if (arg == NULL) { @@ -37,12 +49,17 @@ void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { char key_str[STAT_KEY_LEN]; struct proxy_int_stats istats = {0}; + uint64_t req_limit = 0; + uint64_t buffer_memory_limit = 0; + uint64_t buffer_memory_used = 0; if (!arg) { return; } proxy_ctx_t *ctx = arg; STAT_L(ctx); + req_limit = ctx->active_req_limit; + buffer_memory_limit = ctx->buffer_memory_limit; // prepare aggregated counters. struct proxy_user_stats *us = &ctx->user_stats; @@ -65,6 +82,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { } } WSTAT_UL(t); + pthread_mutex_lock(&t->proxy_limit_lock); + buffer_memory_used += t->proxy_buffer_memory_used; + pthread_mutex_unlock(&t->proxy_limit_lock); } // return all of the user generated stats @@ -72,9 +92,24 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]); APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]); } + STAT_UL(ctx); + if (buffer_memory_limit == UINT64_MAX) { + buffer_memory_limit = 0; + } else { + buffer_memory_limit *= settings.num_threads; + } + if (req_limit == UINT64_MAX) { + req_limit = 0; + } else { + req_limit *= settings.num_threads; + } + // return proxy counters + APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit); + APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit); + APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used); APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]); APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]); APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]); @@ -110,6 +145,9 @@ void *proxy_init(bool use_uring) { pthread_cond_init(&ctx->manager_cond, NULL); pthread_mutex_init(&ctx->stats_lock, NULL); + ctx->active_req_limit = UINT64_MAX; + ctx->buffer_memory_limit = UINT64_MAX; + // FIXME (v2): default defines. ctx->tunables.tcp_keepalive = false; ctx->tunables.backend_failure_limit = 3; @@ -166,6 +204,7 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) { fprintf(stderr, "Failed to allocate proxy thread stats\n"); exit(EXIT_FAILURE); } + pthread_mutex_init(&thr->proxy_limit_lock, NULL); thr->proxy_ctx = ctx; // Initialize the lua state. @@ -320,6 +359,16 @@ void proxy_return_cb(io_pending_t *pending) { void proxy_finalize_cb(io_pending_t *pending) { io_pending_proxy_t *p = (io_pending_proxy_t *)pending; + if (p->io_type == IO_PENDING_TYPE_EXTSTORE) { + if (p->hdr_it) { + // TODO: lock once, worst case this hashes/locks twice. + if (p->miss) { + item_unlink(p->hdr_it); + } + item_remove(p->hdr_it); + } + } + // release our coroutine reference. // TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist // after a lua_resetthread(Lc) call. @@ -328,13 +377,6 @@ void proxy_finalize_cb(io_pending_t *pending) { luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref); } - if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) { - // TODO: lock once, worst case this hashes/locks twice. - if (p->miss) { - item_unlink(p->hdr_it); - } - item_remove(p->hdr_it); - } return; } @@ -441,6 +483,9 @@ void complete_nread_proxy(conn *c) { c->item_malloced = false; luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref); c->proxy_coro_ref = 0; + pthread_mutex_lock(&thr->proxy_limit_lock); + thr->proxy_buffer_memory_used += rq->pr.vlen; + pthread_mutex_unlock(&thr->proxy_limit_lock); proxy_run_coroutine(Lc, c->resp, NULL, c); @@ -581,10 +626,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con // associated io_pending's of its own later. } else if (r->buf) { // response set from C. - // FIXME (v2): write_and_free() ? it's a bit wrong for here. - resp->write_and_free = r->buf; resp_add_iov(resp, r->buf, r->blen); - r->buf = NULL; } else if (lua_getiuservalue(Lc, 1, 1) != LUA_TNIL) { // uservalue slot 1 is pre-created, so we get TNIL instead of // TNONE when nothing was set into it. @@ -689,6 +731,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu LIBEVENT_THREAD *thr = c->thread; struct proxy_hook *hooks = thr->proxy_hooks; lua_State *L = thr->L; + proxy_ctx_t *ctx = thr->proxy_ctx; mcp_parser_t pr = {0}; // Avoid doing resp_start() here, instead do it a bit later or as-needed. @@ -824,12 +867,24 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu // Also batch the counts down this far so we can lock once for the active // counter instead of twice. struct proxy_int_stats *istats = c->thread->proxy_int_stats; + uint64_t active_reqs = 0; WSTAT_L(c->thread); istats->counters[pr.command]++; c->thread->stats.proxy_conn_requests++; c->thread->stats.proxy_req_active++; + active_reqs = c->thread->stats.proxy_req_active; WSTAT_UL(c->thread); + if (active_reqs > ctx->active_req_limit) { + proxy_out_errstring(c->resp, "active request limit reached"); + WSTAT_DECR(c->thread, proxy_req_active, 1); + if (pr.vlen != 0) { + c->sbytes = pr.vlen; + conn_set_state(c, conn_swallow); + } + return; + } + // start a coroutine. // TODO (v2): This can pull a thread from a cache. lua_newthread(L); @@ -847,13 +902,21 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu // TODO (v2): lift this to a post-processor? if (rq->pr.vlen != 0) { - // relying on temporary malloc's not succumbing as poorly to - // fragmentation. - c->item = malloc(rq->pr.vlen); + c->item = NULL; + // Need to add the used memory later due to needing an extra callback + // handler on error during nread. + bool oom = proxy_bufmem_checkadd(c->thread, 0); + + // relying on temporary malloc's not having fragmentation + if (!oom) { + c->item = malloc(rq->pr.vlen); + } if (c->item == NULL) { lua_settop(L, 0); proxy_out_errstring(c->resp, "out of memory"); WSTAT_DECR(c->thread, proxy_req_active, 1); + c->sbytes = rq->pr.vlen; + conn_set_state(c, conn_swallow); return; } c->item_malloced = true; @@ -892,6 +955,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) { memset(r, 0, sizeof(mcp_resp_t)); r->buf = NULL; r->blen = 0; + r->thread = c->thread; + assert(r->thread != NULL); gettimeofday(&r->start, NULL); // Set noreply mode. // TODO (v2): the response "inherits" the request's noreply mode, which isn't |