diff options
author | dormando <dormando@rydia.net> | 2023-01-26 15:41:37 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2023-01-27 19:08:47 -0800 |
commit | 0503e4fb125cee5d8711229ee07e561f9fb36bc3 (patch) | |
tree | 5366192967ff7edffbe1daa387b8711b00018842 | |
parent | e56062321515728fdb5b9c80589f51db3357827c (diff) | |
download | memcached-0503e4fb125cee5d8711229ee07e561f9fb36bc3.tar.gz |
proxy: add mcp.await_logerrors()
Logs any backgrounded requests that resulted in an error.
Note that this may be a temporary interface, and could be deprecated in
the future.
-rw-r--r-- | proxy.h | 1 | ||||
-rw-r--r-- | proxy_await.c | 50 | ||||
-rw-r--r-- | proxy_lua.c | 1 | ||||
-rw-r--r-- | t/proxyunits.lua | 5 | ||||
-rw-r--r-- | t/proxyunits.t | 34 |
5 files changed, 89 insertions, 2 deletions
@@ -480,6 +480,7 @@ enum mcp_await_e { AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched }; int mcplib_await(lua_State *L); +int mcplib_await_logerrors(lua_State *L); int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref); int mcplib_await_return(io_pending_proxy_t *p); diff --git a/proxy_await.c b/proxy_await.c index 4192900..b0a4dee 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -9,8 +9,10 @@ typedef struct mcp_await_s { int argtable_ref; // need to hold refs to any potential hash selectors int restable_ref; // table of result objects int coro_ref; // reference to parent coroutine + int detail_ref; // reference to detail string. enum mcp_await_e type; bool completed; // have we completed the parent coroutine or not + bool logerr; // create log_req entries for error responses mcp_request_t *rq; mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop) } mcp_await_t; @@ -23,12 +25,13 @@ typedef struct mcp_await_s { // local restable = mcp.await(request, pools, num_wait) // NOTE: need to hold onto the pool objects since those hold backend // references. Here we just keep a reference to the argument table. -int mcplib_await(lua_State *L) { +static int _mcplib_await(lua_State *L, bool logerr) { mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); luaL_checktype(L, 2, LUA_TTABLE); int n = 0; // length of table of pools int wait_for = 0; // 0 means wait for all responses enum mcp_await_e type = AWAIT_GOOD; + int detail_ref = 0; lua_pushnil(L); // init table key while (lua_next(L, 2) != 0) { @@ -41,6 +44,11 @@ int mcplib_await(lua_State *L) { proxy_lua_error(L, "mcp.await arguments must have at least one pool"); } + if (lua_isstring(L, 5)) { + // pops the detail string. + detail_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } + if (lua_isnumber(L, 4)) { type = lua_tointeger(L, 4); lua_pop(L, 1); @@ -86,13 +94,24 @@ int mcplib_await(lua_State *L) { aw->argtable_ref = argtable_ref; aw->rq = rq; aw->req_ref = req_ref; + aw->detail_ref = detail_ref; aw->type = type; + aw->logerr = logerr; P_DEBUG("%s: about to yield [len: %d]\n", __func__, n); lua_pushinteger(L, MCP_YIELD_AWAIT); return lua_yield(L, 2); } +// default await, no logging. +int mcplib_await(lua_State *L) { + return _mcplib_await(L, false); +} + +int mcplib_await_logerrors(lua_State *L) { + return _mcplib_await(L, true); +} + static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) { io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY); @@ -367,7 +386,7 @@ int mcplib_await_return(io_pending_proxy_t *p) { } // note that post-completion, we stop gathering responses into the - // resposne table... because it's already been returned. + // response table... because it's already been returned. // So "valid" can only be true if also !completed if (aw->pending == 0) { if (!aw->completed) { @@ -398,6 +417,30 @@ int mcplib_await_return(io_pending_proxy_t *p) { p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + (end.tv_usec - p->client_resp->start.tv_usec); + // instructed to generate log_req entries for each failed request, + // this is useful to do here as these can be asynchronous. + // NOTE: this may be a temporary feature. + if (aw->logerr && p->client_resp->status != MCMC_OK && aw->completed) { + size_t dlen = 0; + const char *detail = NULL; + logger *l = p->thread->l; + // only process logs if someone is listening. + if (l->eflags & LOG_PROXYREQS) { + lua_rawgeti(L, LUA_REGISTRYINDEX, aw->req_ref); + mcp_request_t *rq = lua_touserdata(L, -1); + lua_pop(L, 1); // references still held, just clearing stack. + mcp_resp_t *rs = p->client_resp; + + if (aw->detail_ref) { + lua_rawgeti(L, LUA_REGISTRYINDEX, aw->detail_ref); + detail = luaL_tolstring(L, -1, &dlen); + lua_pop(L, 1); + } + + logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, rs->elapsed, rs->resp.type, rs->resp.code, rs->status, detail, dlen, rs->be_name, rs->be_port); + } + } + luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref); } // our await_ref is shared, so we don't need to release it. @@ -430,6 +473,9 @@ int mcplib_await_return(io_pending_proxy_t *p) { luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref); luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref); luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref); + if (aw->detail_ref) { + luaL_unref(L, LUA_REGISTRYINDEX, aw->detail_ref); + } WSTAT_DECR(p->thread, proxy_await_active, 1); } diff --git a/proxy_lua.c b/proxy_lua.c index 91b0840..14f2564 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -1009,6 +1009,7 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { {"add_stat", mcplib_add_stat}, {"stat", mcplib_stat}, {"await", mcplib_await}, + {"await_logerrors", mcplib_await_logerrors}, {"log", mcplib_log}, {"log_req", mcplib_log_req}, {"log_reqsample", mcplib_log_reqsample}, diff --git a/t/proxyunits.lua b/t/proxyunits.lua index 6b492b4..411950d 100644 --- a/t/proxyunits.lua +++ b/t/proxyunits.lua @@ -213,6 +213,11 @@ function mcp_config_routes(zones) return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. count .. "\r\nEND\r\n" end + pfx_set["awaitlogerr"] = function(r) + local rtable = mcp.await_logerrors(r, { zones.z1, zones.z2, zones.z3 }, 1, mcp.AWAIT_FASTGOOD, "write_failed") + return rtable[1] + end + mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get")) mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set")) mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch")) diff --git a/t/proxyunits.t b/t/proxyunits.t index 0914b98..da3447f 100644 --- a/t/proxyunits.t +++ b/t/proxyunits.t @@ -512,5 +512,39 @@ check_version($ps); # test hitting mcp.await() then a pool normally } +{ + my $watcher = $p_srv->new_sock; + print $watcher "watch proxyreqs\n"; + is(<$watcher>, "OK\r\n", "watcher enabled"); + + # test logging errors from special await. + my $key = "/awaitlogerr/a"; + my $cmd = "set $key 0 0 5\r\n"; + print $ps $cmd . "hello\r\n"; + # respond from the first backend normally, then other two with errors. + my $be = $mbe[0]; + is(scalar <$be>, $cmd, "await_logerrors backend req"); + is(scalar <$be>, "hello\r\n", "await_logerrors set payload"); + print $be "STORED\r\n"; + + is(scalar <$ps>, "STORED\r\n", "block until await responded"); + # now ship some errors. + for my $be ($mbe[1], $mbe[2]) { + is(scalar <$be>, $cmd, "await_logerrors backend req"); + is(scalar <$be>, "hello\r\n", "await_logerrors set payload"); + print $be "SERVER_ERROR out of memory\r\n"; + } + like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 1"); + like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 2"); + + # Repeat the logreqtest to ensure we only got the log lines we expected. + $cmd = "get /logreqtest/a\r\n"; + print $ps $cmd; + is(scalar <$be>, $cmd, "got passthru for log"); + print $be "END\r\n"; + is(scalar <$ps>, "END\r\n", "got END from log test"); + like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=105 code=17 status=0 be=127.0.0.1:11411 detail=logreqtest req=get \/logreqtest\/a/, "found request log entry"); +} + check_version($ps); done_testing(); |