summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-03-04 21:50:28 -0800
committerdormando <dormando@rydia.net>2023-03-26 16:48:37 -0700
commit6c80728209acdb46629db8db3868d59d627ec33e (patch)
tree1628e77e962c0b0037ab225a444828d99ae3b8a5
parente0fa1fe46aeb9e405cb58234f6016b2c48a10777 (diff)
downloadmemcached-6c80728209acdb46629db8db3868d59d627ec33e.tar.gz
proxy: add request and buffer memory limits
Adds: mcp.active_req_limit(count) mcp.buffer_memory_limit(kilobytes) Divides by the number of worker threads and creates a per-worker-thread limit for the number of concurrent proxy requests, and how many bytes used specifically for value bytes. This does not represent total memory usage but will be close. Buffer memory for inbound set requests is not accounted for until after the object has been read from the socket; to be improved in a future update. This should be fine unless clients send just the SET request and then hang without sending further data. Limits should be live-adjustable via configuration reloads.
-rw-r--r--memcached.h4
-rw-r--r--proto_proxy.c91
-rw-r--r--proxy.h5
-rw-r--r--proxy_await.c2
-rw-r--r--proxy_config.c7
-rw-r--r--proxy_lua.c49
-rw-r--r--proxy_network.c17
-rw-r--r--proxy_request.c12
-rw-r--r--t/proxyconfig.lua56
-rw-r--r--t/proxyconfig.t177
10 files changed, 400 insertions, 20 deletions
diff --git a/memcached.h b/memcached.h
index 68d0ab0..ba3544d 100644
--- a/memcached.h
+++ b/memcached.h
@@ -725,6 +725,10 @@ typedef struct {
void *proxy_user_stats;
void *proxy_int_stats;
void *proxy_event_thread; // worker threads can also be proxy IO threads
+ pthread_mutex_t proxy_limit_lock;
+ uint64_t proxy_active_req_limit;
+ uint64_t proxy_buffer_memory_limit; // protected by limit_lock
+ uint64_t proxy_buffer_memory_used; // protected by limit_lock
uint32_t proxy_rng[4]; // fast per-thread rng for lua.
// TODO: add ctx object so we can attach to queue.
#endif
diff --git a/proto_proxy.c b/proto_proxy.c
index 952c897..b01064d 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -18,6 +18,18 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
+bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) {
+ bool oom = false;
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) {
+ oom = true;
+ } else {
+ t->proxy_buffer_memory_used += len;
+ }
+ pthread_mutex_unlock(&t->proxy_limit_lock);
+ return oom;
+}
+
// see also: process_extstore_stats()
void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
if (arg == NULL) {
@@ -37,12 +49,17 @@ void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
struct proxy_int_stats istats = {0};
+ uint64_t req_limit = 0;
+ uint64_t buffer_memory_limit = 0;
+ uint64_t buffer_memory_used = 0;
if (!arg) {
return;
}
proxy_ctx_t *ctx = arg;
STAT_L(ctx);
+ req_limit = ctx->active_req_limit;
+ buffer_memory_limit = ctx->buffer_memory_limit;
// prepare aggregated counters.
struct proxy_user_stats *us = &ctx->user_stats;
@@ -65,6 +82,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
}
}
WSTAT_UL(t);
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ buffer_memory_used += t->proxy_buffer_memory_used;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
}
// return all of the user generated stats
@@ -72,9 +92,24 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
}
+
STAT_UL(ctx);
+ if (buffer_memory_limit == UINT64_MAX) {
+ buffer_memory_limit = 0;
+ } else {
+ buffer_memory_limit *= settings.num_threads;
+ }
+ if (req_limit == UINT64_MAX) {
+ req_limit = 0;
+ } else {
+ req_limit *= settings.num_threads;
+ }
+
// return proxy counters
+ APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit);
+ APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit);
+ APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used);
APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
@@ -110,6 +145,9 @@ void *proxy_init(bool use_uring) {
pthread_cond_init(&ctx->manager_cond, NULL);
pthread_mutex_init(&ctx->stats_lock, NULL);
+ ctx->active_req_limit = UINT64_MAX;
+ ctx->buffer_memory_limit = UINT64_MAX;
+
// FIXME (v2): default defines.
ctx->tunables.tcp_keepalive = false;
ctx->tunables.backend_failure_limit = 3;
@@ -166,6 +204,7 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
fprintf(stderr, "Failed to allocate proxy thread stats\n");
exit(EXIT_FAILURE);
}
+ pthread_mutex_init(&thr->proxy_limit_lock, NULL);
thr->proxy_ctx = ctx;
// Initialize the lua state.
@@ -320,6 +359,16 @@ void proxy_return_cb(io_pending_t *pending) {
void proxy_finalize_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+ if (p->io_type == IO_PENDING_TYPE_EXTSTORE) {
+ if (p->hdr_it) {
+ // TODO: lock once, worst case this hashes/locks twice.
+ if (p->miss) {
+ item_unlink(p->hdr_it);
+ }
+ item_remove(p->hdr_it);
+ }
+ }
+
// release our coroutine reference.
// TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist
// after a lua_resetthread(Lc) call.
@@ -328,13 +377,6 @@ void proxy_finalize_cb(io_pending_t *pending) {
luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
}
- if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) {
- // TODO: lock once, worst case this hashes/locks twice.
- if (p->miss) {
- item_unlink(p->hdr_it);
- }
- item_remove(p->hdr_it);
- }
return;
}
@@ -441,6 +483,9 @@ void complete_nread_proxy(conn *c) {
c->item_malloced = false;
luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
c->proxy_coro_ref = 0;
+ pthread_mutex_lock(&thr->proxy_limit_lock);
+ thr->proxy_buffer_memory_used += rq->pr.vlen;
+ pthread_mutex_unlock(&thr->proxy_limit_lock);
proxy_run_coroutine(Lc, c->resp, NULL, c);
@@ -581,10 +626,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// associated io_pending's of its own later.
} else if (r->buf) {
// response set from C.
- // FIXME (v2): write_and_free() ? it's a bit wrong for here.
- resp->write_and_free = r->buf;
resp_add_iov(resp, r->buf, r->blen);
- r->buf = NULL;
} else if (lua_getiuservalue(Lc, 1, 1) != LUA_TNIL) {
// uservalue slot 1 is pre-created, so we get TNIL instead of
// TNONE when nothing was set into it.
@@ -689,6 +731,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
LIBEVENT_THREAD *thr = c->thread;
struct proxy_hook *hooks = thr->proxy_hooks;
lua_State *L = thr->L;
+ proxy_ctx_t *ctx = thr->proxy_ctx;
mcp_parser_t pr = {0};
// Avoid doing resp_start() here, instead do it a bit later or as-needed.
@@ -824,12 +867,24 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// Also batch the counts down this far so we can lock once for the active
// counter instead of twice.
struct proxy_int_stats *istats = c->thread->proxy_int_stats;
+ uint64_t active_reqs = 0;
WSTAT_L(c->thread);
istats->counters[pr.command]++;
c->thread->stats.proxy_conn_requests++;
c->thread->stats.proxy_req_active++;
+ active_reqs = c->thread->stats.proxy_req_active;
WSTAT_UL(c->thread);
+ if (active_reqs > ctx->active_req_limit) {
+ proxy_out_errstring(c->resp, "active request limit reached");
+ WSTAT_DECR(c->thread, proxy_req_active, 1);
+ if (pr.vlen != 0) {
+ c->sbytes = pr.vlen;
+ conn_set_state(c, conn_swallow);
+ }
+ return;
+ }
+
// start a coroutine.
// TODO (v2): This can pull a thread from a cache.
lua_newthread(L);
@@ -847,13 +902,21 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// TODO (v2): lift this to a post-processor?
if (rq->pr.vlen != 0) {
- // relying on temporary malloc's not succumbing as poorly to
- // fragmentation.
- c->item = malloc(rq->pr.vlen);
+ c->item = NULL;
+ // Need to add the used memory later due to needing an extra callback
+ // handler on error during nread.
+ bool oom = proxy_bufmem_checkadd(c->thread, 0);
+
+ // relying on temporary malloc's not having fragmentation
+ if (!oom) {
+ c->item = malloc(rq->pr.vlen);
+ }
if (c->item == NULL) {
lua_settop(L, 0);
proxy_out_errstring(c->resp, "out of memory");
WSTAT_DECR(c->thread, proxy_req_active, 1);
+ c->sbytes = rq->pr.vlen;
+ conn_set_state(c, conn_swallow);
return;
}
c->item_malloced = true;
@@ -892,6 +955,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
memset(r, 0, sizeof(mcp_resp_t));
r->buf = NULL;
r->blen = 0;
+ r->thread = c->thread;
+ assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't
diff --git a/proxy.h b/proxy.h
index 0ab3b91..24a5ea0 100644
--- a/proxy.h
+++ b/proxy.h
@@ -195,6 +195,8 @@ typedef struct {
lua_State *proxy_state;
void *proxy_code;
proxy_event_thread_t *proxy_io_thread;
+ uint64_t active_req_limit; // max total in-flight requests
+ uint64_t buffer_memory_limit; // max bytes for send/receive buffers.
pthread_mutex_t config_lock;
pthread_cond_t config_cond;
pthread_t config_tid;
@@ -497,6 +499,9 @@ typedef struct {
mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread.
} mcp_pool_proxy_t;
+// utils
+bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len);
+
// networking interface
void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
void *proxy_event_thread(void *arg);
diff --git a/proxy_await.c b/proxy_await.c
index d5e40d3..12a8546 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -121,6 +121,8 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// reserve one uservalue for a lua-supplied response.
mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
memset(r, 0, sizeof(mcp_resp_t));
+ r->thread = c->thread;
+ assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't
diff --git a/proxy_config.c b/proxy_config.c
index e45711e..cca6b87 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -458,8 +458,15 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
tus->num_stats = us->num_stats;
pthread_mutex_unlock(&thr->stats.mutex);
}
+ // also grab the concurrent request limit
+ thr->proxy_active_req_limit = ctx->active_req_limit;
STAT_UL(ctx);
+ // update limit counter(s)
+ pthread_mutex_lock(&thr->proxy_limit_lock);
+ thr->proxy_buffer_memory_limit = ctx->buffer_memory_limit;
+ pthread_mutex_unlock(&thr->proxy_limit_lock);
+
return 0;
}
diff --git a/proxy_lua.c b/proxy_lua.c
index 852624e..25209d0 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -82,11 +82,15 @@ static int mcplib_response_line(lua_State *L) {
}
static int mcplib_response_gc(lua_State *L) {
+ LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
// On error/similar we might be holding the read buffer.
// If the buf is handed off to mc_resp for return, this pointer is NULL
if (r->buf != NULL) {
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ t->proxy_buffer_memory_used -= r->blen;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
free(r->buf);
}
@@ -872,6 +876,49 @@ static int mcplib_backend_read_timeout(lua_State *L) {
return 0;
}
+static int mcplib_active_req_limit(lua_State *L) {
+ proxy_ctx_t *ctx = PROXY_GET_CTX(L);
+ uint64_t limit = luaL_checkinteger(L, -1);
+
+ if (limit == 0) {
+ limit = UINT64_MAX;
+ } else {
+ // FIXME: global
+ int tcount = settings.num_threads;
+ // The actual limit is per-worker-thread, so divide it up.
+ if (limit > tcount * 2) {
+ limit /= tcount;
+ }
+ }
+
+ STAT_L(ctx);
+ ctx->active_req_limit = limit;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+// limit specified in kilobytes
+static int mcplib_buffer_memory_limit(lua_State *L) {
+ proxy_ctx_t *ctx = PROXY_GET_CTX(L);
+ uint64_t limit = luaL_checkinteger(L, -1);
+
+ if (limit == 0) {
+ limit = UINT64_MAX;
+ } else {
+ limit *= 1024;
+
+ int tcount = settings.num_threads;
+ if (limit > tcount * 2) {
+ limit /= tcount;
+ }
+
+ ctx->buffer_memory_limit = limit;
+ }
+
+ return 0;
+}
+
// mcp.attach(mcp.HOOK_NAME, function)
// fill hook structure: if lua function, use luaL_ref() to store the func
static int mcplib_attach(lua_State *L) {
@@ -1203,6 +1250,8 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
{"backend_read_timeout", mcplib_backend_read_timeout},
{"backend_failure_limit", mcplib_backend_failure_limit},
{"tcp_keepalive", mcplib_tcp_keepalive},
+ {"active_req_limit", mcplib_active_req_limit},
+ {"buffer_memory_limit", mcplib_buffer_memory_limit},
{NULL, NULL}
};
diff --git a/proxy_network.c b/proxy_network.c
index 189b6aa..2da41aa 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -1051,12 +1051,25 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
// r->resp.reslen + r->resp.vlen is the total length of the response.
// TODO (v2): need to associate a buffer with this response...
- // for now lets abuse write_and_free on mc_resp and simply malloc the
- // space we need, stuffing it into the resp object.
+ // for now we simply malloc, but reusable buffers should be used
r->blen = r->resp.reslen + r->resp.vlen;
+ {
+ bool oom = proxy_bufmem_checkadd(r->thread, r->blen + extra_space);
+
+ if (oom) {
+ flags = P_BE_FAIL_OOM;
+ stop = true;
+ break;
+ }
+ }
r->buf = malloc(r->blen + extra_space);
if (r->buf == NULL) {
+ // Enforce accounting.
+ pthread_mutex_lock(&r->thread->proxy_limit_lock);
+ r->thread->proxy_buffer_memory_used -= r->blen + extra_space;
+ pthread_mutex_unlock(&r->thread->proxy_limit_lock);
+
flags = P_BE_FAIL_OOM;
stop = true;
break;
diff --git a/proxy_request.c b/proxy_request.c
index cd520b0..c72fed2 100644
--- a/proxy_request.c
+++ b/proxy_request.c
@@ -538,6 +538,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p)
// second argument is optional, for building set requests.
// TODO: append the \r\n for the VAL?
int mcplib_request(lua_State *L) {
+ LIBEVENT_THREAD *t = PROXY_GET_THR(L);
size_t len = 0;
size_t vlen = 0;
mcp_parser_t pr = {0};
@@ -586,6 +587,13 @@ int mcplib_request(lua_State *L) {
proxy_lua_error(L, "failed to allocate value memory for request object");
}
memcpy(rq->pr.vbuf, val, vlen);
+ // Note: Not enforcing the memory limit here as a bit of a choice:
+ // - if we're over the memory limit, it'll get caught very soon after
+ // this, but we won't be causing some lua to bail mid-flight, which is
+ // more graceful to the end user.
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ t->proxy_buffer_memory_used += rq->pr.vlen;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
}
// rq is now created, parsed, and on the stack.
@@ -766,11 +774,15 @@ int mcplib_request_flag_token(lua_State *L) {
}
int mcplib_request_gc(lua_State *L) {
+ LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
// During nread c->item is the malloc'ed buffer. not yet put into
// rq->buf - this gets freed because we've also set c->item_malloced if
// the connection closes before finishing nread.
if (rq->pr.vbuf != NULL) {
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ t->proxy_buffer_memory_used -= rq->pr.vlen;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
free(rq->pr.vbuf);
}
diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua
index bcea461..47360e0 100644
--- a/t/proxyconfig.lua
+++ b/t/proxyconfig.lua
@@ -39,6 +39,57 @@ function mcp_config_pools(old)
test = mcp.pool({b1, b2, b3}, { iothread = false })
}
return pools
+ elseif mode == "reqlimit" then
+ mcp.active_req_limit(4)
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
+ elseif mode == "noreqlimit" then
+ mcp.active_req_limit(0)
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
+ elseif mode == "buflimit" or mode == "buflimit2" then
+ mcp.buffer_memory_limit(20)
+ if mode == "buflimit2" then
+ mcp.buffer_memory_limit(200)
+ end
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
+ elseif mode == "nobuflimit" then
+ mcp.buffer_memory_limit(0)
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
end
end
@@ -49,10 +100,7 @@ function mcp_config_routes(zones)
if mode == "none" then
mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end)
mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end)
- elseif mode == "start" or mode == "betable" then
- mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
- mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
- elseif mode == "noiothread" then
+ else
mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
end
diff --git a/t/proxyconfig.t b/t/proxyconfig.t
index 8c9332c..73b0169 100644
--- a/t/proxyconfig.t
+++ b/t/proxyconfig.t
@@ -60,7 +60,7 @@ sub wait_reload {
}
my @mocksrvs = ();
-diag "making mock servers";
+#diag "making mock servers";
for my $port (11511, 11512, 11513) {
my $srv = mock_server($port);
ok(defined $srv, "mock server created");
@@ -267,7 +267,182 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
}
+###
+# diag "starting proxy again from scratch"
+###
+
+# TODO: probably time to abstract the entire "start the server with mocked
+# listeners" routine.
+$watcher = undef;
+write_modefile('return "reqlimit"');
+$p_srv->stop;
+while (1) {
+ if ($p_srv->is_running) {
+ sleep 1;
+ } else {
+ ok(!$p_srv->is_running, "stopped proxy");
+ last;
+ }
+}
+
+@mocksrvs = ();
+# re-create the mock servers so we get clean connects, the previous
+# backends could be reconnecting still.
+for my $port (11511, 11512, 11513) {
+ my $srv = mock_server($port);
+ ok(defined $srv, "mock server created");
+ push(@mocksrvs, $srv);
+}
+
+# Start up a clean server.
+# Since limits are per worker thread, cut the worker threads down to 1 to ease
+# testing.
+$p_srv = new_memcached('-o proxy_config=./t/proxyconfig.lua -l 127.0.0.1 -t 1', 11510);
+$ps = $p_srv->sock;
+$ps->autoflush(1);
+
+{
+ for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) {
+ my $be = $msrv->accept();
+ $be->autoflush(1);
+ ok(defined $be, "mock backend created");
+ push(@mbe, $be);
+ }
+
+ for my $be (@mbe) {
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ }
+
+ my $stats = mem_stats($ps, 'proxy');
+ isnt($stats->{active_req_limit}, 0, "active request limit is set");
+
+ # active request limit is 4, pipeline 6 requests and ensure the last two
+ # get junked
+ my $cmd = '';
+ for ('a', 'b', 'c', 'd', 'e', 'f') {
+ $cmd .= "mg /test/$_\r\n";
+ }
+ print $ps $cmd;
+
+ # Lua config only sends commands to the first backend for this test.
+ my $be = $mbe[0];
+ for (1 .. 4) {
+ like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
+ print $be "EN\r\n";
+ }
+ my $s = IO::Select->new();
+ $s->add($be);
+ my @readable = $s->can_read(0.25);
+ is(scalar @readable, 0, "no more pending reads on backend");
+
+ for (1 .. 4) {
+ is(scalar <$ps>, "EN\r\n", "received miss from backend");
+ }
+
+ is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got error back");
+ is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got two limit errors");
+
+ # Test turning the limit back off.
+ write_modefile('return "noreqlimit"');
+ $watcher = $p_srv->new_sock;
+ print $watcher "watch proxyevents\n";
+ is(<$watcher>, "OK\r\n", "watcher enabled");
+ $p_srv->reload();
+ wait_reload($watcher);
+
+ $stats = mem_stats($ps, 'proxy');
+ is($stats->{active_req_limit}, 0, "active request limit unset");
+
+ $cmd = '';
+ for ('a', 'b', 'c', 'd', 'e', 'f') {
+ $cmd .= "mg /test/$_\r\n";
+ }
+ print $ps $cmd;
+ for (1 .. 6) {
+ like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
+ print $be "EN\r\n";
+ }
+ for (1 .. 6) {
+ is(scalar <$ps>, "EN\r\n", "received miss from backend");
+ }
+}
+
+{
+ # Test the buffer memory limiter.
+ # - limit per worker will be 1/t global limit
+ write_modefile('return "buflimit"');
+ $p_srv->reload();
+ wait_reload($watcher);
+ # Get a secondary client to trample limit.
+ my $sps = $p_srv->new_sock;
+
+ my $stats = mem_stats($ps, 'proxy');
+ isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");
+
+ # - test SET commands with values, but nothing being read on backend
+ my $data = 'x' x 30000;
+ my $cmd = "ms foo 30000 T30\r\n" . $data . "\r\n";
+ print $ps $cmd;
+
+ my $be = $mbe[0];
+ my $s = IO::Select->new;
+ $s->add($be);
+ # Wait until the backend has the request queued, then send the second one.
+ my @readable = $s->can_read(1);
+ print $sps $cmd;
+
+ my $res;
+ is(scalar <$be>, "ms foo 30000 T30\r\n", "received first ms");
+ $res = scalar <$be>;
+ print $be "HD\r\n";
+
+ # The second request should have been caught by the memory limiter
+ is(scalar <$sps>, "SERVER_ERROR out of memory\r\n", "got server error");
+ # FIXME: The original response cannot succeed because we cannot allocate
+ # enough memory to read the response from the backend.
+ # This is conveniently testing both paths right here but I would prefer
+ # something better.
+ # TODO: need to see if it's possible to surface an OOM from the backend
+ # handler, but that requires more rewiring.
+ is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "first request succeeded");
+
+ # Backend gets killed from a read OOM, so we need to re-establish.
+ $mbe[0] = $mocksrvs[0]->accept();
+ $be = $mbe[0];
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ like(<$watcher>, qr/error=outofmemory/, "OOM log line");
+
+ # Memory limits won't drop until the garbage collectors run, which
+ # requires a bit more push, so instead we raise the limits here so we can
+ # retry from the pre-existing connections to test swallow mode.
+ write_modefile('return "buflimit2"');
+ $p_srv->reload();
+ wait_reload($watcher);
+
+ # Test sending another request down both pipes to ensure they still work.
+ $cmd = "ms foo 2 T30\r\nhi\r\n";
+ print $ps $cmd;
+ is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
+ is(scalar <$be>, "hi\r\n", "client works after oom");
+ print $be "HD\r\n";
+ is(scalar <$ps>, "HD\r\n", "client received resp after oom");
+ print $sps $cmd;
+ is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
+ is(scalar <$be>, "hi\r\n", "client works after oom");
+ print $be "HD\r\n";
+ is(scalar <$sps>, "HD\r\n", "client received resp after oom");
+
+ # - test GET commands but don't read back, large backend values
+ # - test disabling the limiter
+ # extended testing:
+ # - create errors while holding the buffers?
+}
+
# TODO:
+# check reqlimit/bwlimit counters
# remove backends
# do dead sockets close?
# adding user stats