From f012c9cb3516933b0fce54e12d358e697c24863d Mon Sep 17 00:00:00 2001 From: dormando Date: Thu, 12 Jan 2023 12:17:08 -0800 Subject: proxy: add mcp.internal(r) API local res = mcp.internal(r) - takes a request object and executes it against the proxy's internal cache instance. Experimental as of this commit. Needs more test coverage and benchmarking. --- Makefile.am | 1 + proto_proxy.c | 64 +++ proxy.h | 54 +- proxy_internal.c | 1694 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ proxy_lua.c | 8 + 5 files changed, 1807 insertions(+), 14 deletions(-) create mode 100644 proxy_internal.c diff --git a/Makefile.am b/Makefile.am index 2910b46..43315cf 100644 --- a/Makefile.am +++ b/Makefile.am @@ -62,6 +62,7 @@ memcached_SOURCES += proto_proxy.c proto_proxy.h vendor/mcmc/mcmc.h \ proxy_jump_hash.c proxy_request.c \ proxy_network.c proxy_lua.c \ proxy_config.c proxy_ring_hash.c \ + proxy_internal.c \ md5.c md5.h endif diff --git a/proto_proxy.c b/proto_proxy.c index 1acc920..748ff12 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -332,6 +332,14 @@ void proxy_finalize_cb(io_pending_t *pending) { // Note: lua registry is the same for main thread or a coroutine. luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref); } + + if (p->io_type == IO_PENDING_TYPE_EXTSTORE && p->hdr_it) { + // TODO: lock once, worst case this hashes/locks twice. + if (p->miss) { + item_unlink(p->hdr_it); + } + item_remove(p->hdr_it); + } return; } @@ -548,6 +556,34 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con _set_noreply_mode(resp, r); if (r->status != MCMC_OK) { proxy_out_errstring(resp, "backend failure"); + } else if (r->cresp) { + mc_resp *tresp = r->cresp; + // The internal cache handler has created a resp we want to swap in + // here. It would be fastest to swap *resp's position in the + // link but if the set is deep this would instead be slow, so + // we copy over details from this temporary resp instead. + assert(c != NULL); + + // So far all we fill is the wbuf and some iov's? so just copy + // that + the UDP info? + memcpy(resp->wbuf, tresp->wbuf, tresp->iov[0].iov_len); + for (int x = 0; x < tresp->iovcnt; x++) { + resp->iov[x] = tresp->iov[x]; + } + resp->iovcnt = tresp->iovcnt; + resp->chunked_total = tresp->chunked_total; + resp->chunked_data_iov = tresp->chunked_data_iov; + // copy UDP headers... + resp->request_id = tresp->request_id; + resp->udp_sequence = tresp->udp_sequence; + resp->udp_total = tresp->udp_total; + resp->request_addr = tresp->request_addr; + resp->request_addr_size = tresp->request_addr_size; + resp->item = tresp->item; // will be populated if not extstore fetch + resp->skip = tresp->skip; + + // we let the mcp_resp gc handler free up tresp and any + // associated io_pending's of its own later. } else if (r->buf) { // response set from C. // FIXME (v2): write_and_free() ? it's a bit wrong for here. @@ -565,6 +601,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con } else { // Empty response: used for ascii multiget emulation. } + } else if (type == LUA_TSTRING) { // response is a raw string from lua. const char *s = lua_tolstring(Lc, 1, &rlen); @@ -603,6 +640,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX); } + int res = 0; switch (yield_type) { case MCP_YIELD_AWAIT: mcplib_await_run(c, resp, Lc, coro_ref); @@ -611,6 +649,32 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con // TODO (v2): c only used for cache alloc? mcp_queue_io(c, resp, coro_ref, Lc); break; + case MCP_YIELD_LOCAL: + // stack should be: rq, res + res = mcplib_internal_run(Lc, c, resp, coro_ref); + if (res == 0) { + // stack should still be: rq, res + // TODO: turn this function into a for loop that re-runs on + // certain status codes, to avoid recursive depth here. + // + // FIXME: this dance with the coroutine reference is + // annoying. In this case we immediately resume, so no *io + // was generated, so we won't do the above coro_ref swap, so + // we'll try to take the coro_ref again and fail. + // The ref is only actually used in proxy_await + // It should instead be stashed on the top mc_resp object + // (ideally removing c->proxy_coro_ref at the same time) + // and unref'ed when the resp is cleaned up. + lua_rawgeti(c->thread->L, LUA_REGISTRYINDEX, coro_ref); + luaL_unref(c->thread->L, LUA_REGISTRYINDEX, coro_ref); + proxy_run_coroutine(Lc, resp, NULL, c); + } else if (res > 0) { + // internal run queued for extstore. + } else { + assert(res < 0); + proxy_out_errstring(resp, "bad request"); + } + break; default: abort(); } diff --git a/proxy.h b/proxy.h index bb9f0c3..f8ceec3 100644 --- a/proxy.h +++ b/proxy.h @@ -2,6 +2,7 @@ #define PROXY_H #include "memcached.h" +#include "extstore.h" #include #include #include @@ -400,6 +401,8 @@ enum mcp_resp_mode { typedef struct { mcmc_resp_t resp; char *buf; // response line + potentially value. + mc_resp *cresp; // client mc_resp object during extstore fetches. + LIBEVENT_THREAD *thread; // cresp's owner thread needed for extstore cleanup. size_t blen; // total size of the value to read. struct timeval start; // time this object was created. long elapsed; // time elapsed once handled. @@ -413,6 +416,8 @@ typedef struct { // re-cast an io_pending_t into this more descriptive structure. // the first few items _must_ match the original struct. +#define IO_PENDING_TYPE_PROXY 0 +#define IO_PENDING_TYPE_EXTSTORE 1 struct _io_pending_proxy_t { int io_queue_type; LIBEVENT_THREAD *thread; @@ -422,22 +427,39 @@ struct _io_pending_proxy_t { io_queue_cb finalize_cb; // called back on the worker thread. // original struct ends here - struct _io_pending_proxy_t *next; // stack for IO submission - STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends + int io_type; // extstore IO or backend IO int coro_ref; // lua registry reference to the coroutine - int mcpres_ref; // mcp.res reference used for await() lua_State *coro; // pointer directly to the coroutine - mcp_backend_t *backend; // backend server to request from - struct iovec iov[2]; // request string + tail buffer - int iovcnt; // 1 or 2... - unsigned int iovbytes; // total bytes in the iovec - int await_ref; // lua reference if we were an await object - mcp_resp_t *client_resp; // reference (currently pointing to a lua object) - bool flushed; // whether we've fully written this request to a backend. - bool ascii_multiget; // passed on from mcp_r_t - bool is_await; // are we an await object? - bool await_first; // are we the main route for an await object? - bool await_background; // dummy IO for backgrounded awaits + union { + // extstore IO. + struct { + obj_io eio; + item *hdr_it; + mc_resp *tresp; // temporary mc_resp for storage to fill. + int gettype; + int iovec_data; + bool miss; + bool badcrc; + bool active; + }; + // backend request IO + struct { + struct _io_pending_proxy_t *next; // stack for IO submission + STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends + int mcpres_ref; // mcp.res reference used for await() + mcp_backend_t *backend; // backend server to request from + struct iovec iov[2]; // request string + tail buffer + int iovcnt; // 1 or 2... + unsigned int iovbytes; // total bytes in the iovec + int await_ref; // lua reference if we were an await object + mcp_resp_t *client_resp; // reference (currently pointing to a lua object) + bool flushed; // whether we've fully written this request to a backend. + bool ascii_multiget; // passed on from mcp_r_t + bool is_await; // are we an await object? + bool await_first; // are we the main route for an await object? + bool await_background; // dummy IO for backgrounded awaits + }; + }; }; // Note: does *be have to be a sub-struct? how stable are userdata pointers? @@ -492,6 +514,10 @@ int mcplib_await_logerrors(lua_State *L); int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref); int mcplib_await_return(io_pending_proxy_t *p); +// internal request interface +int mcplib_internal(lua_State *L); +int mcplib_internal_run(lua_State *L, conn *c, mc_resp *top_resp, int coro_ref); + // user stats interface int mcplib_add_stat(lua_State *L); int mcplib_stat(lua_State *L); diff --git a/proxy_internal.c b/proxy_internal.c new file mode 100644 index 0000000..a193c16 --- /dev/null +++ b/proxy_internal.c @@ -0,0 +1,1694 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// Functions related to local command execution. + +#include "proxy.h" +#include "storage.h" + +#define PROXY_STORAGE_GET 0 +#define PROXY_STORAGE_MG 1 +#define _DO_CAS true +#define _NO_CAS false +#define _DO_TOUCH true +#define _NO_TOUCH false + +static int _store_item_copy_from_buf(item *d_it, char *buf, const int len) { + if (d_it->it_flags & ITEM_CHUNKED) { + item_chunk *dch = (item_chunk *) ITEM_schunk(d_it); + int done = 0; + // Fill dch's via a flat data buffer + while (len > done && dch) { + int todo = (dch->size - dch->used < len - done) + ? dch->size - dch->used : len - done; + memcpy(dch->data + dch->used, buf + done, todo); + done += todo; + dch->used += todo; + assert(dch->used <= dch->size); + + if (dch->size == dch->used) { + item_chunk *tch = do_item_alloc_chunk(dch, len - done); + if (tch) { + dch = tch; + } else { + return -1; + } + } + } + assert(len == done); + } else { + memcpy(ITEM_data(d_it), buf, len); + } + + return 0; +} + +// TODO (v2): out_string() needs to change to just take a *resp, but I don't +// want to do the huge refactor in this change series. So for now we have a +// custom out_string(). +static void pout_string(mc_resp *resp, const char *str) { + size_t len; + bool skip = resp->skip; + assert(resp != NULL); + + // if response was original filled with something, but we're now writing + // out an error or similar, have to reset the object first. + resp_reset(resp); + + // We blank the response "just in case", but if we're not intending on + // sending it lets not rewrite it. + if (skip) { + resp->skip = true; + return; + } + + // Fill response object with static string. + + len = strlen(str); + if ((len + 2) > WRITE_BUFFER_SIZE) { + /* ought to be always enough. just fail for simplicity */ + str = "SERVER_ERROR output line too long"; + len = strlen(str); + } + + memcpy(resp->wbuf, str, len); + memcpy(resp->wbuf + len, "\r\n", 2); + resp_add_iov(resp, resp->wbuf, len + 2); + + return; +} + +// For meta commands error strings override the quiet flag. +static void pout_errstring(mc_resp *resp, const char *str) { + resp->skip = false; + pout_string(resp, str); +} + +#ifdef EXTSTORE +static void _storage_get_item_cb(void *e, obj_io *eio, int ret) { + io_pending_proxy_t *io = (io_pending_proxy_t *)eio->data; + assert(io->active == true); + mc_resp *resp = io->tresp; + item *read_it = (item *)eio->buf; + bool miss = false; + + if (ret < 1) { + miss = true; + } else { + uint32_t crc2; + uint32_t crc = (uint32_t) read_it->exptime; + crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, eio->len-STORE_OFFSET); + + if (crc != crc2) { + miss = true; + io->badcrc = true; + } + } + + if (miss && !resp->skip) { + resp->iovcnt = 1; + if (io->gettype == PROXY_STORAGE_GET) { + resp->iov[0].iov_len = 5; + resp->iov[0].iov_base = "END\r\n"; + resp->tosend = 5; + } else if (io->gettype == PROXY_STORAGE_MG) { + resp->iov[0].iov_len = 4; + resp->iov[0].iov_base = "EN\r\n"; + resp->tosend = 5; + } else { + assert(1 == 0); + } + } + + if (!miss) { + resp->iov[io->iovec_data].iov_base = ITEM_data(read_it); + } + io->miss = miss; + io->active = false; + + // in proxy mode we tend to return IO's as they happen so we can keep + // latency down more. + return_io_pending((io_pending_t *)io); +} + +// TODO (v2): if the item is smaller than resp->wbuf[] shouldn't we just read +// directly into there? item only necessary for recache. +static int proxy_storage_get(LIBEVENT_THREAD *t, item *it, mc_resp *resp, + int type) { +#ifdef NEED_ALIGN + item_hdr hdr; + memcpy(&hdr, ITEM_data(it), sizeof(hdr)); +#else + item_hdr *hdr = (item_hdr *)ITEM_data(it); +#endif + size_t ntotal = ITEM_ntotal(it); + + io_pending_proxy_t *io = do_cache_alloc(t->io_cache); + // this is a re-cast structure, so assert that we never outsize it. + assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t)); + memset(io, 0, sizeof(io_pending_proxy_t)); + io->active = true; + // io_pending owns the reference for this object now. + io->hdr_it = it; + io->tresp = resp; // our mc_resp is a temporary object. + io->io_queue_type = IO_QUEUE_EXTSTORE; + io->io_type = IO_PENDING_TYPE_EXTSTORE; // proxy specific sub-type. + io->gettype = type; + io->thread = t; + io->return_cb = proxy_return_cb; + io->finalize_cb = proxy_finalize_cb; + obj_io *eio = &io->eio; + + eio->buf = malloc(ntotal); + if (eio->buf == NULL) { + do_cache_free(t->io_cache, io); + return -1; + } + + io->iovec_data = resp->iovcnt; + resp_add_iov(resp, "", it->nbytes); + + // We can't bail out anymore, so mc_resp owns the IO from here. + resp->io_pending = (io_pending_t *)io; + + // reference ourselves for the callback. + eio->data = (void *)io; + + // Now, fill in io->io based on what was in our header. +#ifdef NEED_ALIGN + eio->page_version = hdr.page_version; + eio->page_id = hdr.page_id; + eio->offset = hdr.offset; +#else + eio->page_version = hdr->page_version; + eio->page_id = hdr->page_id; + eio->offset = hdr->offset; +#endif + eio->len = ntotal; + eio->mode = OBJ_IO_READ; + eio->cb = _storage_get_item_cb; + + pthread_mutex_lock(&t->stats.mutex); + t->stats.get_extstore++; + pthread_mutex_unlock(&t->stats.mutex); + + return 0; +} +#endif // EXTSTORE + +/* client flags == 0 means use no storage for client flags */ +static inline int make_ascii_get_suffix(char *suffix, item *it, bool return_cas, int nbytes) { + char *p = suffix; + *p = ' '; + p++; + if (FLAGS_SIZE(it) == 0) { + *p = '0'; + p++; + } else { + p = itoa_u32(*((uint32_t *) ITEM_suffix(it)), p); + } + *p = ' '; + p = itoa_u32(nbytes-2, p+1); + + if (return_cas) { + *p = ' '; + p = itoa_u64(ITEM_get_cas(it), p+1); + } + + *p = '\r'; + *(p+1) = '\n'; + *(p+2) = '\0'; + return (p - suffix) + 2; +} + +static void process_get_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, bool return_cas, bool should_touch) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + int nkey = pr->klen; + rel_time_t exptime = 0; + bool overflow = false; // unused. + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + item *it = limited_get(key, nkey, t, exptime, should_touch, DO_UPDATE, &overflow); + if (it) { + int nbytes = it->nbytes;; + nbytes = it->nbytes; + char *p = resp->wbuf; + memcpy(p, "VALUE ", 6); + p += 6; + memcpy(p, ITEM_key(it), it->nkey); + p += it->nkey; + p += make_ascii_get_suffix(p, it, return_cas, nbytes); + resp_add_iov(resp, resp->wbuf, p - resp->wbuf); + +#ifdef EXTSTORE + if (it->it_flags & ITEM_HDR) { + if (proxy_storage_get(t, it, resp, PROXY_STORAGE_GET) != 0) { + pthread_mutex_lock(&t->stats.mutex); + t->stats.get_oom_extstore++; + pthread_mutex_unlock(&t->stats.mutex); + + item_remove(it); + proxy_out_errstring(resp, "out of memory writing get response"); + return; + } + } else if ((it->it_flags & ITEM_CHUNKED) == 0) { + resp_add_iov(resp, ITEM_data(it), it->nbytes); + } else { + resp_add_chunked_iov(resp, it, it->nbytes); + } +#else + if ((it->it_flags & ITEM_CHUNKED) == 0) { + resp_add_iov(resp, ITEM_data(it), it->nbytes); + } else { + resp_add_chunked_iov(resp, it, it->nbytes); + } +#endif + + /* item_get() has incremented it->refcount for us */ + pthread_mutex_lock(&t->stats.mutex); + if (should_touch) { + t->stats.touch_cmds++; + t->stats.slab_stats[ITEM_clsid(it)].touch_hits++; + } else { + t->stats.lru_hits[it->slabs_clsid]++; + t->stats.get_cmds++; + } + pthread_mutex_unlock(&t->stats.mutex); +#ifdef EXTSTORE + /* If ITEM_HDR, an io_wrap owns the reference. */ + if ((it->it_flags & ITEM_HDR) == 0) { + resp->item = it; + } +#else + resp->item = it; +#endif + } else { + pthread_mutex_lock(&t->stats.mutex); + if (should_touch) { + t->stats.touch_cmds++; + t->stats.touch_misses++; + } else { + t->stats.get_misses++; + t->stats.get_cmds++; + } + pthread_mutex_unlock(&t->stats.mutex); + } + + resp_add_iov(resp, "END\r\n", 5); + return; +} + +static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, int comm, bool handle_cas) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + unsigned int flags; + int32_t exptime_int = 0; + rel_time_t exptime = 0; + uint64_t req_cas_id = 0; + item *it; + + assert(resp != NULL); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + // TODO (v2): these safe_str* functions operate on C _strings_, but these + // tokens simply end with a space or carriage return/newline, so we either + // need custom functions or validate harder that these calls won't bite us + // later. + if (! (safe_strtoul(&pr->request[pr->tokens[2]], (uint32_t *)&flags) + && safe_strtol(&pr->request[pr->tokens[3]], &exptime_int))) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int)); + + // does cas value exist? + if (handle_cas) { + if (!safe_strtoull(&pr->request[pr->tokens[5]], &req_cas_id)) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + } + + // vlen is validated from the main parser. + + if (settings.detail_enabled) { + stats_prefix_record_set(key, nkey); + } + + it = item_alloc(key, nkey, flags, exptime, pr->vlen); + + if (it == 0) { + //enum store_item_type status; + if (! item_size_ok(nkey, flags, pr->vlen)) { + //out_string(c, "SERVER_ERROR object too large for cache"); + //status = TOO_LARGE; + pthread_mutex_lock(&t->stats.mutex); + t->stats.store_too_large++; + pthread_mutex_unlock(&t->stats.mutex); + } else { + //out_of_memory(c, "SERVER_ERROR out of memory storing object"); + //status = NO_MEMORY; + pthread_mutex_lock(&t->stats.mutex); + t->stats.store_no_memory++; + pthread_mutex_unlock(&t->stats.mutex); + } + //LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, + // NULL, status, comm, key, nkey, 0, 0, c->sfd); + + /* Avoid stale data persisting in cache because we failed alloc. + * Unacceptable for SET. Anywhere else too? */ + if (comm == NREAD_SET) { + it = item_get(key, nkey, t, DONT_UPDATE); + if (it) { + item_unlink(it); + STORAGE_delete(t->storage, it); + item_remove(it); + } + } + + return; + } + ITEM_set_cas(it, req_cas_id); + + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(it)].set_cmds++; + pthread_mutex_unlock(&t->stats.mutex); + + // complete_nread_proxy() does the data chunk check so all we need to do + // is copy the data. + if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) { + pout_string(resp, "SERVER_ERROR out of memory storing object"); + item_remove(it); + return; + } + + int ret = store_item(it, comm, t, NULL, CAS_NO_STALE); + switch (ret) { + case STORED: + pout_string(resp, "STORED"); + break; + case EXISTS: + pout_string(resp, "EXISTS"); + break; + case NOT_FOUND: + pout_string(resp, "NOT_FOUND"); + break; + case NOT_STORED: + pout_string(resp, "NOT_STORED"); + break; + default: + pout_string(resp, "SERVER_ERROR Unhandled storage type."); + } + +} + +static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) { + char temp[INCR_MAX_STORAGE_LEN]; + uint64_t delta; + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + + assert(t != NULL); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + if (!safe_strtoull(&pr->request[pr->tokens[2]], &delta)) { + pout_string(resp, "CLIENT_ERROR invalid numeric delta argument"); + return; + } + + switch(add_delta(t, key, nkey, incr, delta, temp, NULL)) { + case OK: + pout_string(resp, temp); + break; + case NON_NUMERIC: + pout_string(resp, "CLIENT_ERROR cannot increment or decrement non-numeric value"); + break; + case EOM: + pout_string(resp, "SERVER_ERROR out of memory"); + break; + case DELTA_ITEM_NOT_FOUND: + pthread_mutex_lock(&t->stats.mutex); + if (incr) { + t->stats.incr_misses++; + } else { + t->stats.decr_misses++; + } + pthread_mutex_unlock(&t->stats.mutex); + + pout_string(resp, "NOT_FOUND"); + break; + case DELTA_ITEM_CAS_MISMATCH: + break; /* Should never get here */ + } +} + +static void process_delete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + item *it; + uint32_t hv; + + assert(t != NULL); + + // NOTE: removed a compatibility bodge from a decade ago. + // delete used to take a "delay" argument, which was removed, but some + // ancient php clients always sent a 0 argument, which would then fail. + // It's been long enough that I don't want to carry this forward into the + // new parser. + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv); + if (it) { + MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); + + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(it)].delete_hits++; + pthread_mutex_unlock(&t->stats.mutex); + + do_item_unlink(it, hv); + STORAGE_delete(t->storage, it); + do_item_remove(it); /* release our reference */ + pout_string(resp, "DELETED"); + } else { + pthread_mutex_lock(&t->stats.mutex); + t->stats.delete_misses++; + pthread_mutex_unlock(&t->stats.mutex); + + pout_string(resp, "NOT_FOUND"); + } + item_unlock(hv); +} + +static void process_touch_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + int32_t exptime_int = 0; + rel_time_t exptime = 0; + item *it; + + assert(t != NULL); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + if (!safe_strtol(&pr->request[pr->tokens[2]], &exptime_int)) { + pout_string(resp, "CLIENT_ERROR invalid exptime argument"); + return; + } + + exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int)); + it = item_touch(key, nkey, exptime, t); + if (it) { + pthread_mutex_lock(&t->stats.mutex); + t->stats.touch_cmds++; + t->stats.slab_stats[ITEM_clsid(it)].touch_hits++; + pthread_mutex_unlock(&t->stats.mutex); + + pout_string(resp, "TOUCHED"); + item_remove(it); + } else { + pthread_mutex_lock(&t->stats.mutex); + t->stats.touch_cmds++; + t->stats.touch_misses++; + pthread_mutex_unlock(&t->stats.mutex); + + pout_string(resp, "NOT_FOUND"); + } +} + +/*** meta command handlers ***/ + +// FIXME: macro or public interface, this is copypasted. +static int _process_token_len(mcp_parser_t *pr, size_t token) { + const char *s = pr->request + pr->tokens[token]; + const char *e = pr->request + pr->tokens[token+1]; + // start of next token is after any space delimiters, so back those out. + while (*(e-1) == ' ') { + e--; + } + return e - s; +} + +#define META_SPACE(p) { \ + *p = ' '; \ + p++; \ +} + +#define META_CHAR(p, c) { \ + *p = ' '; \ + *(p+1) = c; \ + p += 2; \ +} + +// FIXME: binary key support. +#define META_KEY(p, key, nkey, bin) { \ + META_CHAR(p, 'k'); \ + memcpy(p, key, nkey); \ + p += nkey; \ +} + +#define MFLAG_MAX_OPT_LENGTH 20 +#define MFLAG_MAX_OPAQUE_LENGTH 32 + +struct _meta_flags { + unsigned int has_error :1; // flipped if we found an error during parsing. + unsigned int no_update :1; + unsigned int locked :1; + unsigned int vivify :1; + unsigned int la :1; + unsigned int hit :1; + unsigned int value :1; + unsigned int set_stale :1; + unsigned int no_reply :1; + unsigned int has_cas :1; + unsigned int new_ttl :1; + unsigned int key_binary:1; + char mode; // single character mode switch, common to ms/ma + rel_time_t exptime; + rel_time_t autoviv_exptime; + rel_time_t recache_time; + uint32_t client_flags; + uint64_t req_cas_id; + uint64_t delta; // ma + uint64_t initial; // ma +}; + +static int _meta_flag_preparse(mcp_parser_t *pr, const size_t start, + struct _meta_flags *of, char **errstr) { + unsigned int i; + //size_t ret; + int32_t tmp_int; + uint8_t seen[127] = {0}; + // Start just past the key token. Look at first character of each token. + for (i = start; i < pr->ntokens; i++) { + uint8_t o = (uint8_t)pr->request[pr->tokens[i]]; + // zero out repeat flags so we don't over-parse for return data. + if (o >= 127 || seen[o] != 0) { + *errstr = "CLIENT_ERROR duplicate flag"; + return -1; + } + seen[o] = 1; + switch (o) { + // base64 decode the key in-place, as the binary should always be + // shorter and the conversion code buffers bytes. + // TODO: we need temporary space for the binary key decode since + // request should be const. + /*case 'b': + ret = base64_decode((unsigned char *)tokens[KEY_TOKEN].value, tokens[KEY_TOKEN].length, + (unsigned char *)tokens[KEY_TOKEN].value, tokens[KEY_TOKEN].length); + if (ret == 0) { + // Failed to decode + *errstr = "CLIENT_ERROR error decoding key"; + of->has_error = 1; + } + tokens[KEY_TOKEN].length = ret; + of->key_binary = 1; + break;*/ + /* Negative exptimes can underflow and end up immortal. realtime() will + immediately expire values that are greater than REALTIME_MAXDELTA, but less + than process_started, so lets aim for that. */ + case 'N': + of->locked = 1; + of->vivify = 1; + if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) { + *errstr = "CLIENT_ERROR bad token in command line format"; + of->has_error = 1; + } else { + of->autoviv_exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int)); + } + break; + case 'T': + of->locked = 1; + if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) { + *errstr = "CLIENT_ERROR bad token in command line format"; + of->has_error = 1; + } else { + of->exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int)); + of->new_ttl = true; + } + break; + case 'R': + of->locked = 1; + if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) { + *errstr = "CLIENT_ERROR bad token in command line format"; + of->has_error = 1; + } else { + of->recache_time = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int)); + } + break; + case 'l': + of->la = 1; + of->locked = 1; // need locked to delay LRU bump + break; + case 'O': + case 'P': + case 'L': + break; + case 'k': // known but no special handling + case 's': + case 't': + case 'c': + case 'f': + break; + case 'v': + of->value = 1; + break; + case 'h': + of->locked = 1; // need locked to delay LRU bump + break; + case 'u': + of->no_update = 1; + break; + case 'q': + of->no_reply = 1; + break; + // mset-related. + case 'F': + if (!safe_strtoul(&pr->request[pr->tokens[i]+1], &of->client_flags)) { + of->has_error = true; + } + break; + case 'C': // mset, mdelete, marithmetic + if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->req_cas_id)) { + *errstr = "CLIENT_ERROR bad token in command line format"; + of->has_error = true; + } else { + of->has_cas = true; + } + break; + case 'M': // mset and marithmetic mode switch + // FIXME: this used to error if the token isn't a single byte. + // It probably should still? + of->mode = pr->request[pr->tokens[i]]; + break; + case 'J': // marithmetic initial value + if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->initial)) { + *errstr = "CLIENT_ERROR invalid numeric initial value"; + of->has_error = 1; + } + break; + case 'D': // marithmetic delta value + if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->delta)) { + *errstr = "CLIENT_ERROR invalid numeric delta value"; + of->has_error = 1; + } + break; + case 'I': + of->set_stale = 1; + break; + default: // unknown flag, bail. + *errstr = "CLIENT_ERROR invalid flag"; + return -1; + } + } + + return of->has_error ? -1 : 0; +} + +static void process_mget_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + item *it; + unsigned int i = 0; + struct _meta_flags of = {0}; // option bitflags. + uint32_t hv; // cached hash value for unlocking an item. + bool failed = false; + bool item_created = false; + bool won_token = false; + bool ttl_set = false; + char *errstr = "CLIENT_ERROR bad command line format"; + assert(t != NULL); + char *p = resp->wbuf; + int tlen = 0; + + // FIXME: still needed? + //WANT_TOKENS_MIN(ntokens, 3); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) { + // TODO: ensure the command tokenizer gives us at least this many + pout_errstring(resp, "CLIENT_ERROR options flags are too long"); + return; + } + + // scrubs duplicated options and sets flags for how to load the item. + // we pass in the first token that should be a flag. + if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) { + pout_errstring(resp, errstr); + return; + } + + bool overflow = false; + if (!of.locked) { + it = limited_get(key, nkey, t, 0, false, !of.no_update, &overflow); + } else { + // If we had to lock the item, we're doing our own bump later. + it = limited_get_locked(key, nkey, t, DONT_UPDATE, &hv, &overflow); + } + + // Since we're a new protocol, we can actually inform users that refcount + // overflow is happening by straight up throwing an error. + // We definitely don't want to re-autovivify by accident. + if (overflow) { + assert(it == NULL); + pout_errstring(resp, "SERVER_ERROR refcount overflow during fetch"); + return; + } + + if (it == NULL && of.vivify) { + // Fill in the exptime during parsing later. + it = item_alloc(key, nkey, 0, realtime(0), 2); + // We don't actually need any of do_store_item's logic: + // - already fetched and missed an existing item. + // - lock is still held. + // - not append/prepend/replace + // - not testing CAS + if (it != NULL) { + // I look forward to the day I get rid of this :) + memcpy(ITEM_data(it), "\r\n", 2); + // NOTE: This initializes the CAS value. + do_item_link(it, hv); + item_created = true; + } + } + + // don't have to check result of add_iov() since the iov size defaults are + // enough. + if (it) { + if (of.value) { + memcpy(p, "VA ", 3); + p = itoa_u32(it->nbytes-2, p+3); + } else { + memcpy(p, "HD", 2); + p += 2; + } + + for (i = pr->keytoken+1; i < pr->ntokens; i++) { + switch (pr->request[pr->tokens[i]]) { + case 'T': + ttl_set = true; + it->exptime = of.exptime; + break; + case 'N': + if (item_created) { + it->exptime = of.autoviv_exptime; + won_token = true; + } + break; + case 'R': + // If we haven't autovivified and supplied token is less + // than current TTL, mark a win. + if ((it->it_flags & ITEM_TOKEN_SENT) == 0 + && !item_created + && it->exptime != 0 + && it->exptime < of.recache_time) { + won_token = true; + } + break; + case 's': + META_CHAR(p, 's'); + p = itoa_u32(it->nbytes-2, p); + break; + case 't': + // TTL remaining as of this request. + // needs to be relative because server clocks may not be in sync. + META_CHAR(p, 't'); + if (it->exptime == 0) { + *p = '-'; + *(p+1) = '1'; + p += 2; + } else { + p = itoa_u32(it->exptime - current_time, p); + } + break; + case 'c': + META_CHAR(p, 'c'); + p = itoa_u64(ITEM_get_cas(it), p); + break; + case 'f': + META_CHAR(p, 'f'); + if (FLAGS_SIZE(it) == 0) { + *p = '0'; + p++; + } else { + p = itoa_u32(*((uint32_t *) ITEM_suffix(it)), p); + } + break; + case 'l': + META_CHAR(p, 'l'); + p = itoa_u32(current_time - it->time, p); + break; + case 'h': + META_CHAR(p, 'h'); + if (it->it_flags & ITEM_FETCHED) { + *p = '1'; + } else { + *p = '0'; + } + p++; + break; + case 'O': + tlen = _process_token_len(pr, i); + if (tlen > MFLAG_MAX_OPAQUE_LENGTH) { + errstr = "CLIENT_ERROR opaque token too long"; + goto error; + } + META_SPACE(p); + memcpy(p, &pr->request[pr->tokens[i]], tlen); + p += tlen; + break; + case 'k': + META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY)); + break; + } + } + + // Has this item already sent a token? + // Important to do this here so we don't send W with Z. + // Isn't critical, but easier for client authors to understand. + if (it->it_flags & ITEM_TOKEN_SENT) { + META_CHAR(p, 'Z'); + } + if (it->it_flags & ITEM_STALE) { + META_CHAR(p, 'X'); + // FIXME: think hard about this. is this a default, or a flag? + if ((it->it_flags & ITEM_TOKEN_SENT) == 0) { + // If we're stale but no token already sent, now send one. + won_token = true; + } + } + + if (won_token) { + // Mark a win into the flag buffer. + META_CHAR(p, 'W'); + it->it_flags |= ITEM_TOKEN_SENT; + } + + *p = '\r'; + *(p+1) = '\n'; + *(p+2) = '\0'; + p += 2; + // finally, chain in the buffer. + resp_add_iov(resp, resp->wbuf, p - resp->wbuf); + + if (of.value) { +#ifdef EXTSTORE + if (it->it_flags & ITEM_HDR) { + if (proxy_storage_get(t, it, resp, PROXY_STORAGE_MG) != 0) { + pthread_mutex_lock(&t->stats.mutex); + t->stats.get_oom_extstore++; + pthread_mutex_unlock(&t->stats.mutex); + + failed = true; + } + } else if ((it->it_flags & ITEM_CHUNKED) == 0) { + resp_add_iov(resp, ITEM_data(it), it->nbytes); + } else { + resp_add_chunked_iov(resp, it, it->nbytes); + } +#else + if ((it->it_flags & ITEM_CHUNKED) == 0) { + resp_add_iov(resp, ITEM_data(it), it->nbytes); + } else { + resp_add_chunked_iov(resp, it, it->nbytes); + } +#endif + } + + // need to hold the ref at least because of the key above. +#ifdef EXTSTORE + if (!failed) { + if ((it->it_flags & ITEM_HDR) != 0 && of.value) { + // Only have extstore clean if header and returning value. + resp->item = NULL; + } else { + resp->item = it; + } + } else { + // Failed to set up extstore fetch. + if (of.locked) { + do_item_remove(it); + } else { + item_remove(it); + } + } +#else + resp->item = it; +#endif + } else { + failed = true; + } + + if (of.locked) { + // Delayed bump so we could get fetched/last access time pre-update. + if (!of.no_update && it != NULL) { + do_item_bump(t, it, hv); + } + item_unlock(hv); + } + + // we count this command as a normal one if we've gotten this far. + // TODO: for autovivify case, miss never happens. Is this okay? + if (!failed) { + pthread_mutex_lock(&t->stats.mutex); + if (ttl_set) { + t->stats.touch_cmds++; + t->stats.slab_stats[ITEM_clsid(it)].touch_hits++; + } else { + t->stats.lru_hits[it->slabs_clsid]++; + t->stats.get_cmds++; + } + pthread_mutex_unlock(&t->stats.mutex); + } else { + pthread_mutex_lock(&t->stats.mutex); + if (ttl_set) { + t->stats.touch_cmds++; + t->stats.touch_misses++; + } else { + t->stats.get_misses++; + t->stats.get_cmds++; + } + pthread_mutex_unlock(&t->stats.mutex); + + // This gets elided in noreply mode. + if (of.no_reply) + resp->skip = true; + memcpy(p, "EN", 2); + p += 2; + for (i = pr->keytoken+1; i < pr->ntokens; i++) { + switch (pr->request[pr->tokens[i]]) { + // TODO: macro perhaps? + case 'O': + tlen = _process_token_len(pr, i); + if (tlen > MFLAG_MAX_OPAQUE_LENGTH) { + errstr = "CLIENT_ERROR opaque token too long"; + goto error; + } + META_SPACE(p); + memcpy(p, &pr->request[pr->tokens[i]], tlen); + p += tlen; + break; + case 'k': + META_KEY(p, key, nkey, of.key_binary); + break; + } + } + resp->wbytes = p - resp->wbuf; + memcpy(resp->wbuf + resp->wbytes, "\r\n", 2); + resp->wbytes += 2; + resp_add_iov(resp, resp->wbuf, resp->wbytes); + } + return; +error: + if (it) { + do_item_remove(it); + if (of.locked) { + item_unlock(hv); + } + } + pout_errstring(resp, errstr); +} + +static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + + item *it; + int i; + short comm = NREAD_SET; + struct _meta_flags of = {0}; // option bitflags. + char *errstr = "CLIENT_ERROR bad command line format"; + uint32_t hv; // cached hash value. + int vlen = pr->vlen; // value from data line. + assert(t != NULL); + char *p = resp->wbuf; + int tlen = 0; + + //WANT_TOKENS_MIN(ntokens, 3); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) { + // TODO: ensure the command tokenizer gives us at least this many + pout_errstring(resp, "CLIENT_ERROR options flags are too long"); + return; + } + + if (pr->ntokens == 3) { + pout_errstring(resp, "CLIENT_ERROR bad command line format"); + return; + } + + // We need to at least try to get the size to properly slurp bad bytes + // after an error. + // we pass in the first token that should be a flag. + if (_meta_flag_preparse(pr, 3, &of, &errstr) != 0) { + goto error; + } + + // "mode switch" to alternative commands + switch (of.mode) { + case 0: + break; // no mode supplied. + case 'E': // Add... + comm = NREAD_ADD; + break; + case 'A': // Append. + comm = NREAD_APPEND; + break; + case 'P': // Prepend. + comm = NREAD_PREPEND; + break; + case 'R': // Replace. + comm = NREAD_REPLACE; + break; + case 'S': // Set. Default. + comm = NREAD_SET; + break; + default: + errstr = "CLIENT_ERROR invalid mode for ms M token"; + goto error; + } + + // The item storage function doesn't exactly map to mset. + // If a CAS value is supplied, upgrade default SET mode to CAS mode. + // Also allows REPLACE to work, as REPLACE + CAS works the same as CAS. + // add-with-cas works the same as add; but could only LRU bump if match.. + // APPEND/PREPEND allow a simplified CAS check. + if (of.has_cas && (comm == NREAD_SET || comm == NREAD_REPLACE)) { + comm = NREAD_CAS; + } + + it = item_alloc(key, nkey, of.client_flags, of.exptime, vlen); + + if (it == 0) { + if (! item_size_ok(nkey, of.client_flags, vlen)) { + errstr = "SERVER_ERROR object too large for cache"; + pthread_mutex_lock(&t->stats.mutex); + t->stats.store_too_large++; + pthread_mutex_unlock(&t->stats.mutex); + } else { + errstr = "SERVER_ERROR out of memory storing object"; + pthread_mutex_lock(&t->stats.mutex); + t->stats.store_no_memory++; + pthread_mutex_unlock(&t->stats.mutex); + } + + /* Avoid stale data persisting in cache because we failed alloc. */ + // NOTE: only if SET mode? + it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv); + if (it) { + do_item_unlink(it, hv); + STORAGE_delete(t->storage, it); + do_item_remove(it); + } + item_unlock(hv); + + goto error; + } + ITEM_set_cas(it, of.req_cas_id); + + // data should already be read into the request. + + // Prevent printing back the key in meta commands as garbage. + if (of.key_binary) { + it->it_flags |= ITEM_KEY_BINARY; + } + + bool set_stale = CAS_NO_STALE; + if (of.set_stale && comm == NREAD_CAS) { + set_stale = CAS_ALLOW_STALE; + } + resp->wbytes = p - resp->wbuf; + + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(it)].set_cmds++; + pthread_mutex_unlock(&t->stats.mutex); + + // complete_nread_proxy() does the data chunk check so all we need to do + // is copy the data. + if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) { + pout_string(resp, "SERVER_ERROR out of memory storing object"); + item_remove(it); + return; + } + + uint64_t cas = 0; + int ret = store_item(it, comm, t, &cas, set_stale); + switch (ret) { + case STORED: + memcpy(p, "HD", 2); + // Only place noreply is used for meta cmds is a nominal response. + if (of.no_reply) { + resp->skip = true; + } + break; + case EXISTS: + memcpy(p, "EX", 2); + break; + case NOT_FOUND: + memcpy(p, "NF", 2); + break; + case NOT_STORED: + memcpy(p, "NS", 2); + break; + default: + pout_errstring(resp, "SERVER_ERROR Unhandled storage type."); + return; + + } + p += 2; + + for (i = pr->keytoken+1; i < pr->ntokens; i++) { + switch (pr->request[pr->tokens[i]]) { + case 'O': + tlen = _process_token_len(pr, i); + if (tlen > MFLAG_MAX_OPAQUE_LENGTH) { + errstr = "CLIENT_ERROR opaque token too long"; + goto error; + } + META_SPACE(p); + memcpy(p, &pr->request[pr->tokens[i]], tlen); + p += tlen; + break; + case 'k': + META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY)); + break; + case 'c': + META_CHAR(p, 'c'); + p = itoa_u64(cas, p); + break; + } + } + + // We don't need to free pr->vbuf as that is owned by *rq + // either way, there's no c->item or resp->item reference right now. + + memcpy(p, "\r\n", 2); + p += 2; + // we're offset into wbuf, but good convention to track wbytes. + resp->wbytes = p - resp->wbuf; + resp_add_iov(resp, resp->wbuf, resp->wbytes); + + return; +error: + // Note: no errors possible after the item was successfully allocated. + // So we're just looking at dumping error codes and returning. + pout_errstring(resp, errstr); +} + +static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + item *it = NULL; + int i; + uint32_t hv; + struct _meta_flags of = {0}; // option bitflags. + char *errstr = "CLIENT_ERROR bad command line format"; + assert(t != NULL); + // reserve bytes for status code + char *p = resp->wbuf + 2; + int tlen = 0; + + //WANT_TOKENS_MIN(ntokens, 3); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) { + // TODO: ensure the command tokenizer gives us at least this many + pout_errstring(resp, "CLIENT_ERROR options flags are too long"); + return; + } + + // scrubs duplicated options and sets flags for how to load the item. + // we pass in the first token that should be a flag. + // FIXME: not using the preparse errstr? + if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) { + pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag"); + return; + } + + for (i = pr->keytoken+1; i < pr->ntokens; i++) { + switch (pr->request[pr->tokens[i]]) { + // TODO: macro perhaps? + case 'O': + tlen = _process_token_len(pr, i); + if (tlen > MFLAG_MAX_OPAQUE_LENGTH) { + errstr = "CLIENT_ERROR opaque token too long"; + goto error; + } + META_SPACE(p); + memcpy(p, &pr->request[pr->tokens[i]], tlen); + p += tlen; + break; + case 'k': + META_KEY(p, key, nkey, of.key_binary); + break; + } + } + + it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv); + if (it) { + // allow only deleting/marking if a CAS value matches. + if (of.has_cas && ITEM_get_cas(it) != of.req_cas_id) { + pthread_mutex_lock(&t->stats.mutex); + t->stats.delete_misses++; + pthread_mutex_unlock(&t->stats.mutex); + + memcpy(resp->wbuf, "EX", 2); + goto cleanup; + } + + // If we're to set this item as stale, we don't actually want to + // delete it. We mark the stale bit, bump CAS, and update exptime if + // we were supplied a new TTL. + if (of.set_stale) { + if (of.new_ttl) { + it->exptime = of.exptime; + } + it->it_flags |= ITEM_STALE; + // Also need to remove TOKEN_SENT, so next client can win. + it->it_flags &= ~ITEM_TOKEN_SENT; + + ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); + + // Clients can noreply nominal responses. + if (of.no_reply) + resp->skip = true; + memcpy(resp->wbuf, "HD", 2); + } else { + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(it)].delete_hits++; + pthread_mutex_unlock(&t->stats.mutex); + + do_item_unlink(it, hv); + STORAGE_delete(t->storage, it); + if (of.no_reply) + resp->skip = true; + memcpy(resp->wbuf, "HD", 2); + } + goto cleanup; + } else { + pthread_mutex_lock(&t->stats.mutex); + t->stats.delete_misses++; + pthread_mutex_unlock(&t->stats.mutex); + + memcpy(resp->wbuf, "NF", 2); + goto cleanup; + } +cleanup: + if (it) { + do_item_remove(it); + } + // Item is always returned locked, even if missing. + item_unlock(hv); + resp->wbytes = p - resp->wbuf; + memcpy(resp->wbuf + resp->wbytes, "\r\n", 2); + resp->wbytes += 2; + resp_add_iov(resp, resp->wbuf, resp->wbytes); + //conn_set_state(c, conn_new_cmd); + return; +error: + pout_errstring(resp, errstr); +} + +static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) { + const char *key = &pr->request[pr->tokens[pr->keytoken]]; + size_t nkey = pr->klen; + int i; + struct _meta_flags of = {0}; // option bitflags. + char *errstr = "CLIENT_ERROR bad command line format"; + assert(t != NULL); + // no reservation (like del/set) since we post-process the status line. + char *p = resp->wbuf; + int tlen = 0; + + // If no argument supplied, incr or decr by one. + of.delta = 1; + of.initial = 0; // redundant, for clarity. + bool incr = true; // default mode is to increment. + bool locked = false; + uint32_t hv = 0; + item *it = NULL; // item returned by do_add_delta. + + //WANT_TOKENS_MIN(ntokens, 3); + + if (nkey > KEY_MAX_LENGTH) { + pout_string(resp, "CLIENT_ERROR bad command line format"); + return; + } + + if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) { + // TODO: ensure the command tokenizer gives us at least this many + pout_errstring(resp, "CLIENT_ERROR options flags are too long"); + return; + } + + // scrubs duplicated options and sets flags for how to load the item. + // we pass in the first token that should be a flag. + if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) { + pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag"); + return; + } + //c->noreply = of.no_reply; + + // "mode switch" to alternative commands + switch (of.mode) { + case 0: // no switch supplied. + break; + case 'I': // Incr (default) + case '+': + incr = true; + break; + case 'D': // Decr. + case '-': + incr = false; + break; + default: + errstr = "CLIENT_ERROR invalid mode for ma M token"; + goto error; + break; + } + + // take hash value and manually lock item... hold lock during store phase + // on miss and avoid recalculating the hash multiple times. + hv = hash(key, nkey); + item_lock(hv); + locked = true; + char tmpbuf[INCR_MAX_STORAGE_LEN]; + + // return a referenced item if it exists, so we can modify it here, rather + // than adding even more parameters to do_add_delta. + bool item_created = false; + uint64_t cas = 0; + switch(do_add_delta(t, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) { + case OK: + //if (c->noreply) + // resp->skip = true; + // *it was filled, set the status below. + cas = ITEM_get_cas(it); + break; + case NON_NUMERIC: + errstr = "CLIENT_ERROR cannot increment or decrement non-numeric value"; + goto error; + break; + case EOM: + errstr = "SERVER_ERROR out of memory"; + goto error; + break; + case DELTA_ITEM_NOT_FOUND: + if (of.vivify) { + itoa_u64(of.initial, tmpbuf); + int vlen = strlen(tmpbuf); + + it = item_alloc(key, nkey, 0, 0, vlen+2); + if (it != NULL) { + memcpy(ITEM_data(it), tmpbuf, vlen); + memcpy(ITEM_data(it) + vlen, "\r\n", 2); + if (do_store_item(it, NREAD_ADD, t, hv, &cas, CAS_NO_STALE)) { + item_created = true; + } else { + // Not sure how we can get here if we're holding the lock. + memcpy(resp->wbuf, "NS", 2); + } + } else { + errstr = "SERVER_ERROR Out of memory allocating new item"; + goto error; + } + } else { + pthread_mutex_lock(&t->stats.mutex); + if (incr) { + t->stats.incr_misses++; + } else { + t->stats.decr_misses++; + } + pthread_mutex_unlock(&t->stats.mutex); + // won't have a valid it here. + memcpy(p, "NF", 2); + p += 2; + } + break; + case DELTA_ITEM_CAS_MISMATCH: + // also returns without a valid it. + memcpy(p, "EX", 2); + p += 2; + break; + } + + // final loop + // allows building the response with information after vivifying from a + // miss, or returning a new CAS value after add_delta(). + if (it) { + size_t vlen = strlen(tmpbuf); + if (of.value) { + memcpy(p, "VA ", 3); + p = itoa_u32(vlen, p+3); + } else { + memcpy(p, "HD", 2); + p += 2; + } + + for (i = pr->keytoken+1; i < pr->ntokens; i++) { + switch (pr->request[pr->tokens[i]]) { + case 'c': + META_CHAR(p, 'c'); + p = itoa_u64(cas, p); + break; + case 't': + META_CHAR(p, 't'); + if (it->exptime == 0) { + *p = '-'; + *(p+1) = '1'; + p += 2; + } else { + p = itoa_u32(it->exptime - current_time, p); + } + break; + case 'T': + it->exptime = of.exptime; + break; + case 'N': + if (item_created) { + it->exptime = of.autoviv_exptime; + } + break; + case 'O': + tlen = _process_token_len(pr, i); + if (tlen > MFLAG_MAX_OPAQUE_LENGTH) { + errstr = "CLIENT_ERROR opaque token too long"; + goto error; + } + META_SPACE(p); + memcpy(p, &pr->request[pr->tokens[i]], tlen); + break; + case 'k': + META_KEY(p, key, nkey, of.key_binary); + break; + } + } + + if (of.value) { + *p = '\r'; + *(p+1) = '\n'; + p += 2; + memcpy(p, tmpbuf, vlen); + p += vlen; + } + + do_item_remove(it); + } else { + // No item to handle. still need to return opaque/key tokens + for (i = pr->keytoken+1; i < pr->ntokens; i++) { + switch (pr->request[pr->tokens[i]]) { + case 'O': + tlen = _process_token_len(pr, i); + if (tlen > MFLAG_MAX_OPAQUE_LENGTH) { + errstr = "CLIENT_ERROR opaque token too long"; + goto error; + } + META_SPACE(p); + memcpy(p, &pr->request[pr->tokens[i]], tlen); + break; + case 'k': + META_KEY(p, key, nkey, of.key_binary); + break; + } + } + } + + item_unlock(hv); + + resp->wbytes = p - resp->wbuf; + memcpy(resp->wbuf + resp->wbytes, "\r\n", 2); + resp->wbytes += 2; + resp_add_iov(resp, resp->wbuf, resp->wbytes); + return; +error: + if (it != NULL) + do_item_remove(it); + if (locked) + item_unlock(hv); + pout_errstring(resp, errstr); +} + +/*** Lua and internal handler ***/ + +int mcplib_internal(lua_State *L) { + luaL_checkudata(L, 1, "mcp.request"); + mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0); + memset(r, 0, sizeof(mcp_resp_t)); + luaL_getmetatable(L, "mcp.response"); + lua_setmetatable(L, -2); + + lua_pushinteger(L, MCP_YIELD_LOCAL); + return lua_yield(L, 2); +} + +// we're pretending to be p_c_ascii(), but reusing our already tokenized code. +// the text parser should eventually move to the new tokenizer and we can +// merge all of this code together. +int mcplib_internal_run(lua_State *L, conn *c, mc_resp *top_resp, int coro_ref) { + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response"); + mc_resp *resp = resp_start_unlinked(c); + LIBEVENT_THREAD *t = c->thread; + mcp_parser_t *pr = &rq->pr; + if (resp == NULL) { + return -1; + } + + // TODO: meta no-op isn't handled here. haven't decided how yet. + switch (rq->pr.command) { + case CMD_MG: + process_mget_cmd(t, pr, resp); + break; + case CMD_MS: + process_mset_cmd(t, pr, resp); + break; + case CMD_MD: + process_mdelete_cmd(t, pr, resp); + break; + case CMD_MA: + process_marithmetic_cmd(t, pr, resp); + break; + case CMD_GET: + process_get_cmd(t, pr, resp, _NO_CAS, _NO_TOUCH); + break; + case CMD_GETS: + process_get_cmd(t, pr, resp, _DO_CAS, _NO_TOUCH); + break; + case CMD_GAT: + process_get_cmd(t, pr, resp, _NO_CAS, _DO_TOUCH); + break; + case CMD_GATS: + process_get_cmd(t, pr, resp, _DO_CAS, _DO_TOUCH); + break; + case CMD_SET: + process_update_cmd(t, pr, resp, NREAD_SET, _NO_CAS); + break; + case CMD_ADD: + process_update_cmd(t, pr, resp, NREAD_ADD, _NO_CAS); + break; + case CMD_APPEND: + process_update_cmd(t, pr, resp, NREAD_APPEND, _NO_CAS); + break; + case CMD_PREPEND: + process_update_cmd(t, pr, resp, NREAD_PREPEND, _NO_CAS); + break; + case CMD_CAS: + process_update_cmd(t, pr, resp, NREAD_CAS, _DO_CAS); + break; + case CMD_REPLACE: + process_update_cmd(t, pr, resp, NREAD_REPLACE, _DO_CAS); + break; + case CMD_INCR: + process_arithmetic_cmd(t, pr, resp, true); + break; + case CMD_DECR: + process_arithmetic_cmd(t, pr, resp, false); + break; + case CMD_DELETE: + process_delete_cmd(t, pr, resp); + break; + case CMD_TOUCH: + process_touch_cmd(t, pr, resp); + break; + default: + resp_free(t, resp); + return -1; + } + + // TODO: I'd like to shortcut the parsing here, but if we want the resp + // object to have full support (ie: resp:line()/etc) it might be necessary + // to still do a full parsing. It might be possible to + // wrap the main commands with something that decorates r->resp directly + // instead of going through a parser to save some CPU. + // Either way this is a lot less code. + mcmc_bare_parse_buf(resp->iov[0].iov_base, resp->iov[0].iov_len, &r->resp); + + // in case someone logs this response it should make sense. + memcpy(r->be_name, "internal", strlen("internal")); + memcpy(r->be_port, "0", 1); + + // TODO: r-> will need status/code/mode copied from resp. + r->cresp = resp; + r->thread = c->thread; + r->cmd = rq->pr.command; + // Always return OK from here as this is signalling an internal error. + r->status = MCMC_OK; + + if (resp->io_pending) { + // TODO (v2): here we move the IO from the temporary resp to the top + // resp, but this feels kludgy so I'm leaving an explicit note to find + // a better way to do this. + top_resp->io_pending = resp->io_pending; + resp->io_pending = NULL; + + // Add io object to extstore submission queue. + io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE); + io_pending_proxy_t *io = (io_pending_proxy_t *)top_resp->io_pending; + + io->eio.next = q->stack_ctx; + q->stack_ctx = &io->eio; + assert(q->count >= 0); + q->count++; + + io->coro_ref = coro_ref; + io->coro = L; + io->c = c; + // we need to associate the top level mc_resp here so the run routine + // can fill it in later. + io->resp = top_resp; + // mark the buffer into the mcp_resp for freeing later. + r->buf = io->eio.buf; + return 1; + } + return 0; +} diff --git a/proxy_lua.c b/proxy_lua.c index aeaf1e5..ede23bb 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -90,6 +90,13 @@ static int mcplib_response_gc(lua_State *L) { free(r->buf); } + // release our temporary mc_resp sub-object. + if (r->cresp != NULL) { + mc_resp *cresp = r->cresp; + assert(r->thread != NULL); + resp_free(r->thread, cresp); + } + return 0; } @@ -1188,6 +1195,7 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { }; const struct luaL_Reg mcplib_f [] = { + {"internal", mcplib_internal}, {"pool", mcplib_pool}, {"backend", mcplib_backend}, {"request", mcplib_request}, -- cgit v1.2.1