From dead60dd43d1b7213ac046d237920eff44b3f68b Mon Sep 17 00:00:00 2001 From: dormando Date: Mon, 6 Mar 2023 15:21:39 -0800 Subject: proxy: mcp.internal fixes and tests - Refcount leak on sets - Move the response elapsed timer back closer to when the response was processed as to not clobber the wrong IO object data - Restores error messages from set/ms - Adds start of unit tests Requests will look like they run a tiiiiny bit faster than they do, but I need to get the elapsed time there for a later change. --- proto_proxy.c | 6 --- proxy_internal.c | 8 +++- proxy_network.c | 7 +++ t/proxyinternal.lua | 111 +++++++++++++++++++++++++++++++++++++++++++++ t/proxyinternal.t | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 252 insertions(+), 8 deletions(-) create mode 100644 t/proxyinternal.lua create mode 100644 t/proxyinternal.t diff --git a/proto_proxy.c b/proto_proxy.c index 748ff12..f70d666 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -290,14 +290,8 @@ void proxy_return_cb(io_pending_t *pending) { if (p->is_await) { mcplib_await_return(p); } else { - struct timeval end; lua_State *Lc = p->coro; - // stamp the elapsed time into the response object. - gettimeofday(&end, NULL); - p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + - (end.tv_usec - p->client_resp->start.tv_usec); - // in order to resume we need to remove the objects that were // originally returned // what's currently on the top of the stack is what we want to keep. diff --git a/proxy_internal.c b/proxy_internal.c index a193c16..f0254f6 100644 --- a/proxy_internal.c +++ b/proxy_internal.c @@ -347,13 +347,13 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re if (it == 0) { //enum store_item_type status; if (! item_size_ok(nkey, flags, pr->vlen)) { - //out_string(c, "SERVER_ERROR object too large for cache"); + pout_string(resp, "SERVER_ERROR object too large for cache"); //status = TOO_LARGE; pthread_mutex_lock(&t->stats.mutex); t->stats.store_too_large++; pthread_mutex_unlock(&t->stats.mutex); } else { - //out_of_memory(c, "SERVER_ERROR out of memory storing object"); + pout_string(resp, "SERVER_ERROR out of memory storing object"); //status = NO_MEMORY; pthread_mutex_lock(&t->stats.mutex); t->stats.store_no_memory++; @@ -407,6 +407,8 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re pout_string(resp, "SERVER_ERROR Unhandled storage type."); } + // We don't need to hold a reference since the item was fully read. + item_remove(it); } static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) { @@ -1214,6 +1216,8 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp resp->wbytes = p - resp->wbuf; resp_add_iov(resp, resp->wbuf, resp->wbytes); + item_remove(it); + return; error: // Note: no errors possible after the item was successfully allocated. diff --git a/proxy_network.c b/proxy_network.c index 45ae98b..b25902d 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -966,6 +966,7 @@ static void _stop_timeout_event(mcp_backend_t *be) { static int proxy_backend_drive_machine(mcp_backend_t *be) { bool stop = false; io_pending_proxy_t *p = NULL; + struct timeval end; int flags = 0; p = STAILQ_FIRST(&be->io_head); @@ -1166,6 +1167,12 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { STAILQ_REMOVE_HEAD(&be->io_head, io_next); be->depth--; be->pending_read--; + + // stamp the elapsed time into the response object. + gettimeofday(&end, NULL); + p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + + (end.tv_usec - p->client_resp->start.tv_usec); + // have to do the q->count-- and == 0 and redispatch_conn() // stuff here. The moment we call return_io here we // don't own *p anymore. diff --git a/t/proxyinternal.lua b/t/proxyinternal.lua new file mode 100644 index 0000000..11e1370 --- /dev/null +++ b/t/proxyinternal.lua @@ -0,0 +1,111 @@ +function mcp_config_pools(oldss) + mcp.backend_read_timeout(0.5) + mcp.backend_connect_timeout(5) + + local srv = mcp.backend + + -- Single backend for zones to ease testing. + -- For purposes of this config the proxy is always "zone 1" (z1) + local b1 = srv('b1', '127.0.0.1', 11611) + local b2 = srv('b2', '127.0.0.1', 11612) + local b3 = srv('b3', '127.0.0.1', 11613) + + local b1z = {b1} + local b2z = {b2} + local b3z = {b3} + + -- convert the backends to pools. + -- as per a normal full config see simple.lua or t/startfile.lua + local zones = { + z1 = mcp.pool(b1z), + z2 = mcp.pool(b2z), + z3 = mcp.pool(b3z), + } + + return zones +end + +-- WORKER CODE: + +-- Using a very simple route handler only to allow testing the three +-- workarounds in the same configuration file. +function prefix_factory(pattern, list, default) + local p = pattern + local l = list + local d = default + return function(r) + local route = l[string.match(r:key(), p)] + if route == nil then + return d(r) + end + return route(r) + end +end + +-- just for golfing the code in mcp_config_routes() +function toproute_factory(pfx, label) + local err = "SERVER_ERROR no " .. label .. " route\r\n" + return prefix_factory("^/(%a+)/", pfx, function(r) return err end) +end + +-- Do specialized testing based on the key prefix. +function mcp_config_routes(zones) + local pfx_get = {} + local pfx_set = {} + local pfx_touch = {} + local pfx_gets = {} + local pfx_gat = {} + local pfx_gats = {} + local pfx_cas = {} + local pfx_add = {} + local pfx_delete = {} + local pfx_incr = {} + local pfx_decr = {} + local pfx_append = {} + local pfx_prepend = {} + local pfx_mg = {} + local pfx_ms = {} + local pfx_md = {} + local pfx_ma = {} + + local basic = function(r) + return mcp.internal(r) + end + + pfx_get["b"] = basic + pfx_set["b"] = basic + pfx_touch["b"] = basic + pfx_gets["b"] = basic + pfx_gat["b"] = basic + pfx_gats["b"] = basic + pfx_cas["b"] = basic + pfx_add["b"] = basic + pfx_delete["b"] = basic + pfx_incr["b"] = basic + pfx_decr["b"] = basic + pfx_append["b"] = basic + pfx_prepend["b"] = basic + pfx_mg["b"] = basic + pfx_ms["b"] = basic + pfx_md["b"] = basic + pfx_ma["b"] = basic + + mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get")) + mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set")) + mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch")) + mcp.attach(mcp.CMD_GETS, toproute_factory(pfx_gets, "gets")) + mcp.attach(mcp.CMD_GAT, toproute_factory(pfx_gat, "gat")) + mcp.attach(mcp.CMD_GATS, toproute_factory(pfx_gats, "gats")) + mcp.attach(mcp.CMD_CAS, toproute_factory(pfx_cas, "cas")) + mcp.attach(mcp.CMD_ADD, toproute_factory(pfx_add, "add")) + mcp.attach(mcp.CMD_DELETE, toproute_factory(pfx_delete, "delete")) + mcp.attach(mcp.CMD_INCR, toproute_factory(pfx_incr, "incr")) + mcp.attach(mcp.CMD_DECR, toproute_factory(pfx_decr, "decr")) + mcp.attach(mcp.CMD_APPEND, toproute_factory(pfx_append, "append")) + mcp.attach(mcp.CMD_PREPEND, toproute_factory(pfx_prepend, "prepend")) + mcp.attach(mcp.CMD_MG, toproute_factory(pfx_mg, "mg")) + mcp.attach(mcp.CMD_MS, toproute_factory(pfx_ms, "ms")) + mcp.attach(mcp.CMD_MD, toproute_factory(pfx_md, "md")) + mcp.attach(mcp.CMD_MA, toproute_factory(pfx_ma, "ma")) + +end diff --git a/t/proxyinternal.t b/t/proxyinternal.t new file mode 100644 index 0000000..c9bb8ed --- /dev/null +++ b/t/proxyinternal.t @@ -0,0 +1,128 @@ +#!/usr/bin/env perl + +use strict; +use warnings; +use Test::More; +use FindBin qw($Bin); +use lib "$Bin/lib"; +use Carp qw(croak); +use MemcachedTest; +use IO::Socket qw(AF_INET SOCK_STREAM); +use IO::Select; + +if (!supports_proxy()) { + plan skip_all => 'proxy not enabled'; + exit 0; +} + +# Don't want to write two distinct set of tests, and extstore is a default. +if (!supports_extstore()) { + plan skip_all => 'extstore not enabled'; + exit 0; +} + +my $ext_path = "/tmp/proxyinternal.$$"; + +# Set up some server sockets. +sub mock_server { + my $port = shift; + my $srv = IO::Socket->new( + Domain => AF_INET, + Type => SOCK_STREAM, + Proto => 'tcp', + LocalHost => '127.0.0.1', + LocalPort => $port, + ReusePort => 1, + Listen => 5) || die "IO::Socket: $@"; + return $srv; +} + +# Put a version command down the pipe to ensure the socket is clear. +# client version commands skip the proxy code +sub check_version { + my $ps = shift; + print $ps "version\r\n"; + like(<$ps>, qr/VERSION /, "version received"); +} + +my @mocksrvs = (); +#diag "making mock servers"; +for my $port (11611, 11612, 11613) { + my $srv = mock_server($port); + ok(defined $srv, "mock server created"); + push(@mocksrvs, $srv); +} + +my $p_srv = new_memcached("-o proxy_config=./t/proxyinternal.lua,ext_item_size=500,ext_item_age=1,ext_path=$ext_path:64m,ext_max_sleep=100000 -l 127.0.0.1 -U 0", 11510); +my $ps = $p_srv->sock; +$ps->autoflush(1); + +# set up server backend sockets. +# uncomment when needed. currently they get thrown out so this can hang. +#my @mbe = (); +#diag "accepting mock backends"; +#for my $msrv (@mocksrvs) { +# my $be = $msrv->accept(); +# $be->autoflush(1); +# ok(defined $be, "mock backend created"); +# push(@mbe, $be); +#} + +#diag "validating backends"; +#for my $be (@mbe) { +# like(<$be>, qr/version/, "received version command"); +# print $be "VERSION 1.0.0-mock\r\n"; +#} + +#diag "object too large" +{ + my $data = 'x' x 2000000; + print $ps "set /b/toolarge 0 0 2000000\r\n$data\r\n"; + is(scalar <$ps>, "SERVER_ERROR object too large for cache\r\n", "set too large"); + + print $ps "ms /b/toolarge 2000000 T30\r\n$data\r\n"; + is(scalar <$ps>, "SERVER_ERROR object too large for cache\r\n", "ms too large"); +} + +#diag "basic tests" +{ + print $ps "set /b/foo 0 0 2\r\nhi\r\n"; + is(scalar <$ps>, "STORED\r\n", "int set"); + print $ps "get /b/foo\r\n"; + is(scalar <$ps>, "VALUE /b/foo 0 2\r\n", "get response"); + is(scalar <$ps>, "hi\r\n", "get value"); + is(scalar <$ps>, "END\r\n", "get END"); +} + +#diag "fetch from extstore" +{ + my $data = 'x' x 1000; + print $ps "set /b/ext 0 0 1000\r\n$data\r\n"; + is(scalar <$ps>, "STORED\r\n", "int set for extstore"); + sleep 3; # TODO: import wait_for_ext + + print $ps "get /b/ext\r\n"; + is(scalar <$ps>, "VALUE /b/ext 0 1000\r\n", "get response from extstore"); + is(scalar <$ps>, "$data\r\n", "got data from extstore"); + is(scalar <$ps>, "END\r\n", "get END"); +} + +#diag "flood memory" +{ + # ensure we don't have a basic reference counter leak + my $data = 'x' x 500000; + for (1 .. 200) { + print $ps "set /b/$_ 0 0 500000\r\n$data\r\n"; + is(scalar <$ps>, "STORED\r\n", "flood set"); + } + for (1 .. 200) { + print $ps "ms /b/$_ 500000 T30\r\n$data\r\n"; + is(scalar <$ps>, "HD\r\n", "flood ms"); + } +} + +done_testing(); + +END { + unlink $ext_path if $ext_path; +} -- cgit v1.2.1