summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-12 12:17:08 -0800
committerdormando <dormando@rydia.net>2023-02-25 15:07:01 -0800
commitf012c9cb3516933b0fce54e12d358e697c24863d (patch)
tree133a41d279be9895c5258b830f6705ea9fbf1268 /proto_proxy.c
parent6442017c545a2a5ad076697b8695cd64bd32b542 (diff)
downloadmemcached-f012c9cb3516933b0fce54e12d358e697c24863d.tar.gz
proxy: add mcp.internal(r) API
local res = mcp.internal(r) - takes a request object and executes it against the proxy's internal cache instance. Experimental as of this commit. Needs more test coverage and benchmarking.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c64
1 files changed, 64 insertions, 0 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 1acc920..748ff12 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -332,6 +332,14 @@ void proxy_finalize_cb(io_pending_t *pending) {
// Note: lua registry is the same for main thread or a coroutine.
luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
}
+
+ if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) {
+ // TODO: lock once, worst case this hashes/locks twice.
+ if (p->miss) {
+ item_unlink(p->hdr_it);
+ }
+ item_remove(p->hdr_it);
+ }
return;
}
@@ -548,6 +556,34 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
_set_noreply_mode(resp, r);
if (r->status != MCMC_OK) {
proxy_out_errstring(resp, "backend failure");
+ } else if (r->cresp) {
+ mc_resp *tresp = r->cresp;
+ // The internal cache handler has created a resp we want to swap in
+ // here. It would be fastest to swap *resp's position in the
+ // link but if the set is deep this would instead be slow, so
+ // we copy over details from this temporary resp instead.
+ assert(c != NULL);
+
+ // So far all we fill is the wbuf and some iov's? so just copy
+ // that + the UDP info?
+ memcpy(resp->wbuf, tresp->wbuf, tresp->iov[0].iov_len);
+ for (int x = 0; x < tresp->iovcnt; x++) {
+ resp->iov[x] = tresp->iov[x];
+ }
+ resp->iovcnt = tresp->iovcnt;
+ resp->chunked_total = tresp->chunked_total;
+ resp->chunked_data_iov = tresp->chunked_data_iov;
+ // copy UDP headers...
+ resp->request_id = tresp->request_id;
+ resp->udp_sequence = tresp->udp_sequence;
+ resp->udp_total = tresp->udp_total;
+ resp->request_addr = tresp->request_addr;
+ resp->request_addr_size = tresp->request_addr_size;
+ resp->item = tresp->item; // will be populated if not extstore fetch
+ resp->skip = tresp->skip;
+
+ // we let the mcp_resp gc handler free up tresp and any
+ // associated io_pending's of its own later.
} else if (r->buf) {
// response set from C.
// FIXME (v2): write_and_free() ? it's a bit wrong for here.
@@ -565,6 +601,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else {
// Empty response: used for ascii multiget emulation.
}
+
} else if (type == LUA_TSTRING) {
// response is a raw string from lua.
const char *s = lua_tolstring(Lc, 1, &rlen);
@@ -603,6 +640,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
}
+ int res = 0;
switch (yield_type) {
case MCP_YIELD_AWAIT:
mcplib_await_run(c, resp, Lc, coro_ref);
@@ -611,6 +649,32 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// TODO (v2): c only used for cache alloc?
mcp_queue_io(c, resp, coro_ref, Lc);
break;
+ case MCP_YIELD_LOCAL:
+ // stack should be: rq, res
+ res = mcplib_internal_run(Lc, c, resp, coro_ref);
+ if (res == 0) {
+ // stack should still be: rq, res
+ // TODO: turn this function into a for loop that re-runs on
+ // certain status codes, to avoid recursive depth here.
+ //
+ // FIXME: this dance with the coroutine reference is
+ // annoying. In this case we immediately resume, so no *io
+ // was generated, so we won't do the above coro_ref swap, so
+ // we'll try to take the coro_ref again and fail.
+ // The ref is only actually used in proxy_await
+ // It should instead be stashed on the top mc_resp object
+ // (ideally removing c->proxy_coro_ref at the same time)
+ // and unref'ed when the resp is cleaned up.
+ lua_rawgeti(c->thread->L, LUA_REGISTRYINDEX, coro_ref);
+ luaL_unref(c->thread->L, LUA_REGISTRYINDEX, coro_ref);
+ proxy_run_coroutine(Lc, resp, NULL, c);
+ } else if (res > 0) {
+ // internal run queued for extstore.
+ } else {
+ assert(res < 0);
+ proxy_out_errstring(resp, "bad request");
+ }
+ break;
default:
abort();
}