From f012c9cb3516933b0fce54e12d358e697c24863d Mon Sep 17 00:00:00 2001 From: dormando Date: Thu, 12 Jan 2023 12:17:08 -0800 Subject: proxy: add mcp.internal(r) API local res = mcp.internal(r) - takes a request object and executes it against the proxy's internal cache instance. Experimental as of this commit. Needs more test coverage and benchmarking. --- proto_proxy.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) (limited to 'proto_proxy.c') diff --git a/proto_proxy.c b/proto_proxy.c index 1acc920..748ff12 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -332,6 +332,14 @@ void proxy_finalize_cb(io_pending_t *pending) { // Note: lua registry is the same for main thread or a coroutine. luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref); } + + if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) { + // TODO: lock once, worst case this hashes/locks twice. + if (p->miss) { + item_unlink(p->hdr_it); + } + item_remove(p->hdr_it); + } return; } @@ -548,6 +556,34 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con _set_noreply_mode(resp, r); if (r->status != MCMC_OK) { proxy_out_errstring(resp, "backend failure"); + } else if (r->cresp) { + mc_resp *tresp = r->cresp; + // The internal cache handler has created a resp we want to swap in + // here. It would be fastest to swap *resp's position in the + // link but if the set is deep this would instead be slow, so + // we copy over details from this temporary resp instead. + assert(c != NULL); + + // So far all we fill is the wbuf and some iov's? so just copy + // that + the UDP info? + memcpy(resp->wbuf, tresp->wbuf, tresp->iov[0].iov_len); + for (int x = 0; x < tresp->iovcnt; x++) { + resp->iov[x] = tresp->iov[x]; + } + resp->iovcnt = tresp->iovcnt; + resp->chunked_total = tresp->chunked_total; + resp->chunked_data_iov = tresp->chunked_data_iov; + // copy UDP headers... + resp->request_id = tresp->request_id; + resp->udp_sequence = tresp->udp_sequence; + resp->udp_total = tresp->udp_total; + resp->request_addr = tresp->request_addr; + resp->request_addr_size = tresp->request_addr_size; + resp->item = tresp->item; // will be populated if not extstore fetch + resp->skip = tresp->skip; + + // we let the mcp_resp gc handler free up tresp and any + // associated io_pending's of its own later. } else if (r->buf) { // response set from C. // FIXME (v2): write_and_free() ? it's a bit wrong for here. @@ -565,6 +601,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con } else { // Empty response: used for ascii multiget emulation. } + } else if (type == LUA_TSTRING) { // response is a raw string from lua. const char *s = lua_tolstring(Lc, 1, &rlen); @@ -603,6 +640,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); } + int res = 0; switch (yield_type) { case MCP_YIELD_AWAIT: mcplib_await_run(c, resp, Lc, coro_ref); @@ -611,6 +649,32 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con // TODO (v2): c only used for cache alloc? mcp_queue_io(c, resp, coro_ref, Lc); break; + case MCP_YIELD_LOCAL: + // stack should be: rq, res + res = mcplib_internal_run(Lc, c, resp, coro_ref); + if (res == 0) { + // stack should still be: rq, res + // TODO: turn this function into a for loop that re-runs on + // certain status codes, to avoid recursive depth here. + // + // FIXME: this dance with the coroutine reference is + // annoying. In this case we immediately resume, so no *io + // was generated, so we won't do the above coro_ref swap, so + // we'll try to take the coro_ref again and fail. + // The ref is only actually used in proxy_await + // It should instead be stashed on the top mc_resp object + // (ideally removing c->proxy_coro_ref at the same time) + // and unref'ed when the resp is cleaned up. + lua_rawgeti(c->thread->L, LUA_REGISTRYINDEX, coro_ref); + luaL_unref(c->thread->L, LUA_REGISTRYINDEX, coro_ref); + proxy_run_coroutine(Lc, resp, NULL, c); + } else if (res > 0) { + // internal run queued for extstore. + } else { + assert(res < 0); + proxy_out_errstring(resp, "bad request"); + } + break; default: abort(); } -- cgit v1.2.1