summaryrefslogtreecommitdiff
path: root/proxy_internal.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-12 12:17:08 -0800
committerdormando <dormando@rydia.net>2023-02-25 15:07:01 -0800
commitf012c9cb3516933b0fce54e12d358e697c24863d (patch)
tree133a41d279be9895c5258b830f6705ea9fbf1268 /proxy_internal.c
parent6442017c545a2a5ad076697b8695cd64bd32b542 (diff)
downloadmemcached-f012c9cb3516933b0fce54e12d358e697c24863d.tar.gz
proxy: add mcp.internal(r) API
local res = mcp.internal(r) - takes a request object and executes it against the proxy's internal cache instance. Experimental as of this commit. Needs more test coverage and benchmarking.
Diffstat (limited to 'proxy_internal.c')
-rw-r--r--proxy_internal.c1694
1 files changed, 1694 insertions, 0 deletions
diff --git a/proxy_internal.c b/proxy_internal.c
new file mode 100644
index 0000000..a193c16
--- /dev/null
+++ b/proxy_internal.c
@@ -0,0 +1,1694 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// Functions related to local command execution.
+
+#include "proxy.h"
+#include "storage.h"
+
+#define PROXY_STORAGE_GET 0
+#define PROXY_STORAGE_MG 1
+#define _DO_CAS true
+#define _NO_CAS false
+#define _DO_TOUCH true
+#define _NO_TOUCH false
+
+static int _store_item_copy_from_buf(item *d_it, char *buf, const int len) {
+ if (d_it->it_flags & ITEM_CHUNKED) {
+ item_chunk *dch = (item_chunk *) ITEM_schunk(d_it);
+ int done = 0;
+ // Fill dch's via a flat data buffer
+ while (len > done && dch) {
+ int todo = (dch->size - dch->used < len - done)
+ ? dch->size - dch->used : len - done;
+ memcpy(dch->data + dch->used, buf + done, todo);
+ done += todo;
+ dch->used += todo;
+ assert(dch->used <= dch->size);
+
+ if (dch->size == dch->used) {
+ item_chunk *tch = do_item_alloc_chunk(dch, len - done);
+ if (tch) {
+ dch = tch;
+ } else {
+ return -1;
+ }
+ }
+ }
+ assert(len == done);
+ } else {
+ memcpy(ITEM_data(d_it), buf, len);
+ }
+
+ return 0;
+}
+
+// TODO (v2): out_string() needs to change to just take a *resp, but I don't
+// want to do the huge refactor in this change series. So for now we have a
+// custom out_string().
+static void pout_string(mc_resp *resp, const char *str) {
+ size_t len;
+ bool skip = resp->skip;
+ assert(resp != NULL);
+
+ // if response was original filled with something, but we're now writing
+ // out an error or similar, have to reset the object first.
+ resp_reset(resp);
+
+ // We blank the response "just in case", but if we're not intending on
+ // sending it lets not rewrite it.
+ if (skip) {
+ resp->skip = true;
+ return;
+ }
+
+ // Fill response object with static string.
+
+ len = strlen(str);
+ if ((len + 2) > WRITE_BUFFER_SIZE) {
+ /* ought to be always enough. just fail for simplicity */
+ str = "SERVER_ERROR output line too long";
+ len = strlen(str);
+ }
+
+ memcpy(resp->wbuf, str, len);
+ memcpy(resp->wbuf + len, "\r\n", 2);
+ resp_add_iov(resp, resp->wbuf, len + 2);
+
+ return;
+}
+
+// For meta commands error strings override the quiet flag.
+static void pout_errstring(mc_resp *resp, const char *str) {
+ resp->skip = false;
+ pout_string(resp, str);
+}
+
+#ifdef EXTSTORE
+static void _storage_get_item_cb(void *e, obj_io *eio, int ret) {
+ io_pending_proxy_t *io = (io_pending_proxy_t *)eio->data;
+ assert(io->active == true);
+ mc_resp *resp = io->tresp;
+ item *read_it = (item *)eio->buf;
+ bool miss = false;
+
+ if (ret < 1) {
+ miss = true;
+ } else {
+ uint32_t crc2;
+ uint32_t crc = (uint32_t) read_it->exptime;
+ crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, eio->len-STORE_OFFSET);
+
+ if (crc != crc2) {
+ miss = true;
+ io->badcrc = true;
+ }
+ }
+
+ if (miss && !resp->skip) {
+ resp->iovcnt = 1;
+ if (io->gettype == PROXY_STORAGE_GET) {
+ resp->iov[0].iov_len = 5;
+ resp->iov[0].iov_base = "END\r\n";
+ resp->tosend = 5;
+ } else if (io->gettype == PROXY_STORAGE_MG) {
+ resp->iov[0].iov_len = 4;
+ resp->iov[0].iov_base = "EN\r\n";
+ resp->tosend = 5;
+ } else {
+ assert(1 == 0);
+ }
+ }
+
+ if (!miss) {
+ resp->iov[io->iovec_data].iov_base = ITEM_data(read_it);
+ }
+ io->miss = miss;
+ io->active = false;
+
+ // in proxy mode we tend to return IO's as they happen so we can keep
+ // latency down more.
+ return_io_pending((io_pending_t *)io);
+}
+
+// TODO (v2): if the item is smaller than resp->wbuf[] shouldn't we just read
+// directly into there? item only necessary for recache.
+static int proxy_storage_get(LIBEVENT_THREAD *t, item *it, mc_resp *resp,
+ int type) {
+#ifdef NEED_ALIGN
+ item_hdr hdr;
+ memcpy(&hdr, ITEM_data(it), sizeof(hdr));
+#else
+ item_hdr *hdr = (item_hdr *)ITEM_data(it);
+#endif
+ size_t ntotal = ITEM_ntotal(it);
+
+ io_pending_proxy_t *io = do_cache_alloc(t->io_cache);
+ // this is a re-cast structure, so assert that we never outsize it.
+ assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
+ memset(io, 0, sizeof(io_pending_proxy_t));
+ io->active = true;
+ // io_pending owns the reference for this object now.
+ io->hdr_it = it;
+ io->tresp = resp; // our mc_resp is a temporary object.
+ io->io_queue_type = IO_QUEUE_EXTSTORE;
+ io->io_type = IO_PENDING_TYPE_EXTSTORE; // proxy specific sub-type.
+ io->gettype = type;
+ io->thread = t;
+ io->return_cb = proxy_return_cb;
+ io->finalize_cb = proxy_finalize_cb;
+ obj_io *eio = &io->eio;
+
+ eio->buf = malloc(ntotal);
+ if (eio->buf == NULL) {
+ do_cache_free(t->io_cache, io);
+ return -1;
+ }
+
+ io->iovec_data = resp->iovcnt;
+ resp_add_iov(resp, "", it->nbytes);
+
+ // We can't bail out anymore, so mc_resp owns the IO from here.
+ resp->io_pending = (io_pending_t *)io;
+
+ // reference ourselves for the callback.
+ eio->data = (void *)io;
+
+ // Now, fill in io->io based on what was in our header.
+#ifdef NEED_ALIGN
+ eio->page_version = hdr.page_version;
+ eio->page_id = hdr.page_id;
+ eio->offset = hdr.offset;
+#else
+ eio->page_version = hdr->page_version;
+ eio->page_id = hdr->page_id;
+ eio->offset = hdr->offset;
+#endif
+ eio->len = ntotal;
+ eio->mode = OBJ_IO_READ;
+ eio->cb = _storage_get_item_cb;
+
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.get_extstore++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ return 0;
+}
+#endif // EXTSTORE
+
+/* client flags == 0 means use no storage for client flags */
+static inline int make_ascii_get_suffix(char *suffix, item *it, bool return_cas, int nbytes) {
+ char *p = suffix;
+ *p = ' ';
+ p++;
+ if (FLAGS_SIZE(it) == 0) {
+ *p = '0';
+ p++;
+ } else {
+ p = itoa_u32(*((uint32_t *) ITEM_suffix(it)), p);
+ }
+ *p = ' ';
+ p = itoa_u32(nbytes-2, p+1);
+
+ if (return_cas) {
+ *p = ' ';
+ p = itoa_u64(ITEM_get_cas(it), p+1);
+ }
+
+ *p = '\r';
+ *(p+1) = '\n';
+ *(p+2) = '\0';
+ return (p - suffix) + 2;
+}
+
+static void process_get_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, bool return_cas, bool should_touch) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ int nkey = pr->klen;
+ rel_time_t exptime = 0;
+ bool overflow = false; // unused.
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ item *it = limited_get(key, nkey, t, exptime, should_touch, DO_UPDATE, &overflow);
+ if (it) {
+ int nbytes = it->nbytes;;
+ nbytes = it->nbytes;
+ char *p = resp->wbuf;
+ memcpy(p, "VALUE ", 6);
+ p += 6;
+ memcpy(p, ITEM_key(it), it->nkey);
+ p += it->nkey;
+ p += make_ascii_get_suffix(p, it, return_cas, nbytes);
+ resp_add_iov(resp, resp->wbuf, p - resp->wbuf);
+
+#ifdef EXTSTORE
+ if (it->it_flags & ITEM_HDR) {
+ if (proxy_storage_get(t, it, resp, PROXY_STORAGE_GET) != 0) {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.get_oom_extstore++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ item_remove(it);
+ proxy_out_errstring(resp, "out of memory writing get response");
+ return;
+ }
+ } else if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ resp_add_iov(resp, ITEM_data(it), it->nbytes);
+ } else {
+ resp_add_chunked_iov(resp, it, it->nbytes);
+ }
+#else
+ if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ resp_add_iov(resp, ITEM_data(it), it->nbytes);
+ } else {
+ resp_add_chunked_iov(resp, it, it->nbytes);
+ }
+#endif
+
+ /* item_get() has incremented it->refcount for us */
+ pthread_mutex_lock(&t->stats.mutex);
+ if (should_touch) {
+ t->stats.touch_cmds++;
+ t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
+ } else {
+ t->stats.lru_hits[it->slabs_clsid]++;
+ t->stats.get_cmds++;
+ }
+ pthread_mutex_unlock(&t->stats.mutex);
+#ifdef EXTSTORE
+ /* If ITEM_HDR, an io_wrap owns the reference. */
+ if ((it->it_flags & ITEM_HDR) == 0) {
+ resp->item = it;
+ }
+#else
+ resp->item = it;
+#endif
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ if (should_touch) {
+ t->stats.touch_cmds++;
+ t->stats.touch_misses++;
+ } else {
+ t->stats.get_misses++;
+ t->stats.get_cmds++;
+ }
+ pthread_mutex_unlock(&t->stats.mutex);
+ }
+
+ resp_add_iov(resp, "END\r\n", 5);
+ return;
+}
+
+static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, int comm, bool handle_cas) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+ unsigned int flags;
+ int32_t exptime_int = 0;
+ rel_time_t exptime = 0;
+ uint64_t req_cas_id = 0;
+ item *it;
+
+ assert(resp != NULL);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ // TODO (v2): these safe_str* functions operate on C _strings_, but these
+ // tokens simply end with a space or carriage return/newline, so we either
+ // need custom functions or validate harder that these calls won't bite us
+ // later.
+ if (! (safe_strtoul(&pr->request[pr->tokens[2]], (uint32_t *)&flags)
+ && safe_strtol(&pr->request[pr->tokens[3]], &exptime_int))) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));
+
+ // does cas value exist?
+ if (handle_cas) {
+ if (!safe_strtoull(&pr->request[pr->tokens[5]], &req_cas_id)) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+ }
+
+ // vlen is validated from the main parser.
+
+ if (settings.detail_enabled) {
+ stats_prefix_record_set(key, nkey);
+ }
+
+ it = item_alloc(key, nkey, flags, exptime, pr->vlen);
+
+ if (it == 0) {
+ //enum store_item_type status;
+ if (! item_size_ok(nkey, flags, pr->vlen)) {
+ //out_string(c, "SERVER_ERROR object too large for cache");
+ //status = TOO_LARGE;
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.store_too_large++;
+ pthread_mutex_unlock(&t->stats.mutex);
+ } else {
+ //out_of_memory(c, "SERVER_ERROR out of memory storing object");
+ //status = NO_MEMORY;
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.store_no_memory++;
+ pthread_mutex_unlock(&t->stats.mutex);
+ }
+ //LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE,
+ // NULL, status, comm, key, nkey, 0, 0, c->sfd);
+
+ /* Avoid stale data persisting in cache because we failed alloc.
+ * Unacceptable for SET. Anywhere else too? */
+ if (comm == NREAD_SET) {
+ it = item_get(key, nkey, t, DONT_UPDATE);
+ if (it) {
+ item_unlink(it);
+ STORAGE_delete(t->storage, it);
+ item_remove(it);
+ }
+ }
+
+ return;
+ }
+ ITEM_set_cas(it, req_cas_id);
+
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ // complete_nread_proxy() does the data chunk check so all we need to do
+ // is copy the data.
+ if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) {
+ pout_string(resp, "SERVER_ERROR out of memory storing object");
+ item_remove(it);
+ return;
+ }
+
+ int ret = store_item(it, comm, t, NULL, CAS_NO_STALE);
+ switch (ret) {
+ case STORED:
+ pout_string(resp, "STORED");
+ break;
+ case EXISTS:
+ pout_string(resp, "EXISTS");
+ break;
+ case NOT_FOUND:
+ pout_string(resp, "NOT_FOUND");
+ break;
+ case NOT_STORED:
+ pout_string(resp, "NOT_STORED");
+ break;
+ default:
+ pout_string(resp, "SERVER_ERROR Unhandled storage type.");
+ }
+
+}
+
+static void process_arithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp, const bool incr) {
+ char temp[INCR_MAX_STORAGE_LEN];
+ uint64_t delta;
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+
+ assert(t != NULL);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (!safe_strtoull(&pr->request[pr->tokens[2]], &delta)) {
+ pout_string(resp, "CLIENT_ERROR invalid numeric delta argument");
+ return;
+ }
+
+ switch(add_delta(t, key, nkey, incr, delta, temp, NULL)) {
+ case OK:
+ pout_string(resp, temp);
+ break;
+ case NON_NUMERIC:
+ pout_string(resp, "CLIENT_ERROR cannot increment or decrement non-numeric value");
+ break;
+ case EOM:
+ pout_string(resp, "SERVER_ERROR out of memory");
+ break;
+ case DELTA_ITEM_NOT_FOUND:
+ pthread_mutex_lock(&t->stats.mutex);
+ if (incr) {
+ t->stats.incr_misses++;
+ } else {
+ t->stats.decr_misses++;
+ }
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ pout_string(resp, "NOT_FOUND");
+ break;
+ case DELTA_ITEM_CAS_MISMATCH:
+ break; /* Should never get here */
+ }
+}
+
+static void process_delete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+ item *it;
+ uint32_t hv;
+
+ assert(t != NULL);
+
+ // NOTE: removed a compatibility bodge from a decade ago.
+ // delete used to take a "delay" argument, which was removed, but some
+ // ancient php clients always sent a 0 argument, which would then fail.
+ // It's been long enough that I don't want to carry this forward into the
+ // new parser.
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
+ if (it) {
+ MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
+
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ do_item_unlink(it, hv);
+ STORAGE_delete(t->storage, it);
+ do_item_remove(it); /* release our reference */
+ pout_string(resp, "DELETED");
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.delete_misses++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ pout_string(resp, "NOT_FOUND");
+ }
+ item_unlock(hv);
+}
+
+static void process_touch_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+ int32_t exptime_int = 0;
+ rel_time_t exptime = 0;
+ item *it;
+
+ assert(t != NULL);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (!safe_strtol(&pr->request[pr->tokens[2]], &exptime_int)) {
+ pout_string(resp, "CLIENT_ERROR invalid exptime argument");
+ return;
+ }
+
+ exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));
+ it = item_touch(key, nkey, exptime, t);
+ if (it) {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.touch_cmds++;
+ t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ pout_string(resp, "TOUCHED");
+ item_remove(it);
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.touch_cmds++;
+ t->stats.touch_misses++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ pout_string(resp, "NOT_FOUND");
+ }
+}
+
+/*** meta command handlers ***/
+
+// FIXME: macro or public interface, this is copypasted.
+static int _process_token_len(mcp_parser_t *pr, size_t token) {
+ const char *s = pr->request + pr->tokens[token];
+ const char *e = pr->request + pr->tokens[token+1];
+ // start of next token is after any space delimiters, so back those out.
+ while (*(e-1) == ' ') {
+ e--;
+ }
+ return e - s;
+}
+
+#define META_SPACE(p) { \
+ *p = ' '; \
+ p++; \
+}
+
+#define META_CHAR(p, c) { \
+ *p = ' '; \
+ *(p+1) = c; \
+ p += 2; \
+}
+
+// FIXME: binary key support.
+#define META_KEY(p, key, nkey, bin) { \
+ META_CHAR(p, 'k'); \
+ memcpy(p, key, nkey); \
+ p += nkey; \
+}
+
+#define MFLAG_MAX_OPT_LENGTH 20
+#define MFLAG_MAX_OPAQUE_LENGTH 32
+
+struct _meta_flags {
+ unsigned int has_error :1; // flipped if we found an error during parsing.
+ unsigned int no_update :1;
+ unsigned int locked :1;
+ unsigned int vivify :1;
+ unsigned int la :1;
+ unsigned int hit :1;
+ unsigned int value :1;
+ unsigned int set_stale :1;
+ unsigned int no_reply :1;
+ unsigned int has_cas :1;
+ unsigned int new_ttl :1;
+ unsigned int key_binary:1;
+ char mode; // single character mode switch, common to ms/ma
+ rel_time_t exptime;
+ rel_time_t autoviv_exptime;
+ rel_time_t recache_time;
+ uint32_t client_flags;
+ uint64_t req_cas_id;
+ uint64_t delta; // ma
+ uint64_t initial; // ma
+};
+
+static int _meta_flag_preparse(mcp_parser_t *pr, const size_t start,
+ struct _meta_flags *of, char **errstr) {
+ unsigned int i;
+ //size_t ret;
+ int32_t tmp_int;
+ uint8_t seen[127] = {0};
+ // Start just past the key token. Look at first character of each token.
+ for (i = start; i < pr->ntokens; i++) {
+ uint8_t o = (uint8_t)pr->request[pr->tokens[i]];
+ // zero out repeat flags so we don't over-parse for return data.
+ if (o >= 127 || seen[o] != 0) {
+ *errstr = "CLIENT_ERROR duplicate flag";
+ return -1;
+ }
+ seen[o] = 1;
+ switch (o) {
+ // base64 decode the key in-place, as the binary should always be
+ // shorter and the conversion code buffers bytes.
+ // TODO: we need temporary space for the binary key decode since
+ // request should be const.
+ /*case 'b':
+ ret = base64_decode((unsigned char *)tokens[KEY_TOKEN].value, tokens[KEY_TOKEN].length,
+ (unsigned char *)tokens[KEY_TOKEN].value, tokens[KEY_TOKEN].length);
+ if (ret == 0) {
+ // Failed to decode
+ *errstr = "CLIENT_ERROR error decoding key";
+ of->has_error = 1;
+ }
+ tokens[KEY_TOKEN].length = ret;
+ of->key_binary = 1;
+ break;*/
+ /* Negative exptimes can underflow and end up immortal. realtime() will
+ immediately expire values that are greater than REALTIME_MAXDELTA, but less
+ than process_started, so lets aim for that. */
+ case 'N':
+ of->locked = 1;
+ of->vivify = 1;
+ if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) {
+ *errstr = "CLIENT_ERROR bad token in command line format";
+ of->has_error = 1;
+ } else {
+ of->autoviv_exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
+ }
+ break;
+ case 'T':
+ of->locked = 1;
+ if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) {
+ *errstr = "CLIENT_ERROR bad token in command line format";
+ of->has_error = 1;
+ } else {
+ of->exptime = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
+ of->new_ttl = true;
+ }
+ break;
+ case 'R':
+ of->locked = 1;
+ if (!safe_strtol(&pr->request[pr->tokens[i]+1], &tmp_int)) {
+ *errstr = "CLIENT_ERROR bad token in command line format";
+ of->has_error = 1;
+ } else {
+ of->recache_time = realtime(EXPTIME_TO_POSITIVE_TIME(tmp_int));
+ }
+ break;
+ case 'l':
+ of->la = 1;
+ of->locked = 1; // need locked to delay LRU bump
+ break;
+ case 'O':
+ case 'P':
+ case 'L':
+ break;
+ case 'k': // known but no special handling
+ case 's':
+ case 't':
+ case 'c':
+ case 'f':
+ break;
+ case 'v':
+ of->value = 1;
+ break;
+ case 'h':
+ of->locked = 1; // need locked to delay LRU bump
+ break;
+ case 'u':
+ of->no_update = 1;
+ break;
+ case 'q':
+ of->no_reply = 1;
+ break;
+ // mset-related.
+ case 'F':
+ if (!safe_strtoul(&pr->request[pr->tokens[i]+1], &of->client_flags)) {
+ of->has_error = true;
+ }
+ break;
+ case 'C': // mset, mdelete, marithmetic
+ if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->req_cas_id)) {
+ *errstr = "CLIENT_ERROR bad token in command line format";
+ of->has_error = true;
+ } else {
+ of->has_cas = true;
+ }
+ break;
+ case 'M': // mset and marithmetic mode switch
+ // FIXME: this used to error if the token isn't a single byte.
+ // It probably should still?
+ of->mode = pr->request[pr->tokens[i]];
+ break;
+ case 'J': // marithmetic initial value
+ if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->initial)) {
+ *errstr = "CLIENT_ERROR invalid numeric initial value";
+ of->has_error = 1;
+ }
+ break;
+ case 'D': // marithmetic delta value
+ if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->delta)) {
+ *errstr = "CLIENT_ERROR invalid numeric delta value";
+ of->has_error = 1;
+ }
+ break;
+ case 'I':
+ of->set_stale = 1;
+ break;
+ default: // unknown flag, bail.
+ *errstr = "CLIENT_ERROR invalid flag";
+ return -1;
+ }
+ }
+
+ return of->has_error ? -1 : 0;
+}
+
+static void process_mget_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+ item *it;
+ unsigned int i = 0;
+ struct _meta_flags of = {0}; // option bitflags.
+ uint32_t hv; // cached hash value for unlocking an item.
+ bool failed = false;
+ bool item_created = false;
+ bool won_token = false;
+ bool ttl_set = false;
+ char *errstr = "CLIENT_ERROR bad command line format";
+ assert(t != NULL);
+ char *p = resp->wbuf;
+ int tlen = 0;
+
+ // FIXME: still needed?
+ //WANT_TOKENS_MIN(ntokens, 3);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
+ // TODO: ensure the command tokenizer gives us at least this many
+ pout_errstring(resp, "CLIENT_ERROR options flags are too long");
+ return;
+ }
+
+ // scrubs duplicated options and sets flags for how to load the item.
+ // we pass in the first token that should be a flag.
+ if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
+ pout_errstring(resp, errstr);
+ return;
+ }
+
+ bool overflow = false;
+ if (!of.locked) {
+ it = limited_get(key, nkey, t, 0, false, !of.no_update, &overflow);
+ } else {
+ // If we had to lock the item, we're doing our own bump later.
+ it = limited_get_locked(key, nkey, t, DONT_UPDATE, &hv, &overflow);
+ }
+
+ // Since we're a new protocol, we can actually inform users that refcount
+ // overflow is happening by straight up throwing an error.
+ // We definitely don't want to re-autovivify by accident.
+ if (overflow) {
+ assert(it == NULL);
+ pout_errstring(resp, "SERVER_ERROR refcount overflow during fetch");
+ return;
+ }
+
+ if (it == NULL && of.vivify) {
+ // Fill in the exptime during parsing later.
+ it = item_alloc(key, nkey, 0, realtime(0), 2);
+ // We don't actually need any of do_store_item's logic:
+ // - already fetched and missed an existing item.
+ // - lock is still held.
+ // - not append/prepend/replace
+ // - not testing CAS
+ if (it != NULL) {
+ // I look forward to the day I get rid of this :)
+ memcpy(ITEM_data(it), "\r\n", 2);
+ // NOTE: This initializes the CAS value.
+ do_item_link(it, hv);
+ item_created = true;
+ }
+ }
+
+ // don't have to check result of add_iov() since the iov size defaults are
+ // enough.
+ if (it) {
+ if (of.value) {
+ memcpy(p, "VA ", 3);
+ p = itoa_u32(it->nbytes-2, p+3);
+ } else {
+ memcpy(p, "HD", 2);
+ p += 2;
+ }
+
+ for (i = pr->keytoken+1; i < pr->ntokens; i++) {
+ switch (pr->request[pr->tokens[i]]) {
+ case 'T':
+ ttl_set = true;
+ it->exptime = of.exptime;
+ break;
+ case 'N':
+ if (item_created) {
+ it->exptime = of.autoviv_exptime;
+ won_token = true;
+ }
+ break;
+ case 'R':
+ // If we haven't autovivified and supplied token is less
+ // than current TTL, mark a win.
+ if ((it->it_flags & ITEM_TOKEN_SENT) == 0
+ && !item_created
+ && it->exptime != 0
+ && it->exptime < of.recache_time) {
+ won_token = true;
+ }
+ break;
+ case 's':
+ META_CHAR(p, 's');
+ p = itoa_u32(it->nbytes-2, p);
+ break;
+ case 't':
+ // TTL remaining as of this request.
+ // needs to be relative because server clocks may not be in sync.
+ META_CHAR(p, 't');
+ if (it->exptime == 0) {
+ *p = '-';
+ *(p+1) = '1';
+ p += 2;
+ } else {
+ p = itoa_u32(it->exptime - current_time, p);
+ }
+ break;
+ case 'c':
+ META_CHAR(p, 'c');
+ p = itoa_u64(ITEM_get_cas(it), p);
+ break;
+ case 'f':
+ META_CHAR(p, 'f');
+ if (FLAGS_SIZE(it) == 0) {
+ *p = '0';
+ p++;
+ } else {
+ p = itoa_u32(*((uint32_t *) ITEM_suffix(it)), p);
+ }
+ break;
+ case 'l':
+ META_CHAR(p, 'l');
+ p = itoa_u32(current_time - it->time, p);
+ break;
+ case 'h':
+ META_CHAR(p, 'h');
+ if (it->it_flags & ITEM_FETCHED) {
+ *p = '1';
+ } else {
+ *p = '0';
+ }
+ p++;
+ break;
+ case 'O':
+ tlen = _process_token_len(pr, i);
+ if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
+ errstr = "CLIENT_ERROR opaque token too long";
+ goto error;
+ }
+ META_SPACE(p);
+ memcpy(p, &pr->request[pr->tokens[i]], tlen);
+ p += tlen;
+ break;
+ case 'k':
+ META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY));
+ break;
+ }
+ }
+
+ // Has this item already sent a token?
+ // Important to do this here so we don't send W with Z.
+ // Isn't critical, but easier for client authors to understand.
+ if (it->it_flags & ITEM_TOKEN_SENT) {
+ META_CHAR(p, 'Z');
+ }
+ if (it->it_flags & ITEM_STALE) {
+ META_CHAR(p, 'X');
+ // FIXME: think hard about this. is this a default, or a flag?
+ if ((it->it_flags & ITEM_TOKEN_SENT) == 0) {
+ // If we're stale but no token already sent, now send one.
+ won_token = true;
+ }
+ }
+
+ if (won_token) {
+ // Mark a win into the flag buffer.
+ META_CHAR(p, 'W');
+ it->it_flags |= ITEM_TOKEN_SENT;
+ }
+
+ *p = '\r';
+ *(p+1) = '\n';
+ *(p+2) = '\0';
+ p += 2;
+ // finally, chain in the buffer.
+ resp_add_iov(resp, resp->wbuf, p - resp->wbuf);
+
+ if (of.value) {
+#ifdef EXTSTORE
+ if (it->it_flags & ITEM_HDR) {
+ if (proxy_storage_get(t, it, resp, PROXY_STORAGE_MG) != 0) {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.get_oom_extstore++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ failed = true;
+ }
+ } else if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ resp_add_iov(resp, ITEM_data(it), it->nbytes);
+ } else {
+ resp_add_chunked_iov(resp, it, it->nbytes);
+ }
+#else
+ if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ resp_add_iov(resp, ITEM_data(it), it->nbytes);
+ } else {
+ resp_add_chunked_iov(resp, it, it->nbytes);
+ }
+#endif
+ }
+
+ // need to hold the ref at least because of the key above.
+#ifdef EXTSTORE
+ if (!failed) {
+ if ((it->it_flags & ITEM_HDR) != 0 && of.value) {
+ // Only have extstore clean if header and returning value.
+ resp->item = NULL;
+ } else {
+ resp->item = it;
+ }
+ } else {
+ // Failed to set up extstore fetch.
+ if (of.locked) {
+ do_item_remove(it);
+ } else {
+ item_remove(it);
+ }
+ }
+#else
+ resp->item = it;
+#endif
+ } else {
+ failed = true;
+ }
+
+ if (of.locked) {
+ // Delayed bump so we could get fetched/last access time pre-update.
+ if (!of.no_update && it != NULL) {
+ do_item_bump(t, it, hv);
+ }
+ item_unlock(hv);
+ }
+
+ // we count this command as a normal one if we've gotten this far.
+ // TODO: for autovivify case, miss never happens. Is this okay?
+ if (!failed) {
+ pthread_mutex_lock(&t->stats.mutex);
+ if (ttl_set) {
+ t->stats.touch_cmds++;
+ t->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
+ } else {
+ t->stats.lru_hits[it->slabs_clsid]++;
+ t->stats.get_cmds++;
+ }
+ pthread_mutex_unlock(&t->stats.mutex);
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ if (ttl_set) {
+ t->stats.touch_cmds++;
+ t->stats.touch_misses++;
+ } else {
+ t->stats.get_misses++;
+ t->stats.get_cmds++;
+ }
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ // This gets elided in noreply mode.
+ if (of.no_reply)
+ resp->skip = true;
+ memcpy(p, "EN", 2);
+ p += 2;
+ for (i = pr->keytoken+1; i < pr->ntokens; i++) {
+ switch (pr->request[pr->tokens[i]]) {
+ // TODO: macro perhaps?
+ case 'O':
+ tlen = _process_token_len(pr, i);
+ if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
+ errstr = "CLIENT_ERROR opaque token too long";
+ goto error;
+ }
+ META_SPACE(p);
+ memcpy(p, &pr->request[pr->tokens[i]], tlen);
+ p += tlen;
+ break;
+ case 'k':
+ META_KEY(p, key, nkey, of.key_binary);
+ break;
+ }
+ }
+ resp->wbytes = p - resp->wbuf;
+ memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
+ resp->wbytes += 2;
+ resp_add_iov(resp, resp->wbuf, resp->wbytes);
+ }
+ return;
+error:
+ if (it) {
+ do_item_remove(it);
+ if (of.locked) {
+ item_unlock(hv);
+ }
+ }
+ pout_errstring(resp, errstr);
+}
+
+static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+
+ item *it;
+ int i;
+ short comm = NREAD_SET;
+ struct _meta_flags of = {0}; // option bitflags.
+ char *errstr = "CLIENT_ERROR bad command line format";
+ uint32_t hv; // cached hash value.
+ int vlen = pr->vlen; // value from data line.
+ assert(t != NULL);
+ char *p = resp->wbuf;
+ int tlen = 0;
+
+ //WANT_TOKENS_MIN(ntokens, 3);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
+ // TODO: ensure the command tokenizer gives us at least this many
+ pout_errstring(resp, "CLIENT_ERROR options flags are too long");
+ return;
+ }
+
+ if (pr->ntokens == 3) {
+ pout_errstring(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ // We need to at least try to get the size to properly slurp bad bytes
+ // after an error.
+ // we pass in the first token that should be a flag.
+ if (_meta_flag_preparse(pr, 3, &of, &errstr) != 0) {
+ goto error;
+ }
+
+ // "mode switch" to alternative commands
+ switch (of.mode) {
+ case 0:
+ break; // no mode supplied.
+ case 'E': // Add...
+ comm = NREAD_ADD;
+ break;
+ case 'A': // Append.
+ comm = NREAD_APPEND;
+ break;
+ case 'P': // Prepend.
+ comm = NREAD_PREPEND;
+ break;
+ case 'R': // Replace.
+ comm = NREAD_REPLACE;
+ break;
+ case 'S': // Set. Default.
+ comm = NREAD_SET;
+ break;
+ default:
+ errstr = "CLIENT_ERROR invalid mode for ms M token";
+ goto error;
+ }
+
+ // The item storage function doesn't exactly map to mset.
+ // If a CAS value is supplied, upgrade default SET mode to CAS mode.
+ // Also allows REPLACE to work, as REPLACE + CAS works the same as CAS.
+ // add-with-cas works the same as add; but could only LRU bump if match..
+ // APPEND/PREPEND allow a simplified CAS check.
+ if (of.has_cas && (comm == NREAD_SET || comm == NREAD_REPLACE)) {
+ comm = NREAD_CAS;
+ }
+
+ it = item_alloc(key, nkey, of.client_flags, of.exptime, vlen);
+
+ if (it == 0) {
+ if (! item_size_ok(nkey, of.client_flags, vlen)) {
+ errstr = "SERVER_ERROR object too large for cache";
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.store_too_large++;
+ pthread_mutex_unlock(&t->stats.mutex);
+ } else {
+ errstr = "SERVER_ERROR out of memory storing object";
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.store_no_memory++;
+ pthread_mutex_unlock(&t->stats.mutex);
+ }
+
+ /* Avoid stale data persisting in cache because we failed alloc. */
+ // NOTE: only if SET mode?
+ it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
+ if (it) {
+ do_item_unlink(it, hv);
+ STORAGE_delete(t->storage, it);
+ do_item_remove(it);
+ }
+ item_unlock(hv);
+
+ goto error;
+ }
+ ITEM_set_cas(it, of.req_cas_id);
+
+ // data should already be read into the request.
+
+ // Prevent printing back the key in meta commands as garbage.
+ if (of.key_binary) {
+ it->it_flags |= ITEM_KEY_BINARY;
+ }
+
+ bool set_stale = CAS_NO_STALE;
+ if (of.set_stale && comm == NREAD_CAS) {
+ set_stale = CAS_ALLOW_STALE;
+ }
+ resp->wbytes = p - resp->wbuf;
+
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ // complete_nread_proxy() does the data chunk check so all we need to do
+ // is copy the data.
+ if (_store_item_copy_from_buf(it, pr->vbuf, it->nbytes) != 0) {
+ pout_string(resp, "SERVER_ERROR out of memory storing object");
+ item_remove(it);
+ return;
+ }
+
+ uint64_t cas = 0;
+ int ret = store_item(it, comm, t, &cas, set_stale);
+ switch (ret) {
+ case STORED:
+ memcpy(p, "HD", 2);
+ // Only place noreply is used for meta cmds is a nominal response.
+ if (of.no_reply) {
+ resp->skip = true;
+ }
+ break;
+ case EXISTS:
+ memcpy(p, "EX", 2);
+ break;
+ case NOT_FOUND:
+ memcpy(p, "NF", 2);
+ break;
+ case NOT_STORED:
+ memcpy(p, "NS", 2);
+ break;
+ default:
+ pout_errstring(resp, "SERVER_ERROR Unhandled storage type.");
+ return;
+
+ }
+ p += 2;
+
+ for (i = pr->keytoken+1; i < pr->ntokens; i++) {
+ switch (pr->request[pr->tokens[i]]) {
+ case 'O':
+ tlen = _process_token_len(pr, i);
+ if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
+ errstr = "CLIENT_ERROR opaque token too long";
+ goto error;
+ }
+ META_SPACE(p);
+ memcpy(p, &pr->request[pr->tokens[i]], tlen);
+ p += tlen;
+ break;
+ case 'k':
+ META_KEY(p, ITEM_key(it), it->nkey, (it->it_flags & ITEM_KEY_BINARY));
+ break;
+ case 'c':
+ META_CHAR(p, 'c');
+ p = itoa_u64(cas, p);
+ break;
+ }
+ }
+
+ // We don't need to free pr->vbuf as that is owned by *rq
+ // either way, there's no c->item or resp->item reference right now.
+
+ memcpy(p, "\r\n", 2);
+ p += 2;
+ // we're offset into wbuf, but good convention to track wbytes.
+ resp->wbytes = p - resp->wbuf;
+ resp_add_iov(resp, resp->wbuf, resp->wbytes);
+
+ return;
+error:
+ // Note: no errors possible after the item was successfully allocated.
+ // So we're just looking at dumping error codes and returning.
+ pout_errstring(resp, errstr);
+}
+
+static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+ item *it = NULL;
+ int i;
+ uint32_t hv;
+ struct _meta_flags of = {0}; // option bitflags.
+ char *errstr = "CLIENT_ERROR bad command line format";
+ assert(t != NULL);
+ // reserve bytes for status code
+ char *p = resp->wbuf + 2;
+ int tlen = 0;
+
+ //WANT_TOKENS_MIN(ntokens, 3);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
+ // TODO: ensure the command tokenizer gives us at least this many
+ pout_errstring(resp, "CLIENT_ERROR options flags are too long");
+ return;
+ }
+
+ // scrubs duplicated options and sets flags for how to load the item.
+ // we pass in the first token that should be a flag.
+ // FIXME: not using the preparse errstr?
+ if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
+ pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag");
+ return;
+ }
+
+ for (i = pr->keytoken+1; i < pr->ntokens; i++) {
+ switch (pr->request[pr->tokens[i]]) {
+ // TODO: macro perhaps?
+ case 'O':
+ tlen = _process_token_len(pr, i);
+ if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
+ errstr = "CLIENT_ERROR opaque token too long";
+ goto error;
+ }
+ META_SPACE(p);
+ memcpy(p, &pr->request[pr->tokens[i]], tlen);
+ p += tlen;
+ break;
+ case 'k':
+ META_KEY(p, key, nkey, of.key_binary);
+ break;
+ }
+ }
+
+ it = item_get_locked(key, nkey, t, DONT_UPDATE, &hv);
+ if (it) {
+ // allow only deleting/marking if a CAS value matches.
+ if (of.has_cas && ITEM_get_cas(it) != of.req_cas_id) {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.delete_misses++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ memcpy(resp->wbuf, "EX", 2);
+ goto cleanup;
+ }
+
+ // If we're to set this item as stale, we don't actually want to
+ // delete it. We mark the stale bit, bump CAS, and update exptime if
+ // we were supplied a new TTL.
+ if (of.set_stale) {
+ if (of.new_ttl) {
+ it->exptime = of.exptime;
+ }
+ it->it_flags |= ITEM_STALE;
+ // Also need to remove TOKEN_SENT, so next client can win.
+ it->it_flags &= ~ITEM_TOKEN_SENT;
+
+ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
+
+ // Clients can noreply nominal responses.
+ if (of.no_reply)
+ resp->skip = true;
+ memcpy(resp->wbuf, "HD", 2);
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ do_item_unlink(it, hv);
+ STORAGE_delete(t->storage, it);
+ if (of.no_reply)
+ resp->skip = true;
+ memcpy(resp->wbuf, "HD", 2);
+ }
+ goto cleanup;
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.delete_misses++;
+ pthread_mutex_unlock(&t->stats.mutex);
+
+ memcpy(resp->wbuf, "NF", 2);
+ goto cleanup;
+ }
+cleanup:
+ if (it) {
+ do_item_remove(it);
+ }
+ // Item is always returned locked, even if missing.
+ item_unlock(hv);
+ resp->wbytes = p - resp->wbuf;
+ memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
+ resp->wbytes += 2;
+ resp_add_iov(resp, resp->wbuf, resp->wbytes);
+ //conn_set_state(c, conn_new_cmd);
+ return;
+error:
+ pout_errstring(resp, errstr);
+}
+
+static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp) {
+ const char *key = &pr->request[pr->tokens[pr->keytoken]];
+ size_t nkey = pr->klen;
+ int i;
+ struct _meta_flags of = {0}; // option bitflags.
+ char *errstr = "CLIENT_ERROR bad command line format";
+ assert(t != NULL);
+ // no reservation (like del/set) since we post-process the status line.
+ char *p = resp->wbuf;
+ int tlen = 0;
+
+ // If no argument supplied, incr or decr by one.
+ of.delta = 1;
+ of.initial = 0; // redundant, for clarity.
+ bool incr = true; // default mode is to increment.
+ bool locked = false;
+ uint32_t hv = 0;
+ item *it = NULL; // item returned by do_add_delta.
+
+ //WANT_TOKENS_MIN(ntokens, 3);
+
+ if (nkey > KEY_MAX_LENGTH) {
+ pout_string(resp, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (pr->ntokens > MFLAG_MAX_OPT_LENGTH) {
+ // TODO: ensure the command tokenizer gives us at least this many
+ pout_errstring(resp, "CLIENT_ERROR options flags are too long");
+ return;
+ }
+
+ // scrubs duplicated options and sets flags for how to load the item.
+ // we pass in the first token that should be a flag.
+ if (_meta_flag_preparse(pr, 2, &of, &errstr) != 0) {
+ pout_errstring(resp, "CLIENT_ERROR invalid or duplicate flag");
+ return;
+ }
+ //c->noreply = of.no_reply;
+
+ // "mode switch" to alternative commands
+ switch (of.mode) {
+ case 0: // no switch supplied.
+ break;
+ case 'I': // Incr (default)
+ case '+':
+ incr = true;
+ break;
+ case 'D': // Decr.
+ case '-':
+ incr = false;
+ break;
+ default:
+ errstr = "CLIENT_ERROR invalid mode for ma M token";
+ goto error;
+ break;
+ }
+
+ // take hash value and manually lock item... hold lock during store phase
+ // on miss and avoid recalculating the hash multiple times.
+ hv = hash(key, nkey);
+ item_lock(hv);
+ locked = true;
+ char tmpbuf[INCR_MAX_STORAGE_LEN];
+
+ // return a referenced item if it exists, so we can modify it here, rather
+ // than adding even more parameters to do_add_delta.
+ bool item_created = false;
+ uint64_t cas = 0;
+ switch(do_add_delta(t, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) {
+ case OK:
+ //if (c->noreply)
+ // resp->skip = true;
+ // *it was filled, set the status below.
+ cas = ITEM_get_cas(it);
+ break;
+ case NON_NUMERIC:
+ errstr = "CLIENT_ERROR cannot increment or decrement non-numeric value";
+ goto error;
+ break;
+ case EOM:
+ errstr = "SERVER_ERROR out of memory";
+ goto error;
+ break;
+ case DELTA_ITEM_NOT_FOUND:
+ if (of.vivify) {
+ itoa_u64(of.initial, tmpbuf);
+ int vlen = strlen(tmpbuf);
+
+ it = item_alloc(key, nkey, 0, 0, vlen+2);
+ if (it != NULL) {
+ memcpy(ITEM_data(it), tmpbuf, vlen);
+ memcpy(ITEM_data(it) + vlen, "\r\n", 2);
+ if (do_store_item(it, NREAD_ADD, t, hv, &cas, CAS_NO_STALE)) {
+ item_created = true;
+ } else {
+ // Not sure how we can get here if we're holding the lock.
+ memcpy(resp->wbuf, "NS", 2);
+ }
+ } else {
+ errstr = "SERVER_ERROR Out of memory allocating new item";
+ goto error;
+ }
+ } else {
+ pthread_mutex_lock(&t->stats.mutex);
+ if (incr) {
+ t->stats.incr_misses++;
+ } else {
+ t->stats.decr_misses++;
+ }
+ pthread_mutex_unlock(&t->stats.mutex);
+ // won't have a valid it here.
+ memcpy(p, "NF", 2);
+ p += 2;
+ }
+ break;
+ case DELTA_ITEM_CAS_MISMATCH:
+ // also returns without a valid it.
+ memcpy(p, "EX", 2);
+ p += 2;
+ break;
+ }
+
+ // final loop
+ // allows building the response with information after vivifying from a
+ // miss, or returning a new CAS value after add_delta().
+ if (it) {
+ size_t vlen = strlen(tmpbuf);
+ if (of.value) {
+ memcpy(p, "VA ", 3);
+ p = itoa_u32(vlen, p+3);
+ } else {
+ memcpy(p, "HD", 2);
+ p += 2;
+ }
+
+ for (i = pr->keytoken+1; i < pr->ntokens; i++) {
+ switch (pr->request[pr->tokens[i]]) {
+ case 'c':
+ META_CHAR(p, 'c');
+ p = itoa_u64(cas, p);
+ break;
+ case 't':
+ META_CHAR(p, 't');
+ if (it->exptime == 0) {
+ *p = '-';
+ *(p+1) = '1';
+ p += 2;
+ } else {
+ p = itoa_u32(it->exptime - current_time, p);
+ }
+ break;
+ case 'T':
+ it->exptime = of.exptime;
+ break;
+ case 'N':
+ if (item_created) {
+ it->exptime = of.autoviv_exptime;
+ }
+ break;
+ case 'O':
+ tlen = _process_token_len(pr, i);
+ if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
+ errstr = "CLIENT_ERROR opaque token too long";
+ goto error;
+ }
+ META_SPACE(p);
+ memcpy(p, &pr->request[pr->tokens[i]], tlen);
+ break;
+ case 'k':
+ META_KEY(p, key, nkey, of.key_binary);
+ break;
+ }
+ }
+
+ if (of.value) {
+ *p = '\r';
+ *(p+1) = '\n';
+ p += 2;
+ memcpy(p, tmpbuf, vlen);
+ p += vlen;
+ }
+
+ do_item_remove(it);
+ } else {
+ // No item to handle. still need to return opaque/key tokens
+ for (i = pr->keytoken+1; i < pr->ntokens; i++) {
+ switch (pr->request[pr->tokens[i]]) {
+ case 'O':
+ tlen = _process_token_len(pr, i);
+ if (tlen > MFLAG_MAX_OPAQUE_LENGTH) {
+ errstr = "CLIENT_ERROR opaque token too long";
+ goto error;
+ }
+ META_SPACE(p);
+ memcpy(p, &pr->request[pr->tokens[i]], tlen);
+ break;
+ case 'k':
+ META_KEY(p, key, nkey, of.key_binary);
+ break;
+ }
+ }
+ }
+
+ item_unlock(hv);
+
+ resp->wbytes = p - resp->wbuf;
+ memcpy(resp->wbuf + resp->wbytes, "\r\n", 2);
+ resp->wbytes += 2;
+ resp_add_iov(resp, resp->wbuf, resp->wbytes);
+ return;
+error:
+ if (it != NULL)
+ do_item_remove(it);
+ if (locked)
+ item_unlock(hv);
+ pout_errstring(resp, errstr);
+}
+
+/*** Lua and internal handler ***/
+
+int mcplib_internal(lua_State *L) {
+ luaL_checkudata(L, 1, "mcp.request");
+ mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
+ memset(r, 0, sizeof(mcp_resp_t));
+ luaL_getmetatable(L, "mcp.response");
+ lua_setmetatable(L, -2);
+
+ lua_pushinteger(L, MCP_YIELD_LOCAL);
+ return lua_yield(L, 2);
+}
+
+// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
+// the text parser should eventually move to the new tokenizer and we can
+// merge all of this code together.
+int mcplib_internal_run(lua_State *L, conn *c, mc_resp *top_resp, int coro_ref) {
+ mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
+ mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
+ mc_resp *resp = resp_start_unlinked(c);
+ LIBEVENT_THREAD *t = c->thread;
+ mcp_parser_t *pr = &rq->pr;
+ if (resp == NULL) {
+ return -1;
+ }
+
+ // TODO: meta no-op isn't handled here. haven't decided how yet.
+ switch (rq->pr.command) {
+ case CMD_MG:
+ process_mget_cmd(t, pr, resp);
+ break;
+ case CMD_MS:
+ process_mset_cmd(t, pr, resp);
+ break;
+ case CMD_MD:
+ process_mdelete_cmd(t, pr, resp);
+ break;
+ case CMD_MA:
+ process_marithmetic_cmd(t, pr, resp);
+ break;
+ case CMD_GET:
+ process_get_cmd(t, pr, resp, _NO_CAS, _NO_TOUCH);
+ break;
+ case CMD_GETS:
+ process_get_cmd(t, pr, resp, _DO_CAS, _NO_TOUCH);
+ break;
+ case CMD_GAT:
+ process_get_cmd(t, pr, resp, _NO_CAS, _DO_TOUCH);
+ break;
+ case CMD_GATS:
+ process_get_cmd(t, pr, resp, _DO_CAS, _DO_TOUCH);
+ break;
+ case CMD_SET:
+ process_update_cmd(t, pr, resp, NREAD_SET, _NO_CAS);
+ break;
+ case CMD_ADD:
+ process_update_cmd(t, pr, resp, NREAD_ADD, _NO_CAS);
+ break;
+ case CMD_APPEND:
+ process_update_cmd(t, pr, resp, NREAD_APPEND, _NO_CAS);
+ break;
+ case CMD_PREPEND:
+ process_update_cmd(t, pr, resp, NREAD_PREPEND, _NO_CAS);
+ break;
+ case CMD_CAS:
+ process_update_cmd(t, pr, resp, NREAD_CAS, _DO_CAS);
+ break;
+ case CMD_REPLACE:
+ process_update_cmd(t, pr, resp, NREAD_REPLACE, _DO_CAS);
+ break;
+ case CMD_INCR:
+ process_arithmetic_cmd(t, pr, resp, true);
+ break;
+ case CMD_DECR:
+ process_arithmetic_cmd(t, pr, resp, false);
+ break;
+ case CMD_DELETE:
+ process_delete_cmd(t, pr, resp);
+ break;
+ case CMD_TOUCH:
+ process_touch_cmd(t, pr, resp);
+ break;
+ default:
+ resp_free(t, resp);
+ return -1;
+ }
+
+ // TODO: I'd like to shortcut the parsing here, but if we want the resp
+ // object to have full support (ie: resp:line()/etc) it might be necessary
+ // to still do a full parsing. It might be possible to
+ // wrap the main commands with something that decorates r->resp directly
+ // instead of going through a parser to save some CPU.
+ // Either way this is a lot less code.
+ mcmc_bare_parse_buf(resp->iov[0].iov_base, resp->iov[0].iov_len, &r->resp);
+
+ // in case someone logs this response it should make sense.
+ memcpy(r->be_name, "internal", strlen("internal"));
+ memcpy(r->be_port, "0", 1);
+
+ // TODO: r-> will need status/code/mode copied from resp.
+ r->cresp = resp;
+ r->thread = c->thread;
+ r->cmd = rq->pr.command;
+ // Always return OK from here as this is signalling an internal error.
+ r->status = MCMC_OK;
+
+ if (resp->io_pending) {
+ // TODO (v2): here we move the IO from the temporary resp to the top
+ // resp, but this feels kludgy so I'm leaving an explicit note to find
+ // a better way to do this.
+ top_resp->io_pending = resp->io_pending;
+ resp->io_pending = NULL;
+
+ // Add io object to extstore submission queue.
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE);
+ io_pending_proxy_t *io = (io_pending_proxy_t *)top_resp->io_pending;
+
+ io->eio.next = q->stack_ctx;
+ q->stack_ctx = &io->eio;
+ assert(q->count >= 0);
+ q->count++;
+
+ io->coro_ref = coro_ref;
+ io->coro = L;
+ io->c = c;
+ // we need to associate the top level mc_resp here so the run routine
+ // can fill it in later.
+ io->resp = top_resp;
+ // mark the buffer into the mcp_resp for freeing later.
+ r->buf = io->eio.buf;
+ return 1;
+ }
+ return 0;
+}