diff options
-rw-r--r-- | memcached.h | 4 | ||||
-rw-r--r-- | proto_proxy.c | 91 | ||||
-rw-r--r-- | proxy.h | 5 | ||||
-rw-r--r-- | proxy_await.c | 2 | ||||
-rw-r--r-- | proxy_config.c | 7 | ||||
-rw-r--r-- | proxy_lua.c | 49 | ||||
-rw-r--r-- | proxy_network.c | 17 | ||||
-rw-r--r-- | proxy_request.c | 12 | ||||
-rw-r--r-- | t/proxyconfig.lua | 56 | ||||
-rw-r--r-- | t/proxyconfig.t | 177 |
10 files changed, 400 insertions, 20 deletions
diff --git a/memcached.h b/memcached.h index 68d0ab0..ba3544d 100644 --- a/memcached.h +++ b/memcached.h @@ -725,6 +725,10 @@ typedef struct { void *proxy_user_stats; void *proxy_int_stats; void *proxy_event_thread; // worker threads can also be proxy IO threads + pthread_mutex_t proxy_limit_lock; + uint64_t proxy_active_req_limit; + uint64_t proxy_buffer_memory_limit; // protected by limit_lock + uint64_t proxy_buffer_memory_used; // protected by limit_lock uint32_t proxy_rng[4]; // fast per-thread rng for lua. // TODO: add ctx object so we can attach to queue. #endif diff --git a/proto_proxy.c b/proto_proxy.c index 952c897..b01064d 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -18,6 +18,18 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc); /******** EXTERNAL FUNCTIONS ******/ // functions starting with _ are breakouts for the public functions. +bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) { + bool oom = false; + pthread_mutex_lock(&t->proxy_limit_lock); + if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) { + oom = true; + } else { + t->proxy_buffer_memory_used += len; + } + pthread_mutex_unlock(&t->proxy_limit_lock); + return oom; +} + // see also: process_extstore_stats() void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { if (arg == NULL) { @@ -37,12 +49,17 @@ void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { char key_str[STAT_KEY_LEN]; struct proxy_int_stats istats = {0}; + uint64_t req_limit = 0; + uint64_t buffer_memory_limit = 0; + uint64_t buffer_memory_used = 0; if (!arg) { return; } proxy_ctx_t *ctx = arg; STAT_L(ctx); + req_limit = ctx->active_req_limit; + buffer_memory_limit = ctx->buffer_memory_limit; // prepare aggregated counters. struct proxy_user_stats *us = &ctx->user_stats; @@ -65,6 +82,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { } } WSTAT_UL(t); + pthread_mutex_lock(&t->proxy_limit_lock); + buffer_memory_used += t->proxy_buffer_memory_used; + pthread_mutex_unlock(&t->proxy_limit_lock); } // return all of the user generated stats @@ -72,9 +92,24 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]); APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]); } + STAT_UL(ctx); + if (buffer_memory_limit == UINT64_MAX) { + buffer_memory_limit = 0; + } else { + buffer_memory_limit *= settings.num_threads; + } + if (req_limit == UINT64_MAX) { + req_limit = 0; + } else { + req_limit *= settings.num_threads; + } + // return proxy counters + APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit); + APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit); + APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used); APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]); APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]); APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]); @@ -110,6 +145,9 @@ void *proxy_init(bool use_uring) { pthread_cond_init(&ctx->manager_cond, NULL); pthread_mutex_init(&ctx->stats_lock, NULL); + ctx->active_req_limit = UINT64_MAX; + ctx->buffer_memory_limit = UINT64_MAX; + // FIXME (v2): default defines. ctx->tunables.tcp_keepalive = false; ctx->tunables.backend_failure_limit = 3; @@ -166,6 +204,7 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) { fprintf(stderr, "Failed to allocate proxy thread stats\n"); exit(EXIT_FAILURE); } + pthread_mutex_init(&thr->proxy_limit_lock, NULL); thr->proxy_ctx = ctx; // Initialize the lua state. @@ -320,6 +359,16 @@ void proxy_return_cb(io_pending_t *pending) { void proxy_finalize_cb(io_pending_t *pending) { io_pending_proxy_t *p = (io_pending_proxy_t *)pending; + if (p->io_type == IO_PENDING_TYPE_EXTSTORE) { + if (p->hdr_it) { + // TODO: lock once, worst case this hashes/locks twice. + if (p->miss) { + item_unlink(p->hdr_it); + } + item_remove(p->hdr_it); + } + } + // release our coroutine reference. // TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist // after a lua_resetthread(Lc) call. @@ -328,13 +377,6 @@ void proxy_finalize_cb(io_pending_t *pending) { luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref); } - if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) { - // TODO: lock once, worst case this hashes/locks twice. - if (p->miss) { - item_unlink(p->hdr_it); - } - item_remove(p->hdr_it); - } return; } @@ -441,6 +483,9 @@ void complete_nread_proxy(conn *c) { c->item_malloced = false; luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref); c->proxy_coro_ref = 0; + pthread_mutex_lock(&thr->proxy_limit_lock); + thr->proxy_buffer_memory_used += rq->pr.vlen; + pthread_mutex_unlock(&thr->proxy_limit_lock); proxy_run_coroutine(Lc, c->resp, NULL, c); @@ -581,10 +626,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con // associated io_pending's of its own later. } else if (r->buf) { // response set from C. - // FIXME (v2): write_and_free() ? it's a bit wrong for here. - resp->write_and_free = r->buf; resp_add_iov(resp, r->buf, r->blen); - r->buf = NULL; } else if (lua_getiuservalue(Lc, 1, 1) != LUA_TNIL) { // uservalue slot 1 is pre-created, so we get TNIL instead of // TNONE when nothing was set into it. @@ -689,6 +731,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu LIBEVENT_THREAD *thr = c->thread; struct proxy_hook *hooks = thr->proxy_hooks; lua_State *L = thr->L; + proxy_ctx_t *ctx = thr->proxy_ctx; mcp_parser_t pr = {0}; // Avoid doing resp_start() here, instead do it a bit later or as-needed. @@ -824,12 +867,24 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu // Also batch the counts down this far so we can lock once for the active // counter instead of twice. struct proxy_int_stats *istats = c->thread->proxy_int_stats; + uint64_t active_reqs = 0; WSTAT_L(c->thread); istats->counters[pr.command]++; c->thread->stats.proxy_conn_requests++; c->thread->stats.proxy_req_active++; + active_reqs = c->thread->stats.proxy_req_active; WSTAT_UL(c->thread); + if (active_reqs > ctx->active_req_limit) { + proxy_out_errstring(c->resp, "active request limit reached"); + WSTAT_DECR(c->thread, proxy_req_active, 1); + if (pr.vlen != 0) { + c->sbytes = pr.vlen; + conn_set_state(c, conn_swallow); + } + return; + } + // start a coroutine. // TODO (v2): This can pull a thread from a cache. lua_newthread(L); @@ -847,13 +902,21 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu // TODO (v2): lift this to a post-processor? if (rq->pr.vlen != 0) { - // relying on temporary malloc's not succumbing as poorly to - // fragmentation. - c->item = malloc(rq->pr.vlen); + c->item = NULL; + // Need to add the used memory later due to needing an extra callback + // handler on error during nread. + bool oom = proxy_bufmem_checkadd(c->thread, 0); + + // relying on temporary malloc's not having fragmentation + if (!oom) { + c->item = malloc(rq->pr.vlen); + } if (c->item == NULL) { lua_settop(L, 0); proxy_out_errstring(c->resp, "out of memory"); WSTAT_DECR(c->thread, proxy_req_active, 1); + c->sbytes = rq->pr.vlen; + conn_set_state(c, conn_swallow); return; } c->item_malloced = true; @@ -892,6 +955,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) { memset(r, 0, sizeof(mcp_resp_t)); r->buf = NULL; r->blen = 0; + r->thread = c->thread; + assert(r->thread != NULL); gettimeofday(&r->start, NULL); // Set noreply mode. // TODO (v2): the response "inherits" the request's noreply mode, which isn't @@ -195,6 +195,8 @@ typedef struct { lua_State *proxy_state; void *proxy_code; proxy_event_thread_t *proxy_io_thread; + uint64_t active_req_limit; // max total in-flight requests + uint64_t buffer_memory_limit; // max bytes for send/receive buffers. pthread_mutex_t config_lock; pthread_cond_t config_cond; pthread_t config_tid; @@ -497,6 +499,9 @@ typedef struct { mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread. } mcp_pool_proxy_t; +// utils +bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len); + // networking interface void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base); void *proxy_event_thread(void *arg); diff --git a/proxy_await.c b/proxy_await.c index d5e40d3..12a8546 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -121,6 +121,8 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw // reserve one uservalue for a lua-supplied response. mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1); memset(r, 0, sizeof(mcp_resp_t)); + r->thread = c->thread; + assert(r->thread != NULL); gettimeofday(&r->start, NULL); // Set noreply mode. // TODO (v2): the response "inherits" the request's noreply mode, which isn't diff --git a/proxy_config.c b/proxy_config.c index e45711e..cca6b87 100644 --- a/proxy_config.c +++ b/proxy_config.c @@ -458,8 +458,15 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) { tus->num_stats = us->num_stats; pthread_mutex_unlock(&thr->stats.mutex); } + // also grab the concurrent request limit + thr->proxy_active_req_limit = ctx->active_req_limit; STAT_UL(ctx); + // update limit counter(s) + pthread_mutex_lock(&thr->proxy_limit_lock); + thr->proxy_buffer_memory_limit = ctx->buffer_memory_limit; + pthread_mutex_unlock(&thr->proxy_limit_lock); + return 0; } diff --git a/proxy_lua.c b/proxy_lua.c index 852624e..25209d0 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -82,11 +82,15 @@ static int mcplib_response_line(lua_State *L) { } static int mcplib_response_gc(lua_State *L) { + LIBEVENT_THREAD *t = PROXY_GET_THR(L); mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response"); // On error/similar we might be holding the read buffer. // If the buf is handed off to mc_resp for return, this pointer is NULL if (r->buf != NULL) { + pthread_mutex_lock(&t->proxy_limit_lock); + t->proxy_buffer_memory_used -= r->blen; + pthread_mutex_unlock(&t->proxy_limit_lock); free(r->buf); } @@ -872,6 +876,49 @@ static int mcplib_backend_read_timeout(lua_State *L) { return 0; } +static int mcplib_active_req_limit(lua_State *L) { + proxy_ctx_t *ctx = PROXY_GET_CTX(L); + uint64_t limit = luaL_checkinteger(L, -1); + + if (limit == 0) { + limit = UINT64_MAX; + } else { + // FIXME: global + int tcount = settings.num_threads; + // The actual limit is per-worker-thread, so divide it up. + if (limit > tcount * 2) { + limit /= tcount; + } + } + + STAT_L(ctx); + ctx->active_req_limit = limit; + STAT_UL(ctx); + + return 0; +} + +// limit specified in kilobytes +static int mcplib_buffer_memory_limit(lua_State *L) { + proxy_ctx_t *ctx = PROXY_GET_CTX(L); + uint64_t limit = luaL_checkinteger(L, -1); + + if (limit == 0) { + limit = UINT64_MAX; + } else { + limit *= 1024; + + int tcount = settings.num_threads; + if (limit > tcount * 2) { + limit /= tcount; + } + + ctx->buffer_memory_limit = limit; + } + + return 0; +} + // mcp.attach(mcp.HOOK_NAME, function) // fill hook structure: if lua function, use luaL_ref() to store the func static int mcplib_attach(lua_State *L) { @@ -1203,6 +1250,8 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { {"backend_read_timeout", mcplib_backend_read_timeout}, {"backend_failure_limit", mcplib_backend_failure_limit}, {"tcp_keepalive", mcplib_tcp_keepalive}, + {"active_req_limit", mcplib_active_req_limit}, + {"buffer_memory_limit", mcplib_buffer_memory_limit}, {NULL, NULL} }; diff --git a/proxy_network.c b/proxy_network.c index 189b6aa..2da41aa 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -1051,12 +1051,25 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { // r->resp.reslen + r->resp.vlen is the total length of the response. // TODO (v2): need to associate a buffer with this response... - // for now lets abuse write_and_free on mc_resp and simply malloc the - // space we need, stuffing it into the resp object. + // for now we simply malloc, but reusable buffers should be used r->blen = r->resp.reslen + r->resp.vlen; + { + bool oom = proxy_bufmem_checkadd(r->thread, r->blen + extra_space); + + if (oom) { + flags = P_BE_FAIL_OOM; + stop = true; + break; + } + } r->buf = malloc(r->blen + extra_space); if (r->buf == NULL) { + // Enforce accounting. + pthread_mutex_lock(&r->thread->proxy_limit_lock); + r->thread->proxy_buffer_memory_used -= r->blen + extra_space; + pthread_mutex_unlock(&r->thread->proxy_limit_lock); + flags = P_BE_FAIL_OOM; stop = true; break; diff --git a/proxy_request.c b/proxy_request.c index cd520b0..c72fed2 100644 --- a/proxy_request.c +++ b/proxy_request.c @@ -538,6 +538,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p) // second argument is optional, for building set requests. // TODO: append the \r\n for the VAL? int mcplib_request(lua_State *L) { + LIBEVENT_THREAD *t = PROXY_GET_THR(L); size_t len = 0; size_t vlen = 0; mcp_parser_t pr = {0}; @@ -586,6 +587,13 @@ int mcplib_request(lua_State *L) { proxy_lua_error(L, "failed to allocate value memory for request object"); } memcpy(rq->pr.vbuf, val, vlen); + // Note: Not enforcing the memory limit here as a bit of a choice: + // - if we're over the memory limit, it'll get caught very soon after + // this, but we won't be causing some lua to bail mid-flight, which is + // more graceful to the end user. + pthread_mutex_lock(&t->proxy_limit_lock); + t->proxy_buffer_memory_used += rq->pr.vlen; + pthread_mutex_unlock(&t->proxy_limit_lock); } // rq is now created, parsed, and on the stack. @@ -766,11 +774,15 @@ int mcplib_request_flag_token(lua_State *L) { } int mcplib_request_gc(lua_State *L) { + LIBEVENT_THREAD *t = PROXY_GET_THR(L); mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); // During nread c->item is the malloc'ed buffer. not yet put into // rq->buf - this gets freed because we've also set c->item_malloced if // the connection closes before finishing nread. if (rq->pr.vbuf != NULL) { + pthread_mutex_lock(&t->proxy_limit_lock); + t->proxy_buffer_memory_used -= rq->pr.vlen; + pthread_mutex_unlock(&t->proxy_limit_lock); free(rq->pr.vbuf); } diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua index bcea461..47360e0 100644 --- a/t/proxyconfig.lua +++ b/t/proxyconfig.lua @@ -39,6 +39,57 @@ function mcp_config_pools(old) test = mcp.pool({b1, b2, b3}, { iothread = false }) } return pools + elseif mode == "reqlimit" then + mcp.active_req_limit(4) + local b1 = mcp.backend('b1', '127.0.0.1', 11511) + local b2 = mcp.backend('b2', '127.0.0.1', 11512) + local b3 = mcp.backend('b3', '127.0.0.1', 11513) + + -- Direct all traffic at a single backend to simplify the test. + local pools = { + test = mcp.pool({b1}), + hold = mcp.pool({b2, b3}) + } + return pools + elseif mode == "noreqlimit" then + mcp.active_req_limit(0) + local b1 = mcp.backend('b1', '127.0.0.1', 11511) + local b2 = mcp.backend('b2', '127.0.0.1', 11512) + local b3 = mcp.backend('b3', '127.0.0.1', 11513) + + -- Direct all traffic at a single backend to simplify the test. + local pools = { + test = mcp.pool({b1}), + hold = mcp.pool({b2, b3}) + } + return pools + elseif mode == "buflimit" or mode == "buflimit2" then + mcp.buffer_memory_limit(20) + if mode == "buflimit2" then + mcp.buffer_memory_limit(200) + end + local b1 = mcp.backend('b1', '127.0.0.1', 11511) + local b2 = mcp.backend('b2', '127.0.0.1', 11512) + local b3 = mcp.backend('b3', '127.0.0.1', 11513) + + -- Direct all traffic at a single backend to simplify the test. + local pools = { + test = mcp.pool({b1}), + hold = mcp.pool({b2, b3}) + } + return pools + elseif mode == "nobuflimit" then + mcp.buffer_memory_limit(0) + local b1 = mcp.backend('b1', '127.0.0.1', 11511) + local b2 = mcp.backend('b2', '127.0.0.1', 11512) + local b3 = mcp.backend('b3', '127.0.0.1', 11513) + + -- Direct all traffic at a single backend to simplify the test. + local pools = { + test = mcp.pool({b1}), + hold = mcp.pool({b2, b3}) + } + return pools end end @@ -49,10 +100,7 @@ function mcp_config_routes(zones) if mode == "none" then mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end) mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end) - elseif mode == "start" or mode == "betable" then - mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) - mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) - elseif mode == "noiothread" then + else mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) end diff --git a/t/proxyconfig.t b/t/proxyconfig.t index 8c9332c..73b0169 100644 --- a/t/proxyconfig.t +++ b/t/proxyconfig.t @@ -60,7 +60,7 @@ sub wait_reload { } my @mocksrvs = (); -diag "making mock servers"; +#diag "making mock servers"; for my $port (11511, 11512, 11513) { my $srv = mock_server($port); ok(defined $srv, "mock server created"); @@ -267,7 +267,182 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); } +### +# diag "starting proxy again from scratch" +### + +# TODO: probably time to abstract the entire "start the server with mocked +# listeners" routine. +$watcher = undef; +write_modefile('return "reqlimit"'); +$p_srv->stop; +while (1) { + if ($p_srv->is_running) { + sleep 1; + } else { + ok(!$p_srv->is_running, "stopped proxy"); + last; + } +} + +@mocksrvs = (); +# re-create the mock servers so we get clean connects, the previous +# backends could be reconnecting still. +for my $port (11511, 11512, 11513) { + my $srv = mock_server($port); + ok(defined $srv, "mock server created"); + push(@mocksrvs, $srv); +} + +# Start up a clean server. +# Since limits are per worker thread, cut the worker threads down to 1 to ease +# testing. +$p_srv = new_memcached('-o proxy_config=./t/proxyconfig.lua -l 127.0.0.1 -t 1', 11510); +$ps = $p_srv->sock; +$ps->autoflush(1); + +{ + for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) { + my $be = $msrv->accept(); + $be->autoflush(1); + ok(defined $be, "mock backend created"); + push(@mbe, $be); + } + + for my $be (@mbe) { + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + } + + my $stats = mem_stats($ps, 'proxy'); + isnt($stats->{active_req_limit}, 0, "active request limit is set"); + + # active request limit is 4, pipeline 6 requests and ensure the last two + # get junked + my $cmd = ''; + for ('a', 'b', 'c', 'd', 'e', 'f') { + $cmd .= "mg /test/$_\r\n"; + } + print $ps $cmd; + + # Lua config only sends commands to the first backend for this test. + my $be = $mbe[0]; + for (1 .. 4) { + like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg"); + print $be "EN\r\n"; + } + my $s = IO::Select->new(); + $s->add($be); + my @readable = $s->can_read(0.25); + is(scalar @readable, 0, "no more pending reads on backend"); + + for (1 .. 4) { + is(scalar <$ps>, "EN\r\n", "received miss from backend"); + } + + is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got error back"); + is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got two limit errors"); + + # Test turning the limit back off. + write_modefile('return "noreqlimit"'); + $watcher = $p_srv->new_sock; + print $watcher "watch proxyevents\n"; + is(<$watcher>, "OK\r\n", "watcher enabled"); + $p_srv->reload(); + wait_reload($watcher); + + $stats = mem_stats($ps, 'proxy'); + is($stats->{active_req_limit}, 0, "active request limit unset"); + + $cmd = ''; + for ('a', 'b', 'c', 'd', 'e', 'f') { + $cmd .= "mg /test/$_\r\n"; + } + print $ps $cmd; + for (1 .. 6) { + like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg"); + print $be "EN\r\n"; + } + for (1 .. 6) { + is(scalar <$ps>, "EN\r\n", "received miss from backend"); + } +} + +{ + # Test the buffer memory limiter. + # - limit per worker will be 1/t global limit + write_modefile('return "buflimit"'); + $p_srv->reload(); + wait_reload($watcher); + # Get a secondary client to trample limit. + my $sps = $p_srv->new_sock; + + my $stats = mem_stats($ps, 'proxy'); + isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set"); + + # - test SET commands with values, but nothing being read on backend + my $data = 'x' x 30000; + my $cmd = "ms foo 30000 T30\r\n" . $data . "\r\n"; + print $ps $cmd; + + my $be = $mbe[0]; + my $s = IO::Select->new; + $s->add($be); + # Wait until the backend has the request queued, then send the second one. + my @readable = $s->can_read(1); + print $sps $cmd; + + my $res; + is(scalar <$be>, "ms foo 30000 T30\r\n", "received first ms"); + $res = scalar <$be>; + print $be "HD\r\n"; + + # The second request should have been caught by the memory limiter + is(scalar <$sps>, "SERVER_ERROR out of memory\r\n", "got server error"); + # FIXME: The original response cannot succeed because we cannot allocate + # enough memory to read the response from the backend. + # This is conveniently testing both paths right here but I would prefer + # something better. + # TODO: need to see if it's possible to surface an OOM from the backend + # handler, but that requires more rewiring. + is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "first request succeeded"); + + # Backend gets killed from a read OOM, so we need to re-establish. + $mbe[0] = $mocksrvs[0]->accept(); + $be = $mbe[0]; + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + like(<$watcher>, qr/error=outofmemory/, "OOM log line"); + + # Memory limits won't drop until the garbage collectors run, which + # requires a bit more push, so instead we raise the limits here so we can + # retry from the pre-existing connections to test swallow mode. + write_modefile('return "buflimit2"'); + $p_srv->reload(); + wait_reload($watcher); + + # Test sending another request down both pipes to ensure they still work. + $cmd = "ms foo 2 T30\r\nhi\r\n"; + print $ps $cmd; + is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom"); + is(scalar <$be>, "hi\r\n", "client works after oom"); + print $be "HD\r\n"; + is(scalar <$ps>, "HD\r\n", "client received resp after oom"); + print $sps $cmd; + is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom"); + is(scalar <$be>, "hi\r\n", "client works after oom"); + print $be "HD\r\n"; + is(scalar <$sps>, "HD\r\n", "client received resp after oom"); + + # - test GET commands but don't read back, large backend values + # - test disabling the limiter + # extended testing: + # - create errors while holding the buffers? +} + # TODO: +# check reqlimit/bwlimit counters # remove backends # do dead sockets close? # adding user stats |