-rw-r--r--   memcached.h          4
-rw-r--r--   proto_proxy.c       91
-rw-r--r--   proxy.h              5
-rw-r--r--   proxy_await.c        2
-rw-r--r--   proxy_config.c       7
-rw-r--r--   proxy_lua.c         49
-rw-r--r--   proxy_network.c     17
-rw-r--r--   proxy_request.c     12
-rw-r--r--   t/proxyconfig.lua   56
-rw-r--r--   t/proxyconfig.t    177
10 files changed, 400 insertions, 20 deletions
diff --git a/memcached.h b/memcached.h
index 68d0ab0..ba3544d 100644
--- a/memcached.h
+++ b/memcached.h
@@ -725,6 +725,10 @@ typedef struct {
void *proxy_user_stats;
void *proxy_int_stats;
void *proxy_event_thread; // worker threads can also be proxy IO threads
+ pthread_mutex_t proxy_limit_lock;
+ uint64_t proxy_active_req_limit;
+ uint64_t proxy_buffer_memory_limit; // protected by limit_lock
+ uint64_t proxy_buffer_memory_used; // protected by limit_lock
uint32_t proxy_rng[4]; // fast per-thread rng for lua.
// TODO: add ctx object so we can attach to queue.
#endif
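
The two new counters above are both guarded by proxy_limit_lock; every adjustment in the rest of the patch takes that mutex around the arithmetic. As a minimal sketch of the release side of that convention (proxy_bufmem_release is a hypothetical name, not part of this patch; the patch open-codes the lock/decrement/unlock at each call site):

    // Assumes memcached.h / proxy.h are in scope, as in proto_proxy.c.
    static inline void proxy_bufmem_release(LIBEVENT_THREAD *t, int len) {
        // Give back bytes previously reserved against this worker's budget.
        pthread_mutex_lock(&t->proxy_limit_lock);
        t->proxy_buffer_memory_used -= len;
        pthread_mutex_unlock(&t->proxy_limit_lock);
    }
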
diff --git a/proto_proxy.c b/proto_proxy.c
index 952c897..b01064d 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -18,6 +18,18 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
+bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len) {
+ bool oom = false;
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ if (t->proxy_buffer_memory_used > t->proxy_buffer_memory_limit) {
+ oom = true;
+ } else {
+ t->proxy_buffer_memory_used += len;
+ }
+ pthread_mutex_unlock(&t->proxy_limit_lock);
+ return oom;
+}
+
// see also: process_extstore_stats()
void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
if (arg == NULL) {
@@ -37,12 +49,17 @@ void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
struct proxy_int_stats istats = {0};
+ uint64_t req_limit = 0;
+ uint64_t buffer_memory_limit = 0;
+ uint64_t buffer_memory_used = 0;
if (!arg) {
return;
}
proxy_ctx_t *ctx = arg;
STAT_L(ctx);
+ req_limit = ctx->active_req_limit;
+ buffer_memory_limit = ctx->buffer_memory_limit;
// prepare aggregated counters.
struct proxy_user_stats *us = &ctx->user_stats;
@@ -65,6 +82,9 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
}
}
WSTAT_UL(t);
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ buffer_memory_used += t->proxy_buffer_memory_used;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
}
// return all of the user generated stats
@@ -72,9 +92,24 @@ void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
}
+
STAT_UL(ctx);
+ if (buffer_memory_limit == UINT64_MAX) {
+ buffer_memory_limit = 0;
+ } else {
+ buffer_memory_limit *= settings.num_threads;
+ }
+ if (req_limit == UINT64_MAX) {
+ req_limit = 0;
+ } else {
+ req_limit *= settings.num_threads;
+ }
+
// return proxy counters
+ APPEND_STAT("active_req_limit", "%llu", (unsigned long long)req_limit);
+ APPEND_STAT("buffer_memory_limit", "%llu", (unsigned long long)buffer_memory_limit);
+ APPEND_STAT("buffer_memory_used", "%llu", (unsigned long long)buffer_memory_used);
APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
@@ -110,6 +145,9 @@ void *proxy_init(bool use_uring) {
pthread_cond_init(&ctx->manager_cond, NULL);
pthread_mutex_init(&ctx->stats_lock, NULL);
+ ctx->active_req_limit = UINT64_MAX;
+ ctx->buffer_memory_limit = UINT64_MAX;
+
// FIXME (v2): default defines.
ctx->tunables.tcp_keepalive = false;
ctx->tunables.backend_failure_limit = 3;
@@ -166,6 +204,7 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
fprintf(stderr, "Failed to allocate proxy thread stats\n");
exit(EXIT_FAILURE);
}
+ pthread_mutex_init(&thr->proxy_limit_lock, NULL);
thr->proxy_ctx = ctx;
// Initialize the lua state.
@@ -320,6 +359,16 @@ void proxy_return_cb(io_pending_t *pending) {
void proxy_finalize_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+ if (p->io_type == IO_PENDING_TYPE_EXTSTORE) {
+ if (p->hdr_it) {
+ // TODO: lock once, worst case this hashes/locks twice.
+ if (p->miss) {
+ item_unlink(p->hdr_it);
+ }
+ item_remove(p->hdr_it);
+ }
+ }
+
// release our coroutine reference.
// TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist
// after a lua_resetthread(Lc) call.
@@ -328,13 +377,6 @@ void proxy_finalize_cb(io_pending_t *pending) {
luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
}
- if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) {
- // TODO: lock once, worst case this hashes/locks twice.
- if (p->miss) {
- item_unlink(p->hdr_it);
- }
- item_remove(p->hdr_it);
- }
return;
}
@@ -441,6 +483,9 @@ void complete_nread_proxy(conn *c) {
c->item_malloced = false;
luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
c->proxy_coro_ref = 0;
+ pthread_mutex_lock(&thr->proxy_limit_lock);
+ thr->proxy_buffer_memory_used += rq->pr.vlen;
+ pthread_mutex_unlock(&thr->proxy_limit_lock);
proxy_run_coroutine(Lc, c->resp, NULL, c);
@@ -581,10 +626,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// associated io_pending's of its own later.
} else if (r->buf) {
// response set from C.
- // FIXME (v2): write_and_free() ? it's a bit wrong for here.
- resp->write_and_free = r->buf;
resp_add_iov(resp, r->buf, r->blen);
- r->buf = NULL;
} else if (lua_getiuservalue(Lc, 1, 1) != LUA_TNIL) {
// uservalue slot 1 is pre-created, so we get TNIL instead of
// TNONE when nothing was set into it.
@@ -689,6 +731,7 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
LIBEVENT_THREAD *thr = c->thread;
struct proxy_hook *hooks = thr->proxy_hooks;
lua_State *L = thr->L;
+ proxy_ctx_t *ctx = thr->proxy_ctx;
mcp_parser_t pr = {0};
// Avoid doing resp_start() here, instead do it a bit later or as-needed.
@@ -824,12 +867,24 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// Also batch the counts down this far so we can lock once for the active
// counter instead of twice.
struct proxy_int_stats *istats = c->thread->proxy_int_stats;
+ uint64_t active_reqs = 0;
WSTAT_L(c->thread);
istats->counters[pr.command]++;
c->thread->stats.proxy_conn_requests++;
c->thread->stats.proxy_req_active++;
+ active_reqs = c->thread->stats.proxy_req_active;
WSTAT_UL(c->thread);
+ if (active_reqs > ctx->active_req_limit) {
+ proxy_out_errstring(c->resp, "active request limit reached");
+ WSTAT_DECR(c->thread, proxy_req_active, 1);
+ if (pr.vlen != 0) {
+ c->sbytes = pr.vlen;
+ conn_set_state(c, conn_swallow);
+ }
+ return;
+ }
+
// start a coroutine.
// TODO (v2): This can pull a thread from a cache.
lua_newthread(L);
@@ -847,13 +902,21 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
// TODO (v2): lift this to a post-processor?
if (rq->pr.vlen != 0) {
- // relying on temporary malloc's not succumbing as poorly to
- // fragmentation.
- c->item = malloc(rq->pr.vlen);
+ c->item = NULL;
+ // The used memory is accounted after the value is fully read, since
+ // charging it here would require an extra error handler during nread.
+ bool oom = proxy_bufmem_checkadd(c->thread, 0);
+
+ // relying on temporary mallocs not suffering as badly from fragmentation
+ if (!oom) {
+ c->item = malloc(rq->pr.vlen);
+ }
if (c->item == NULL) {
lua_settop(L, 0);
proxy_out_errstring(c->resp, "out of memory");
WSTAT_DECR(c->thread, proxy_req_active, 1);
+ c->sbytes = rq->pr.vlen;
+ conn_set_state(c, conn_swallow);
return;
}
c->item_malloced = true;
@@ -892,6 +955,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
memset(r, 0, sizeof(mcp_resp_t));
r->buf = NULL;
r->blen = 0;
+ r->thread = c->thread;
+ assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't
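
The intended call pattern for proxy_bufmem_checkadd() mirrors the receive-buffer path in proxy_backend_drive_machine below: reserve the bytes, attempt the allocation, and roll the reservation back if malloc fails. A standalone sketch under that assumption (reserve_value_buffer is a hypothetical helper; the patch open-codes this). Note the check compares "used" against the limit before adding, so a worker can overshoot its budget by at most one allocation:

    // Assumes memcached.h / proxy.h are in scope, as in proto_proxy.c.
    static bool reserve_value_buffer(LIBEVENT_THREAD *t, conn *c, int vlen) {
        if (proxy_bufmem_checkadd(t, vlen)) {
            return false; // over budget: caller swallows vlen bytes and errors out
        }
        c->item = malloc(vlen);
        if (c->item == NULL) {
            // malloc failed after reserving; undo the accounting.
            pthread_mutex_lock(&t->proxy_limit_lock);
            t->proxy_buffer_memory_used -= vlen;
            pthread_mutex_unlock(&t->proxy_limit_lock);
            return false;
        }
        return true;
    }
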
diff --git a/proxy.h b/proxy.h
index 0ab3b91..24a5ea0 100644
--- a/proxy.h
+++ b/proxy.h
@@ -195,6 +195,8 @@ typedef struct {
lua_State *proxy_state;
void *proxy_code;
proxy_event_thread_t *proxy_io_thread;
+ uint64_t active_req_limit; // max total in-flight requests
+ uint64_t buffer_memory_limit; // max bytes for send/receive buffers.
pthread_mutex_t config_lock;
pthread_cond_t config_cond;
pthread_t config_tid;
@@ -497,6 +499,9 @@ typedef struct {
mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread.
} mcp_pool_proxy_t;
+// utils
+bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len);
+
// networking interface
void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
void *proxy_event_thread(void *arg);
diff --git a/proxy_await.c b/proxy_await.c
index d5e40d3..12a8546 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -121,6 +121,8 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// reserve one uservalue for a lua-supplied response.
mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
memset(r, 0, sizeof(mcp_resp_t));
+ r->thread = c->thread;
+ assert(r->thread != NULL);
gettimeofday(&r->start, NULL);
// Set noreply mode.
// TODO (v2): the response "inherits" the request's noreply mode, which isn't
diff --git a/proxy_config.c b/proxy_config.c
index e45711e..cca6b87 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -458,8 +458,15 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
tus->num_stats = us->num_stats;
pthread_mutex_unlock(&thr->stats.mutex);
}
+ // also grab the concurrent request limit
+ thr->proxy_active_req_limit = ctx->active_req_limit;
STAT_UL(ctx);
+ // update limit counter(s)
+ pthread_mutex_lock(&thr->proxy_limit_lock);
+ thr->proxy_buffer_memory_limit = ctx->buffer_memory_limit;
+ pthread_mutex_unlock(&thr->proxy_limit_lock);
+
return 0;
}
diff --git a/proxy_lua.c b/proxy_lua.c
index 852624e..25209d0 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -82,11 +82,15 @@ static int mcplib_response_line(lua_State *L) {
}
static int mcplib_response_gc(lua_State *L) {
+ LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
// On error/similar we might be holding the read buffer.
// If the buf is handed off to mc_resp for return, this pointer is NULL
if (r->buf != NULL) {
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ t->proxy_buffer_memory_used -= r->blen;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
free(r->buf);
}
@@ -872,6 +876,49 @@ static int mcplib_backend_read_timeout(lua_State *L) {
return 0;
}
+static int mcplib_active_req_limit(lua_State *L) {
+ proxy_ctx_t *ctx = PROXY_GET_CTX(L);
+ uint64_t limit = luaL_checkinteger(L, -1);
+
+ if (limit == 0) {
+ limit = UINT64_MAX;
+ } else {
+ // FIXME: global
+ int tcount = settings.num_threads;
+ // The actual limit is per-worker-thread, so divide it up.
+ if (limit > tcount * 2) {
+ limit /= tcount;
+ }
+ }
+
+ STAT_L(ctx);
+ ctx->active_req_limit = limit;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+// limit specified in kilobytes
+static int mcplib_buffer_memory_limit(lua_State *L) {
+ proxy_ctx_t *ctx = PROXY_GET_CTX(L);
+ uint64_t limit = luaL_checkinteger(L, -1);
+
+ if (limit == 0) {
+ limit = UINT64_MAX;
+ } else {
+ limit *= 1024;
+
+ int tcount = settings.num_threads;
+ if (limit > tcount * 2) {
+ limit /= tcount;
+ }
+ }
+
+ STAT_L(ctx);
+ ctx->buffer_memory_limit = limit;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
// mcp.attach(mcp.HOOK_NAME, function)
// fill hook structure: if lua function, use luaL_ref() to store the func
static int mcplib_attach(lua_State *L) {
@@ -1203,6 +1250,8 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
{"backend_read_timeout", mcplib_backend_read_timeout},
{"backend_failure_limit", mcplib_backend_failure_limit},
{"tcp_keepalive", mcplib_tcp_keepalive},
+ {"active_req_limit", mcplib_active_req_limit},
+ {"buffer_memory_limit", mcplib_buffer_memory_limit},
{NULL, NULL}
};
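
Both limits are stored and enforced per worker thread: the configured value is divided by settings.num_threads, and the stats output multiplies it back up before reporting. A standalone arithmetic sketch with assumed example values (51200 KB and 4 workers are illustrative, not taken from the patch):

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
        uint64_t kb = 51200;        // mcp.buffer_memory_limit(51200) => ~50MB total
        int threads = 4;            // settings.num_threads
        uint64_t limit = kb * 1024; // the config value is given in kilobytes
        if (limit > (uint64_t)(threads * 2)) {
            limit /= threads;       // each worker enforces its own share
        }
        printf("per-worker limit: %llu bytes\n", (unsigned long long)limit);
        // "stats proxy" reports limit * threads == 52428800 bytes (the global figure)
        return 0;
    }
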
diff --git a/proxy_network.c b/proxy_network.c
index 189b6aa..2da41aa 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -1051,12 +1051,25 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
// r->resp.reslen + r->resp.vlen is the total length of the response.
// TODO (v2): need to associate a buffer with this response...
- // for now lets abuse write_and_free on mc_resp and simply malloc the
- // space we need, stuffing it into the resp object.
+ // for now we simply malloc, but reusable buffers should be used
r->blen = r->resp.reslen + r->resp.vlen;
+ {
+ bool oom = proxy_bufmem_checkadd(r->thread, r->blen + extra_space);
+
+ if (oom) {
+ flags = P_BE_FAIL_OOM;
+ stop = true;
+ break;
+ }
+ }
r->buf = malloc(r->blen + extra_space);
if (r->buf == NULL) {
+ // Enforce accounting.
+ pthread_mutex_lock(&r->thread->proxy_limit_lock);
+ r->thread->proxy_buffer_memory_used -= r->blen + extra_space;
+ pthread_mutex_unlock(&r->thread->proxy_limit_lock);
+
flags = P_BE_FAIL_OOM;
stop = true;
break;
diff --git a/proxy_request.c b/proxy_request.c
index cd520b0..c72fed2 100644
--- a/proxy_request.c
+++ b/proxy_request.c
@@ -538,6 +538,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p)
// second argument is optional, for building set requests.
// TODO: append the \r\n for the VAL?
int mcplib_request(lua_State *L) {
+ LIBEVENT_THREAD *t = PROXY_GET_THR(L);
size_t len = 0;
size_t vlen = 0;
mcp_parser_t pr = {0};
@@ -586,6 +587,13 @@ int mcplib_request(lua_State *L) {
proxy_lua_error(L, "failed to allocate value memory for request object");
}
memcpy(rq->pr.vbuf, val, vlen);
+ // Note: deliberately not enforcing the memory limit here:
+ // - if we're over the limit it will be caught very soon after this,
+ // but we avoid forcing some lua to bail mid-flight, which is more
+ // graceful for the end user.
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ t->proxy_buffer_memory_used += rq->pr.vlen;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
}
// rq is now created, parsed, and on the stack.
@@ -766,11 +774,15 @@ int mcplib_request_flag_token(lua_State *L) {
}
int mcplib_request_gc(lua_State *L) {
+ LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
// During nread c->item is the malloc'ed buffer. not yet put into
// rq->buf - this gets freed because we've also set c->item_malloced if
// the connection closes before finishing nread.
if (rq->pr.vbuf != NULL) {
+ pthread_mutex_lock(&t->proxy_limit_lock);
+ t->proxy_buffer_memory_used -= rq->pr.vlen;
+ pthread_mutex_unlock(&t->proxy_limit_lock);
free(rq->pr.vbuf);
}
diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua
index bcea461..47360e0 100644
--- a/t/proxyconfig.lua
+++ b/t/proxyconfig.lua
@@ -39,6 +39,57 @@ function mcp_config_pools(old)
test = mcp.pool({b1, b2, b3}, { iothread = false })
}
return pools
+ elseif mode == "reqlimit" then
+ mcp.active_req_limit(4)
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
+ elseif mode == "noreqlimit" then
+ mcp.active_req_limit(0)
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
+ elseif mode == "buflimit" or mode == "buflimit2" then
+ mcp.buffer_memory_limit(20)
+ if mode == "buflimit2" then
+ mcp.buffer_memory_limit(200)
+ end
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
+ elseif mode == "nobuflimit" then
+ mcp.buffer_memory_limit(0)
+ local b1 = mcp.backend('b1', '127.0.0.1', 11511)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11512)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11513)
+
+ -- Direct all traffic at a single backend to simplify the test.
+ local pools = {
+ test = mcp.pool({b1}),
+ hold = mcp.pool({b2, b3})
+ }
+ return pools
end
end
@@ -49,10 +100,7 @@ function mcp_config_routes(zones)
if mode == "none" then
mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end)
mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end)
- elseif mode == "start" or mode == "betable" then
- mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
- mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
- elseif mode == "noiothread" then
+ else
mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
end
diff --git a/t/proxyconfig.t b/t/proxyconfig.t
index 8c9332c..73b0169 100644
--- a/t/proxyconfig.t
+++ b/t/proxyconfig.t
@@ -60,7 +60,7 @@ sub wait_reload {
}
my @mocksrvs = ();
-diag "making mock servers";
+#diag "making mock servers";
for my $port (11511, 11512, 11513) {
my $srv = mock_server($port);
ok(defined $srv, "mock server created");
@@ -267,7 +267,182 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
}
+###
+# diag "starting proxy again from scratch"
+###
+
+# TODO: probably time to abstract the entire "start the server with mocked
+# listeners" routine.
+$watcher = undef;
+write_modefile('return "reqlimit"');
+$p_srv->stop;
+while (1) {
+ if ($p_srv->is_running) {
+ sleep 1;
+ } else {
+ ok(!$p_srv->is_running, "stopped proxy");
+ last;
+ }
+}
+
+@mocksrvs = ();
+# re-create the mock servers so we get clean connects, the previous
+# backends could be reconnecting still.
+for my $port (11511, 11512, 11513) {
+ my $srv = mock_server($port);
+ ok(defined $srv, "mock server created");
+ push(@mocksrvs, $srv);
+}
+
+# Start up a clean server.
+# Since limits are per worker thread, cut the worker threads down to 1 to ease
+# testing.
+$p_srv = new_memcached('-o proxy_config=./t/proxyconfig.lua -l 127.0.0.1 -t 1', 11510);
+$ps = $p_srv->sock;
+$ps->autoflush(1);
+
+{
+ for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) {
+ my $be = $msrv->accept();
+ $be->autoflush(1);
+ ok(defined $be, "mock backend created");
+ push(@mbe, $be);
+ }
+
+ for my $be (@mbe) {
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ }
+
+ my $stats = mem_stats($ps, 'proxy');
+ isnt($stats->{active_req_limit}, 0, "active request limit is set");
+
+ # active request limit is 4, pipeline 6 requests and ensure the last two
+ # get junked
+ my $cmd = '';
+ for ('a', 'b', 'c', 'd', 'e', 'f') {
+ $cmd .= "mg /test/$_\r\n";
+ }
+ print $ps $cmd;
+
+ # Lua config only sends commands to the first backend for this test.
+ my $be = $mbe[0];
+ for (1 .. 4) {
+ like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
+ print $be "EN\r\n";
+ }
+ my $s = IO::Select->new();
+ $s->add($be);
+ my @readable = $s->can_read(0.25);
+ is(scalar @readable, 0, "no more pending reads on backend");
+
+ for (1 .. 4) {
+ is(scalar <$ps>, "EN\r\n", "received miss from backend");
+ }
+
+ is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got error back");
+ is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got two limit errors");
+
+ # Test turning the limit back off.
+ write_modefile('return "noreqlimit"');
+ $watcher = $p_srv->new_sock;
+ print $watcher "watch proxyevents\n";
+ is(<$watcher>, "OK\r\n", "watcher enabled");
+ $p_srv->reload();
+ wait_reload($watcher);
+
+ $stats = mem_stats($ps, 'proxy');
+ is($stats->{active_req_limit}, 0, "active request limit unset");
+
+ $cmd = '';
+ for ('a', 'b', 'c', 'd', 'e', 'f') {
+ $cmd .= "mg /test/$_\r\n";
+ }
+ print $ps $cmd;
+ for (1 .. 6) {
+ like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
+ print $be "EN\r\n";
+ }
+ for (1 .. 6) {
+ is(scalar <$ps>, "EN\r\n", "received miss from backend");
+ }
+}
+
+{
+ # Test the buffer memory limiter.
+ # - the per-worker limit will be 1/t of the global limit
+ write_modefile('return "buflimit"');
+ $p_srv->reload();
+ wait_reload($watcher);
+ # Get a secondary client to trample limit.
+ my $sps = $p_srv->new_sock;
+
+ my $stats = mem_stats($ps, 'proxy');
+ isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");
+
+ # - test SET commands with values, but nothing being read on backend
+ my $data = 'x' x 30000;
+ my $cmd = "ms foo 30000 T30\r\n" . $data . "\r\n";
+ print $ps $cmd;
+
+ my $be = $mbe[0];
+ my $s = IO::Select->new;
+ $s->add($be);
+ # Wait until the backend has the request queued, then send the second one.
+ my @readable = $s->can_read(1);
+ print $sps $cmd;
+
+ my $res;
+ is(scalar <$be>, "ms foo 30000 T30\r\n", "received first ms");
+ $res = scalar <$be>;
+ print $be "HD\r\n";
+
+ # The second request should have been caught by the memory limiter
+ is(scalar <$sps>, "SERVER_ERROR out of memory\r\n", "got server error");
+ # FIXME: The original response cannot succeed because we cannot allocate
+ # enough memory to read the response from the backend.
+ # This is conveniently testing both paths right here but I would prefer
+ # something better.
+ # TODO: need to see if it's possible to surface an OOM from the backend
+ # handler, but that requires more rewiring.
+ is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "first request succeeded");
+
+ # Backend gets killed from a read OOM, so we need to re-establish.
+ $mbe[0] = $mocksrvs[0]->accept();
+ $be = $mbe[0];
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ like(<$watcher>, qr/error=outofmemory/, "OOM log line");
+
+ # Memory limits won't drop until the garbage collectors run, which
+ # requires a bit more push, so instead we raise the limits here so we can
+ # retry from the pre-existing connections to test swallow mode.
+ write_modefile('return "buflimit2"');
+ $p_srv->reload();
+ wait_reload($watcher);
+
+ # Test sending another request down both pipes to ensure they still work.
+ $cmd = "ms foo 2 T30\r\nhi\r\n";
+ print $ps $cmd;
+ is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
+ is(scalar <$be>, "hi\r\n", "client works after oom");
+ print $be "HD\r\n";
+ is(scalar <$ps>, "HD\r\n", "client received resp after oom");
+ print $sps $cmd;
+ is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
+ is(scalar <$be>, "hi\r\n", "client works after oom");
+ print $be "HD\r\n";
+ is(scalar <$sps>, "HD\r\n", "client received resp after oom");
+
+ # - test GET commands but don't read back, large backend values
+ # - test disabling the limiter
+ # extended testing:
+ # - create errors while holding the buffers?
+}
+
# TODO:
+# check reqlimit/buflimit counters
# remove backends
# do dead sockets close?
# adding user stats