summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-26 15:41:37 -0800
committerdormando <dormando@rydia.net>2023-01-27 19:08:47 -0800
commit0503e4fb125cee5d8711229ee07e561f9fb36bc3 (patch)
tree5366192967ff7edffbe1daa387b8711b00018842
parente56062321515728fdb5b9c80589f51db3357827c (diff)
downloadmemcached-0503e4fb125cee5d8711229ee07e561f9fb36bc3.tar.gz
proxy: add mcp.await_logerrors()
Logs any backgrounded requests that resulted in an error. Note that this may be a temporary interface, and could be deprecated in the future.
-rw-r--r--proxy.h1
-rw-r--r--proxy_await.c50
-rw-r--r--proxy_lua.c1
-rw-r--r--t/proxyunits.lua5
-rw-r--r--t/proxyunits.t34
5 files changed, 89 insertions, 2 deletions
diff --git a/proxy.h b/proxy.h
index 7c94e97..6ce6ec8 100644
--- a/proxy.h
+++ b/proxy.h
@@ -480,6 +480,7 @@ enum mcp_await_e {
AWAIT_BACKGROUND, // returns as soon as background jobs are dispatched
};
int mcplib_await(lua_State *L);
+int mcplib_await_logerrors(lua_State *L);
int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref);
int mcplib_await_return(io_pending_proxy_t *p);
diff --git a/proxy_await.c b/proxy_await.c
index 4192900..b0a4dee 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -9,8 +9,10 @@ typedef struct mcp_await_s {
int argtable_ref; // need to hold refs to any potential hash selectors
int restable_ref; // table of result objects
int coro_ref; // reference to parent coroutine
+ int detail_ref; // reference to detail string.
enum mcp_await_e type;
bool completed; // have we completed the parent coroutine or not
+ bool logerr; // create log_req entries for error responses
mcp_request_t *rq;
mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
} mcp_await_t;
@@ -23,12 +25,13 @@ typedef struct mcp_await_s {
// local restable = mcp.await(request, pools, num_wait)
// NOTE: need to hold onto the pool objects since those hold backend
// references. Here we just keep a reference to the argument table.
-int mcplib_await(lua_State *L) {
+static int _mcplib_await(lua_State *L, bool logerr) {
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
luaL_checktype(L, 2, LUA_TTABLE);
int n = 0; // length of table of pools
int wait_for = 0; // 0 means wait for all responses
enum mcp_await_e type = AWAIT_GOOD;
+ int detail_ref = 0;
lua_pushnil(L); // init table key
while (lua_next(L, 2) != 0) {
@@ -41,6 +44,11 @@ int mcplib_await(lua_State *L) {
proxy_lua_error(L, "mcp.await arguments must have at least one pool");
}
+ if (lua_isstring(L, 5)) {
+ // pops the detail string.
+ detail_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ }
+
if (lua_isnumber(L, 4)) {
type = lua_tointeger(L, 4);
lua_pop(L, 1);
@@ -86,13 +94,24 @@ int mcplib_await(lua_State *L) {
aw->argtable_ref = argtable_ref;
aw->rq = rq;
aw->req_ref = req_ref;
+ aw->detail_ref = detail_ref;
aw->type = type;
+ aw->logerr = logerr;
P_DEBUG("%s: about to yield [len: %d]\n", __func__, n);
lua_pushinteger(L, MCP_YIELD_AWAIT);
return lua_yield(L, 2);
}
+// default await, no logging.
+int mcplib_await(lua_State *L) {
+ return _mcplib_await(L, false);
+}
+
+int mcplib_await_logerrors(lua_State *L) {
+ return _mcplib_await(L, true);
+}
+
static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
@@ -367,7 +386,7 @@ int mcplib_await_return(io_pending_proxy_t *p) {
}
// note that post-completion, we stop gathering responses into the
- // resposne table... because it's already been returned.
+ // response table... because it's already been returned.
// So "valid" can only be true if also !completed
if (aw->pending == 0) {
if (!aw->completed) {
@@ -398,6 +417,30 @@ int mcplib_await_return(io_pending_proxy_t *p) {
p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
(end.tv_usec - p->client_resp->start.tv_usec);
+ // instructed to generate log_req entries for each failed request,
+ // this is useful to do here as these can be asynchronous.
+ // NOTE: this may be a temporary feature.
+ if (aw->logerr && p->client_resp->status != MCMC_OK && aw->completed) {
+ size_t dlen = 0;
+ const char *detail = NULL;
+ logger *l = p->thread->l;
+ // only process logs if someone is listening.
+ if (l->eflags & LOG_PROXYREQS) {
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->req_ref);
+ mcp_request_t *rq = lua_touserdata(L, -1);
+ lua_pop(L, 1); // references still held, just clearing stack.
+ mcp_resp_t *rs = p->client_resp;
+
+ if (aw->detail_ref) {
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->detail_ref);
+ detail = luaL_tolstring(L, -1, &dlen);
+ lua_pop(L, 1);
+ }
+
+ logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, rs->elapsed, rs->resp.type, rs->resp.code, rs->status, detail, dlen, rs->be_name, rs->be_port);
+ }
+ }
+
luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref);
}
// our await_ref is shared, so we don't need to release it.
@@ -430,6 +473,9 @@ int mcplib_await_return(io_pending_proxy_t *p) {
luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref);
luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref);
luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref);
+ if (aw->detail_ref) {
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->detail_ref);
+ }
WSTAT_DECR(p->thread, proxy_await_active, 1);
}
diff --git a/proxy_lua.c b/proxy_lua.c
index 91b0840..14f2564 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -1009,6 +1009,7 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
{"add_stat", mcplib_add_stat},
{"stat", mcplib_stat},
{"await", mcplib_await},
+ {"await_logerrors", mcplib_await_logerrors},
{"log", mcplib_log},
{"log_req", mcplib_log_req},
{"log_reqsample", mcplib_log_reqsample},
diff --git a/t/proxyunits.lua b/t/proxyunits.lua
index 6b492b4..411950d 100644
--- a/t/proxyunits.lua
+++ b/t/proxyunits.lua
@@ -213,6 +213,11 @@ function mcp_config_routes(zones)
return "VALUE " .. r:key() .. " 0 " .. vlen .. "\r\n" .. count .. "\r\nEND\r\n"
end
+ pfx_set["awaitlogerr"] = function(r)
+ local rtable = mcp.await_logerrors(r, { zones.z1, zones.z2, zones.z3 }, 1, mcp.AWAIT_FASTGOOD, "write_failed")
+ return rtable[1]
+ end
+
mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get"))
mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set"))
mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch"))
diff --git a/t/proxyunits.t b/t/proxyunits.t
index 0914b98..da3447f 100644
--- a/t/proxyunits.t
+++ b/t/proxyunits.t
@@ -512,5 +512,39 @@ check_version($ps);
# test hitting mcp.await() then a pool normally
}
+{
+ my $watcher = $p_srv->new_sock;
+ print $watcher "watch proxyreqs\n";
+ is(<$watcher>, "OK\r\n", "watcher enabled");
+
+ # test logging errors from special await.
+ my $key = "/awaitlogerr/a";
+ my $cmd = "set $key 0 0 5\r\n";
+ print $ps $cmd . "hello\r\n";
+ # respond from the first backend normally, then other two with errors.
+ my $be = $mbe[0];
+ is(scalar <$be>, $cmd, "await_logerrors backend req");
+ is(scalar <$be>, "hello\r\n", "await_logerrors set payload");
+ print $be "STORED\r\n";
+
+ is(scalar <$ps>, "STORED\r\n", "block until await responded");
+ # now ship some errors.
+ for my $be ($mbe[1], $mbe[2]) {
+ is(scalar <$be>, $cmd, "await_logerrors backend req");
+ is(scalar <$be>, "hello\r\n", "await_logerrors set payload");
+ print $be "SERVER_ERROR out of memory\r\n";
+ }
+ like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 1");
+ like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=\d+ code=\d+ status=-1 be=(\S+) detail=write_failed req=set \/awaitlogerr\/a/, "await_logerrors log entry 2");
+
+ # Repeat the logreqtest to ensure we only got the log lines we expected.
+ $cmd = "get /logreqtest/a\r\n";
+ print $ps $cmd;
+ is(scalar <$be>, $cmd, "got passthru for log");
+ print $be "END\r\n";
+ is(scalar <$ps>, "END\r\n", "got END from log test");
+ like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_req elapsed=\d+ type=105 code=17 status=0 be=127.0.0.1:11411 detail=logreqtest req=get \/logreqtest\/a/, "found request log entry");
+}
+
check_version($ps);
done_testing();