diff options
-rw-r--r-- | doc/protocol.txt | 10 | ||||
-rw-r--r-- | logger.c | 4 | ||||
-rw-r--r-- | memcached.c | 12 | ||||
-rw-r--r-- | memcached.h | 6 | ||||
-rw-r--r-- | proto_bin.c | 4 | ||||
-rw-r--r-- | proto_text.c | 40 | ||||
-rw-r--r-- | proxy_internal.c | 34 | ||||
-rw-r--r-- | t/metaget.t | 37 | ||||
-rw-r--r-- | thread.c | 4 |
9 files changed, 128 insertions, 23 deletions
diff --git a/doc/protocol.txt b/doc/protocol.txt index acb837c..8a261ed 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -726,8 +726,10 @@ The flags used by the 'ms' command are: - k: return key as a token - O(token): opaque value, consumes a token and copies back with response - q: use noreply semantics for return codes +- s: return the size of the stored item on success (ie; new size on append) - T(token): Time-To-Live for item, see "Expiration" above. - M(token): mode switch to change behavior to add, replace, append, prepend +- N(token): if in append mode, autovivify on miss with supplied TTL The flags are now repeated with detailed information where useful: @@ -775,6 +777,14 @@ S: "set" command. The default mode, added for completeness. The "cas" command is supplanted by specifying the cas value with the 'C' flag. Append and Prepend modes will also respect a supplied cas value. +- N(token): if in append mode, autovivify on miss with supplied TTL + +Append and Prepend modes normally ignore the T argument, as they cannot create +a new item on a miss. If N is supplied, and append reaches a miss, it will +create a new item seeded with the data from the append command. It uses the +TTL from N instead of T to be consistent with the usage of N in other +commands. + Meta Delete ----------- @@ -206,9 +206,9 @@ static int _logger_parse_ise(logentry *e, char *scratch) { const char * const status_map[] = { "not_stored", "stored", "exists", "not_found", "too_large", "no_memory" }; const char * const cmd_map[] = { - "null", "add", "set", "replace", "append", "prepend", "cas" }; + "null", "add", "set", "replace", "append", "prepend", "cas", "append", "prepend" }; - if (le->cmd <= 6) + if (le->cmd <= 8) cmd = cmd_map[le->cmd]; uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH); diff --git a/memcached.c b/memcached.c index 15bbf36..8e8a180 100644 --- a/memcached.c +++ b/memcached.c @@ -1532,7 +1532,7 @@ static int _store_item_copy_chunks(item *d_it, item *s_it, const int len) { } static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add_it) { - if (comm == NREAD_APPEND) { + if (comm == NREAD_APPEND || comm == NREAD_APPENDVIV) { if (new_it->it_flags & ITEM_CHUNKED) { if (_store_item_copy_chunks(new_it, old_it, old_it->nbytes - 2) == -1 || _store_item_copy_chunks(new_it, add_it, add_it->nbytes) == -1) { @@ -1563,7 +1563,7 @@ static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add * * Returns the state of storage. */ -enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale) { +enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, bool cas_stale) { char *key = ITEM_key(it); item *old_it = do_item_get(key, it->nkey, hv, t, DONT_UPDATE); enum store_item_type stored = NOT_STORED; @@ -1636,6 +1636,8 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const break; case NREAD_APPEND: case NREAD_PREPEND: + case NREAD_APPENDVIV: + case NREAD_PREPENDVIV: if (cas_res != CAS_NONE && cas_res != CAS_MATCH) { stored = EXISTS; break; @@ -1663,6 +1665,10 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const // it's original ref is managed outside of this function it = new_it; do_store = true; + // Upstream final object size for meta + if (nbytes != NULL) { + *nbytes = it->nbytes; + } } break; case NREAD_REPLACE: @@ -1692,6 +1698,8 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const switch (comm) { case NREAD_ADD: case NREAD_SET: + case NREAD_APPENDVIV: + case NREAD_PREPENDVIV: do_store = true; break; case NREAD_CAS: diff --git a/memcached.h b/memcached.h index 9bd80b0..7ebe65f 100644 --- a/memcached.h +++ b/memcached.h @@ -266,6 +266,8 @@ enum close_reasons { #define NREAD_APPEND 4 #define NREAD_PREPEND 5 #define NREAD_CAS 6 +#define NREAD_APPENDVIV 7 // specific to meta +#define NREAD_PREPENDVIV 8 // specific to meta #define CAS_ALLOW_STALE true #define CAS_NO_STALE false @@ -909,7 +911,7 @@ enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key, const int64_t delta, char *buf, uint64_t *cas, const uint32_t hv, item **it_ret); -enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale); +enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, bool cas_stale); void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb); void conn_io_queue_setup(conn *c); io_queue_t *conn_io_queue_get(conn *c, int type); @@ -992,7 +994,7 @@ LIBEVENT_THREAD *get_worker_thread(int id); void append_stat(const char *name, ADD_STAT add_stats, conn *c, const char *fmt, ...); -enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale); +enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, bool cas_stale); /* Protocol related code */ void out_string(conn *c, const char *str); diff --git a/proto_bin.c b/proto_bin.c index ef82de0..875afa6 100644 --- a/proto_bin.c +++ b/proto_bin.c @@ -328,7 +328,7 @@ static void complete_incr_bin(conn *c, char *extbuf) { memcpy(ITEM_data(it) + res, "\r\n", 2); c->thread->cur_sfd = c->sfd; // for store_item logging. - if (store_item(it, NREAD_ADD, c->thread, &cas, CAS_NO_STALE)) { + if (store_item(it, NREAD_ADD, c->thread, NULL, &cas, CAS_NO_STALE)) { c->cas = cas; write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value)); } else { @@ -386,7 +386,7 @@ static void complete_update_bin(conn *c) { uint64_t cas = 0; c->thread->cur_sfd = c->sfd; // for store_item logging. - ret = store_item(it, c->cmd, c->thread, &cas, CAS_NO_STALE); + ret = store_item(it, c->cmd, c->thread, NULL, &cas, CAS_NO_STALE); c->cas = cas; #ifdef ENABLE_DTRACE diff --git a/proto_text.c b/proto_text.c index d316792..1f69ddc 100644 --- a/proto_text.c +++ b/proto_text.c @@ -51,7 +51,7 @@ typedef struct token_s { size_t length; } token_t; -static void _finalize_mset(conn *c, enum store_item_type ret) { +static void _finalize_mset(conn *c, int nbytes, enum store_item_type ret) { mc_resp *resp = c->resp; item *it = c->item; conn_set_state(c, conn_new_cmd); @@ -105,6 +105,16 @@ static void _finalize_mset(conn *c, enum store_item_type ret) { META_CHAR(p, 'c'); p = itoa_u64(c->cas, p); break; + case 's': + // Get final item size, ie from append/prepend + META_CHAR(p, 's'); + // If the size changed during append/prepend + if (nbytes != 0) { + p = itoa_u32(nbytes-2, p); + } else { + p = itoa_u32(it->nbytes-2, p); + } + break; default: break; } @@ -128,6 +138,7 @@ void complete_nread_ascii(conn *c) { int comm = c->cmd; enum store_item_type ret; bool is_valid = false; + int nbytes = 0; pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++; @@ -170,7 +181,7 @@ void complete_nread_ascii(conn *c) { } else { uint64_t cas = 0; c->thread->cur_sfd = c->sfd; // cuddle sfd for logging. - ret = store_item(it, comm, c->thread, &cas, c->set_stale); + ret = store_item(it, comm, c->thread, &nbytes, &cas, c->set_stale); #ifdef ENABLE_DTRACE switch (c->cmd) { @@ -203,7 +214,7 @@ void complete_nread_ascii(conn *c) { if (c->mset_res) { c->cas = cas; - _finalize_mset(c, ret); + _finalize_mset(c, nbytes, ret); } else { switch (ret) { case STORED: @@ -1379,6 +1390,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens) assert(c != NULL); mc_resp *resp = c->resp; char *p = resp->wbuf; + rel_time_t exptime = 0; WANT_TOKENS_MIN(ntokens, 3); @@ -1427,6 +1439,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens) c->noreply = of.no_reply; // Clear cas return value c->cas = 0; + exptime = of.exptime; bool has_error = false; for (i = KEY_TOKEN+1; i < ntokens-1; i++) { @@ -1448,6 +1461,9 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens) case 'c': // need to set the cas value post-assignment. META_CHAR(p, 'c'); + case 's': + // get the final size post-fill + META_CHAR(p, 's'); break; } } @@ -1460,10 +1476,20 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens) comm = NREAD_ADD; break; case 'A': // Append. - comm = NREAD_APPEND; + if (of.vivify) { + comm = NREAD_APPENDVIV; + exptime = of.autoviv_exptime; + } else { + comm = NREAD_APPEND; + } break; case 'P': // Prepend. - comm = NREAD_PREPEND; + if (of.vivify) { + comm = NREAD_PREPENDVIV; + exptime = of.autoviv_exptime; + } else { + comm = NREAD_PREPEND; + } break; case 'R': // Replace. comm = NREAD_REPLACE; @@ -1490,7 +1516,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens) if (has_error) goto error; - it = item_alloc(key, nkey, of.client_flags, of.exptime, vlen); + it = item_alloc(key, nkey, of.client_flags, exptime, vlen); if (it == 0) { enum store_item_type status; @@ -1784,7 +1810,7 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n if (it != NULL) { memcpy(ITEM_data(it), tmpbuf, vlen); memcpy(ITEM_data(it) + vlen, "\r\n", 2); - if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, CAS_NO_STALE)) { + if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, NULL, CAS_NO_STALE)) { item_created = true; } else { // Not sure how we can get here if we're holding the lock. diff --git a/proxy_internal.c b/proxy_internal.c index b826181..5484dc7 100644 --- a/proxy_internal.c +++ b/proxy_internal.c @@ -389,7 +389,7 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re return; } - int ret = store_item(it, comm, t, NULL, CAS_NO_STALE); + int ret = store_item(it, comm, t, NULL, NULL, CAS_NO_STALE); switch (ret) { case STORED: pout_string(resp, "STORED"); @@ -1046,6 +1046,7 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp assert(t != NULL); char *p = resp->wbuf; int tlen = 0; + rel_time_t exptime = 0; //WANT_TOKENS_MIN(ntokens, 3); @@ -1080,10 +1081,20 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp comm = NREAD_ADD; break; case 'A': // Append. - comm = NREAD_APPEND; + if (of.vivify) { + comm = NREAD_APPENDVIV; + exptime = of.autoviv_exptime; + } else { + comm = NREAD_APPEND; + } break; case 'P': // Prepend. - comm = NREAD_PREPEND; + if (of.vivify) { + comm = NREAD_PREPENDVIV; + exptime = of.autoviv_exptime; + } else { + comm = NREAD_PREPEND; + } break; case 'R': // Replace. comm = NREAD_REPLACE; @@ -1105,7 +1116,7 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp comm = NREAD_CAS; } - it = item_alloc(key, nkey, of.client_flags, of.exptime, vlen); + it = item_alloc(key, nkey, of.client_flags, exptime, vlen); if (it == 0) { if (! item_size_ok(nkey, of.client_flags, vlen)) { @@ -1160,7 +1171,8 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp } uint64_t cas = 0; - int ret = store_item(it, comm, t, &cas, set_stale); + int nbytes = 0; + int ret = store_item(it, comm, t, &nbytes, &cas, set_stale); switch (ret) { case STORED: memcpy(p, "HD", 2); @@ -1204,6 +1216,16 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp META_CHAR(p, 'c'); p = itoa_u64(cas, p); break; + case 's': + // Get final item size, ie from append/prepend + META_CHAR(p, 's'); + // If the size changed during append/prepend + if (nbytes != 0) { + p = itoa_u32(nbytes-2, p); + } else { + p = itoa_u32(it->nbytes-2, p); + } + break; } } @@ -1436,7 +1458,7 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res if (it != NULL) { memcpy(ITEM_data(it), tmpbuf, vlen); memcpy(ITEM_data(it) + vlen, "\r\n", 2); - if (do_store_item(it, NREAD_ADD, t, hv, &cas, CAS_NO_STALE)) { + if (do_store_item(it, NREAD_ADD, t, hv, NULL, &cas, CAS_NO_STALE)) { item_created = true; } else { // Not sure how we can get here if we're holding the lock. diff --git a/t/metaget.t b/t/metaget.t index 6c306f3..0f2b799 100644 --- a/t/metaget.t +++ b/t/metaget.t @@ -329,6 +329,32 @@ my $sock = $server->sock; like(scalar <$sock>, qr/^CLIENT_ERROR /, "invalid mode"); } +# Append tests +{ + print $sock "ms appendcas 2 MA C5000 T30\r\nhi\r\n"; + is(scalar <$sock>, "NS\r\n", "ms append with bad cas"); + print $sock "ms appendcas 2 MA T30\r\nhi\r\n"; + is(scalar <$sock>, "NS\r\n", "ms append straight miss"); + print $sock "ms appendcas 2 T30 c\r\nho\r\n"; + my $res = <$sock>; + my $r = parse_res($res); + my $cas = get_flag($r, 'c'); + print $sock "ms appendcas 2 MA C$cas T30\r\nhi\r\n"; + is(scalar <$sock>, "HD\r\n", "ms append with good cas"); + + # Autovivify append. + print $sock "ms appendviv 2 MA N30\r\nmo\r\n"; + is(scalar <$sock>, "HD\r\n", "ms append with autovivify"); + mget_is({ sock => $sock, + flags => 's v', + eflags => 's2' }, + 'appendviv', 'mo', "retrieved autoviv append"); + + # Test full size on append. + print $sock "ms appendviv 2 MA N30 s\r\nko\r\n"; + is(scalar <$sock>, "HD s4\r\n", "got appended length"); +} + # lease-test, use two sockets? one socket should be fine, actually. # - get a win on autovivify # - get a loss on the same command @@ -776,6 +802,17 @@ sub mget_res { return \%r; } +sub parse_res { + my $resp = shift; + my %r = (); + if ($resp =~ m/^(\w\w)\s*([^\r]+)\r\n/gm) { + $r{status} = $1; + $r{flags} = $2; + } + + return \%r; +} + sub get_flag { my $res = shift; my $flag = shift; @@ -917,13 +917,13 @@ enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key, /* * Stores an item in the cache (high level, obeys set/add/replace semantics) */ -enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale) { +enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, bool cas_stale) { enum store_item_type ret; uint32_t hv; hv = hash(ITEM_key(item), item->nkey); item_lock(hv); - ret = do_store_item(item, comm, t, hv, cas, cas_stale); + ret = do_store_item(item, comm, t, hv, nbytes, cas, cas_stale); item_unlock(hv); return ret; } |