summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-03-06 15:21:39 -0800
committerdormando <dormando@rydia.net>2023-03-06 18:10:21 -0800
commitdead60dd43d1b7213ac046d237920eff44b3f68b (patch)
treeb514f6f6cd99325b49c7ddb71ce710eed3dc786e
parent9e740a9abbe72f49bef026c85464f68e0e46757c (diff)
downloadmemcached-dead60dd43d1b7213ac046d237920eff44b3f68b.tar.gz
proxy: mcp.internal fixes and tests
- Refcount leak on sets
- Move the response elapsed timer back closer to when the response was processed, so as not to clobber the wrong IO object data
- Restores error messages from set/ms
- Adds start of unit tests

Requests will look like they run a tiny bit faster than they do, but I need to get the elapsed time there for a later change.
-rw-r--r--proto_proxy.c6
-rw-r--r--proxy_internal.c8
-rw-r--r--proxy_network.c7
-rw-r--r--t/proxyinternal.lua111
-rw-r--r--t/proxyinternal.t128
5 files changed, 252 insertions, 8 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 748ff12..f70d666 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -290,14 +290,8 @@ void proxy_return_cb(io_pending_t *pending) {
if (p->is_await) {
mcplib_await_return(p);
} else {
- struct timeval end;
lua_State *Lc = p->coro;
- // stamp the elapsed time into the response object.
- gettimeofday(&end, NULL);
- p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
- (end.tv_usec - p->client_resp->start.tv_usec);
-
// in order to resume we need to remove the objects that were
// originally returned
// what's currently on the top of the stack is what we want to keep.
diff --git a/proxy_internal.c b/proxy_internal.c
index a193c16..f0254f6 100644
--- a/proxy_internal.c
+++ b/proxy_internal.c
@@ -347,13 +347,13 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re
if (it == 0) {
//enum store_item_type status;
if (! item_size_ok(nkey, flags, pr->vlen)) {
- //out_string(c, "SERVER_ERROR object too large for cache");
+ pout_string(resp, "SERVER_ERROR object too large for cache");
//status = TOO_LARGE;
pthread_mutex_lock(&t->stats.mutex);
t->stats.store_too_large++;
pthread_mutex_unlock(&t->stats.mutex);
} else {
- //out_of_memory(c, "SERVER_ERROR out of memory storing object");
+ pout_string(resp, "SERVER_ERROR out of memory storing object");
//status = NO_MEMORY;
pthread_mutex_lock(&t->stats.mutex);
t->stats.store_no_memory++;
@@ -407,6 +407,8 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re
pout_string(resp, "SERVER_ERROR Unhandled storage type.");
}
+ // We don't need to hold a reference since the item was fully read.
+ item_remove(it);
}
static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) {
@@ -1214,6 +1216,8 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
resp->wbytes = p - resp->wbuf;
resp_add_iov(resp, resp->wbuf, resp->wbytes);
+ item_remove(it);
+
return;
error:
// Note: no errors possible after the item was successfully allocated.
diff --git a/proxy_network.c b/proxy_network.c
index 45ae98b..b25902d 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -966,6 +966,7 @@ static void _stop_timeout_event(mcp_backend_t *be) {
static int proxy_backend_drive_machine(mcp_backend_t *be) {
bool stop = false;
io_pending_proxy_t *p = NULL;
+ struct timeval end;
int flags = 0;
p = STAILQ_FIRST(&be->io_head);
@@ -1166,6 +1167,12 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
STAILQ_REMOVE_HEAD(&be->io_head, io_next);
be->depth--;
be->pending_read--;
+
+ // stamp the elapsed time into the response object.
+ gettimeofday(&end, NULL);
+ p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
+ (end.tv_usec - p->client_resp->start.tv_usec);
+
// have to do the q->count-- and == 0 and redispatch_conn()
// stuff here. The moment we call return_io here we
// don't own *p anymore.
diff --git a/t/proxyinternal.lua b/t/proxyinternal.lua
new file mode 100644
index 0000000..11e1370
--- /dev/null
+++ b/t/proxyinternal.lua
@@ -0,0 +1,111 @@
+function mcp_config_pools(oldss)
+ mcp.backend_read_timeout(0.5)
+ mcp.backend_connect_timeout(5)
+
+ local srv = mcp.backend
+
+ -- Single backend for zones to ease testing.
+ -- For purposes of this config the proxy is always "zone 1" (z1)
+ local b1 = srv('b1', '127.0.0.1', 11611)
+ local b2 = srv('b2', '127.0.0.1', 11612)
+ local b3 = srv('b3', '127.0.0.1', 11613)
+
+ local b1z = {b1}
+ local b2z = {b2}
+ local b3z = {b3}
+
+ -- convert the backends to pools.
+ -- as per a normal full config see simple.lua or t/startfile.lua
+ local zones = {
+ z1 = mcp.pool(b1z),
+ z2 = mcp.pool(b2z),
+ z3 = mcp.pool(b3z),
+ }
+
+ return zones
+end
+
+-- WORKER CODE:
+
+-- Using a very simple route handler only to allow testing the three
+-- workarounds in the same configuration file.
+function prefix_factory(pattern, list, default)
+ local p = pattern
+ local l = list
+ local d = default
+ return function(r)
+ local route = l[string.match(r:key(), p)]
+ if route == nil then
+ return d(r)
+ end
+ return route(r)
+ end
+end
+
+-- just for golfing the code in mcp_config_routes()
+function toproute_factory(pfx, label)
+ local err = "SERVER_ERROR no " .. label .. " route\r\n"
+ return prefix_factory("^/(%a+)/", pfx, function(r) return err end)
+end
+
+-- Do specialized testing based on the key prefix.
+function mcp_config_routes(zones)
+ local pfx_get = {}
+ local pfx_set = {}
+ local pfx_touch = {}
+ local pfx_gets = {}
+ local pfx_gat = {}
+ local pfx_gats = {}
+ local pfx_cas = {}
+ local pfx_add = {}
+ local pfx_delete = {}
+ local pfx_incr = {}
+ local pfx_decr = {}
+ local pfx_append = {}
+ local pfx_prepend = {}
+ local pfx_mg = {}
+ local pfx_ms = {}
+ local pfx_md = {}
+ local pfx_ma = {}
+
+ local basic = function(r)
+ return mcp.internal(r)
+ end
+
+ pfx_get["b"] = basic
+ pfx_set["b"] = basic
+ pfx_touch["b"] = basic
+ pfx_gets["b"] = basic
+ pfx_gat["b"] = basic
+ pfx_gats["b"] = basic
+ pfx_cas["b"] = basic
+ pfx_add["b"] = basic
+ pfx_delete["b"] = basic
+ pfx_incr["b"] = basic
+ pfx_decr["b"] = basic
+ pfx_append["b"] = basic
+ pfx_prepend["b"] = basic
+ pfx_mg["b"] = basic
+ pfx_ms["b"] = basic
+ pfx_md["b"] = basic
+ pfx_ma["b"] = basic
+
+ mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get"))
+ mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set"))
+ mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch"))
+ mcp.attach(mcp.CMD_GETS, toproute_factory(pfx_gets, "gets"))
+ mcp.attach(mcp.CMD_GAT, toproute_factory(pfx_gat, "gat"))
+ mcp.attach(mcp.CMD_GATS, toproute_factory(pfx_gats, "gats"))
+ mcp.attach(mcp.CMD_CAS, toproute_factory(pfx_cas, "cas"))
+ mcp.attach(mcp.CMD_ADD, toproute_factory(pfx_add, "add"))
+ mcp.attach(mcp.CMD_DELETE, toproute_factory(pfx_delete, "delete"))
+ mcp.attach(mcp.CMD_INCR, toproute_factory(pfx_incr, "incr"))
+ mcp.attach(mcp.CMD_DECR, toproute_factory(pfx_decr, "decr"))
+ mcp.attach(mcp.CMD_APPEND, toproute_factory(pfx_append, "append"))
+ mcp.attach(mcp.CMD_PREPEND, toproute_factory(pfx_prepend, "prepend"))
+ mcp.attach(mcp.CMD_MG, toproute_factory(pfx_mg, "mg"))
+ mcp.attach(mcp.CMD_MS, toproute_factory(pfx_ms, "ms"))
+ mcp.attach(mcp.CMD_MD, toproute_factory(pfx_md, "md"))
+ mcp.attach(mcp.CMD_MA, toproute_factory(pfx_ma, "ma"))
+
+end
diff --git a/t/proxyinternal.t b/t/proxyinternal.t
new file mode 100644
index 0000000..c9bb8ed
--- /dev/null
+++ b/t/proxyinternal.t
@@ -0,0 +1,128 @@
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+use Test::More;
+use FindBin qw($Bin);
+use lib "$Bin/lib";
+use Carp qw(croak);
+use MemcachedTest;
+use IO::Socket qw(AF_INET SOCK_STREAM);
+use IO::Select;
+
+if (!supports_proxy()) {
+ plan skip_all => 'proxy not enabled';
+ exit 0;
+}
+
+# Don't want to write two distinct set of tests, and extstore is a default.
+if (!supports_extstore()) {
+ plan skip_all => 'extstore not enabled';
+ exit 0;
+}
+
+my $ext_path = "/tmp/proxyinternal.$$";
+
+# Set up some server sockets.
+sub mock_server {
+ my $port = shift;
+ my $srv = IO::Socket->new(
+ Domain => AF_INET,
+ Type => SOCK_STREAM,
+ Proto => 'tcp',
+ LocalHost => '127.0.0.1',
+ LocalPort => $port,
+ ReusePort => 1,
+ Listen => 5) || die "IO::Socket: $@";
+ return $srv;
+}
+
+# Put a version command down the pipe to ensure the socket is clear.
+# client version commands skip the proxy code
+sub check_version {
+ my $ps = shift;
+ print $ps "version\r\n";
+ like(<$ps>, qr/VERSION /, "version received");
+}
+
+my @mocksrvs = ();
+#diag "making mock servers";
+for my $port (11611, 11612, 11613) {
+ my $srv = mock_server($port);
+ ok(defined $srv, "mock server created");
+ push(@mocksrvs, $srv);
+}
+
+my $p_srv = new_memcached("-o proxy_config=./t/proxyinternal.lua,ext_item_size=500,ext_item_age=1,ext_path=$ext_path:64m,ext_max_sleep=100000 -l 127.0.0.1 -U 0", 11510);
+my $ps = $p_srv->sock;
+$ps->autoflush(1);
+
+# set up server backend sockets.
+# uncomment when needed. currently they get thrown out so this can hang.
+#my @mbe = ();
+#diag "accepting mock backends";
+#for my $msrv (@mocksrvs) {
+# my $be = $msrv->accept();
+# $be->autoflush(1);
+# ok(defined $be, "mock backend created");
+# push(@mbe, $be);
+#}
+
+#diag "validating backends";
+#for my $be (@mbe) {
+# like(<$be>, qr/version/, "received version command");
+# print $be "VERSION 1.0.0-mock\r\n";
+#}
+
+#diag "object too large"
+{
+ my $data = 'x' x 2000000;
+ print $ps "set /b/toolarge 0 0 2000000\r\n$data\r\n";
+ is(scalar <$ps>, "SERVER_ERROR object too large for cache\r\n", "set too large");
+
+ print $ps "ms /b/toolarge 2000000 T30\r\n$data\r\n";
+ is(scalar <$ps>, "SERVER_ERROR object too large for cache\r\n", "ms too large");
+}
+
+#diag "basic tests"
+{
+ print $ps "set /b/foo 0 0 2\r\nhi\r\n";
+ is(scalar <$ps>, "STORED\r\n", "int set");
+ print $ps "get /b/foo\r\n";
+ is(scalar <$ps>, "VALUE /b/foo 0 2\r\n", "get response");
+ is(scalar <$ps>, "hi\r\n", "get value");
+ is(scalar <$ps>, "END\r\n", "get END");
+}
+
+#diag "fetch from extstore"
+{
+ my $data = 'x' x 1000;
+ print $ps "set /b/ext 0 0 1000\r\n$data\r\n";
+ is(scalar <$ps>, "STORED\r\n", "int set for extstore");
+ sleep 3; # TODO: import wait_for_ext
+
+ print $ps "get /b/ext\r\n";
+ is(scalar <$ps>, "VALUE /b/ext 0 1000\r\n", "get response from extstore");
+ is(scalar <$ps>, "$data\r\n", "got data from extstore");
+ is(scalar <$ps>, "END\r\n", "get END");
+}
+
+#diag "flood memory"
+{
+ # ensure we don't have a basic reference counter leak
+ my $data = 'x' x 500000;
+ for (1 .. 200) {
+ print $ps "set /b/$_ 0 0 500000\r\n$data\r\n";
+ is(scalar <$ps>, "STORED\r\n", "flood set");
+ }
+ for (1 .. 200) {
+ print $ps "ms /b/$_ 500000 T30\r\n$data\r\n";
+ is(scalar <$ps>, "HD\r\n", "flood ms");
+ }
+}
+
+done_testing();
+
+END {
+ unlink $ext_path if $ext_path;
+}