summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-03-08 17:30:53 -0800
committerdormando <dormando@rydia.net>2023-03-11 18:19:23 -0800
commitc04701654413719d4abd7645c6d7b3fba4255e85 (patch)
treed415716c5ce3f9290c473bd5e82df4884bd5ab1d
parentaef5b580a5b1528cd418433857adfd7a87f1b4e4 (diff)
downloadmemcached-c04701654413719d4abd7645c6d7b3fba4255e85.tar.gz
meta: N flag changes append/prepend. ms s flag.
Sending 's' flag to metaset now returns the size of the item stored. Useful if you want to know how large an append/prepended item now is. If the 'N' flag is supplied while in append/prepend mode, allows autovivifying (with exptime supplied from N) for append/prepend style keys that don't need headers created first.
-rw-r--r--doc/protocol.txt10
-rw-r--r--logger.c4
-rw-r--r--memcached.c12
-rw-r--r--memcached.h6
-rw-r--r--proto_bin.c4
-rw-r--r--proto_text.c40
-rw-r--r--proxy_internal.c34
-rw-r--r--t/metaget.t37
-rw-r--r--thread.c4
9 files changed, 128 insertions, 23 deletions
diff --git a/doc/protocol.txt b/doc/protocol.txt
index acb837c..8a261ed 100644
--- a/doc/protocol.txt
+++ b/doc/protocol.txt
@@ -726,8 +726,10 @@ The flags used by the 'ms' command are:
- k: return key as a token
- O(token): opaque value, consumes a token and copies back with response
- q: use noreply semantics for return codes
+- s: return the size of the stored item on success (ie; new size on append)
- T(token): Time-To-Live for item, see "Expiration" above.
- M(token): mode switch to change behavior to add, replace, append, prepend
+- N(token): if in append mode, autovivify on miss with supplied TTL
The flags are now repeated with detailed information where useful:
@@ -775,6 +777,14 @@ S: "set" command. The default mode, added for completeness.
The "cas" command is supplanted by specifying the cas value with the 'C' flag.
Append and Prepend modes will also respect a supplied cas value.
+- N(token): if in append mode, autovivify on miss with supplied TTL
+
+Append and Prepend modes normally ignore the T argument, as they cannot create
+a new item on a miss. If N is supplied, and append reaches a miss, it will
+create a new item seeded with the data from the append command. It uses the
+TTL from N instead of T to be consistent with the usage of N in other
+commands.
+
Meta Delete
-----------
diff --git a/logger.c b/logger.c
index ce97d26..3f483ed 100644
--- a/logger.c
+++ b/logger.c
@@ -206,9 +206,9 @@ static int _logger_parse_ise(logentry *e, char *scratch) {
const char * const status_map[] = {
"not_stored", "stored", "exists", "not_found", "too_large", "no_memory" };
const char * const cmd_map[] = {
- "null", "add", "set", "replace", "append", "prepend", "cas" };
+ "null", "add", "set", "replace", "append", "prepend", "cas", "append", "prepend" };
- if (le->cmd <= 6)
+ if (le->cmd <= 8)
cmd = cmd_map[le->cmd];
uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
diff --git a/memcached.c b/memcached.c
index 15bbf36..8e8a180 100644
--- a/memcached.c
+++ b/memcached.c
@@ -1532,7 +1532,7 @@ static int _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
}
static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add_it) {
- if (comm == NREAD_APPEND) {
+ if (comm == NREAD_APPEND || comm == NREAD_APPENDVIV) {
if (new_it->it_flags & ITEM_CHUNKED) {
if (_store_item_copy_chunks(new_it, old_it, old_it->nbytes - 2) == -1 ||
_store_item_copy_chunks(new_it, add_it, add_it->nbytes) == -1) {
@@ -1563,7 +1563,7 @@ static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add
*
* Returns the state of storage.
*/
-enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale) {
+enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, bool cas_stale) {
char *key = ITEM_key(it);
item *old_it = do_item_get(key, it->nkey, hv, t, DONT_UPDATE);
enum store_item_type stored = NOT_STORED;
@@ -1636,6 +1636,8 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const
break;
case NREAD_APPEND:
case NREAD_PREPEND:
+ case NREAD_APPENDVIV:
+ case NREAD_PREPENDVIV:
if (cas_res != CAS_NONE && cas_res != CAS_MATCH) {
stored = EXISTS;
break;
@@ -1663,6 +1665,10 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const
// it's original ref is managed outside of this function
it = new_it;
do_store = true;
+ // Upstream final object size for meta
+ if (nbytes != NULL) {
+ *nbytes = it->nbytes;
+ }
}
break;
case NREAD_REPLACE:
@@ -1692,6 +1698,8 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const
switch (comm) {
case NREAD_ADD:
case NREAD_SET:
+ case NREAD_APPENDVIV:
+ case NREAD_PREPENDVIV:
do_store = true;
break;
case NREAD_CAS:
diff --git a/memcached.h b/memcached.h
index 9bd80b0..7ebe65f 100644
--- a/memcached.h
+++ b/memcached.h
@@ -266,6 +266,8 @@ enum close_reasons {
#define NREAD_APPEND 4
#define NREAD_PREPEND 5
#define NREAD_CAS 6
+#define NREAD_APPENDVIV 7 // specific to meta
+#define NREAD_PREPENDVIV 8 // specific to meta
#define CAS_ALLOW_STALE true
#define CAS_NO_STALE false
@@ -909,7 +911,7 @@ enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key,
const int64_t delta, char *buf,
uint64_t *cas, const uint32_t hv,
item **it_ret);
-enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale);
+enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, bool cas_stale);
void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb);
void conn_io_queue_setup(conn *c);
io_queue_t *conn_io_queue_get(conn *c, int type);
@@ -992,7 +994,7 @@ LIBEVENT_THREAD *get_worker_thread(int id);
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
const char *fmt, ...);
-enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale);
+enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, bool cas_stale);
/* Protocol related code */
void out_string(conn *c, const char *str);
diff --git a/proto_bin.c b/proto_bin.c
index ef82de0..875afa6 100644
--- a/proto_bin.c
+++ b/proto_bin.c
@@ -328,7 +328,7 @@ static void complete_incr_bin(conn *c, char *extbuf) {
memcpy(ITEM_data(it) + res, "\r\n", 2);
c->thread->cur_sfd = c->sfd; // for store_item logging.
- if (store_item(it, NREAD_ADD, c->thread, &cas, CAS_NO_STALE)) {
+ if (store_item(it, NREAD_ADD, c->thread, NULL, &cas, CAS_NO_STALE)) {
c->cas = cas;
write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
} else {
@@ -386,7 +386,7 @@ static void complete_update_bin(conn *c) {
uint64_t cas = 0;
c->thread->cur_sfd = c->sfd; // for store_item logging.
- ret = store_item(it, c->cmd, c->thread, &cas, CAS_NO_STALE);
+ ret = store_item(it, c->cmd, c->thread, NULL, &cas, CAS_NO_STALE);
c->cas = cas;
#ifdef ENABLE_DTRACE
diff --git a/proto_text.c b/proto_text.c
index d316792..1f69ddc 100644
--- a/proto_text.c
+++ b/proto_text.c
@@ -51,7 +51,7 @@ typedef struct token_s {
size_t length;
} token_t;
-static void _finalize_mset(conn *c, enum store_item_type ret) {
+static void _finalize_mset(conn *c, int nbytes, enum store_item_type ret) {
mc_resp *resp = c->resp;
item *it = c->item;
conn_set_state(c, conn_new_cmd);
@@ -105,6 +105,16 @@ static void _finalize_mset(conn *c, enum store_item_type ret) {
META_CHAR(p, 'c');
p = itoa_u64(c->cas, p);
break;
+ case 's':
+ // Get final item size, ie from append/prepend
+ META_CHAR(p, 's');
+ // If the size changed during append/prepend
+ if (nbytes != 0) {
+ p = itoa_u32(nbytes-2, p);
+ } else {
+ p = itoa_u32(it->nbytes-2, p);
+ }
+ break;
default:
break;
}
@@ -128,6 +138,7 @@ void complete_nread_ascii(conn *c) {
int comm = c->cmd;
enum store_item_type ret;
bool is_valid = false;
+ int nbytes = 0;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
@@ -170,7 +181,7 @@ void complete_nread_ascii(conn *c) {
} else {
uint64_t cas = 0;
c->thread->cur_sfd = c->sfd; // cuddle sfd for logging.
- ret = store_item(it, comm, c->thread, &cas, c->set_stale);
+ ret = store_item(it, comm, c->thread, &nbytes, &cas, c->set_stale);
#ifdef ENABLE_DTRACE
switch (c->cmd) {
@@ -203,7 +214,7 @@ void complete_nread_ascii(conn *c) {
if (c->mset_res) {
c->cas = cas;
- _finalize_mset(c, ret);
+ _finalize_mset(c, nbytes, ret);
} else {
switch (ret) {
case STORED:
@@ -1379,6 +1390,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)
assert(c != NULL);
mc_resp *resp = c->resp;
char *p = resp->wbuf;
+ rel_time_t exptime = 0;
WANT_TOKENS_MIN(ntokens, 3);
@@ -1427,6 +1439,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)
c->noreply = of.no_reply;
// Clear cas return value
c->cas = 0;
+ exptime = of.exptime;
bool has_error = false;
for (i = KEY_TOKEN+1; i < ntokens-1; i++) {
@@ -1448,6 +1461,9 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)
case 'c':
// need to set the cas value post-assignment.
META_CHAR(p, 'c');
+ case 's':
+ // get the final size post-fill
+ META_CHAR(p, 's');
break;
}
}
@@ -1460,10 +1476,20 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)
comm = NREAD_ADD;
break;
case 'A': // Append.
- comm = NREAD_APPEND;
+ if (of.vivify) {
+ comm = NREAD_APPENDVIV;
+ exptime = of.autoviv_exptime;
+ } else {
+ comm = NREAD_APPEND;
+ }
break;
case 'P': // Prepend.
- comm = NREAD_PREPEND;
+ if (of.vivify) {
+ comm = NREAD_PREPENDVIV;
+ exptime = of.autoviv_exptime;
+ } else {
+ comm = NREAD_PREPEND;
+ }
break;
case 'R': // Replace.
comm = NREAD_REPLACE;
@@ -1490,7 +1516,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)
if (has_error)
goto error;
- it = item_alloc(key, nkey, of.client_flags, of.exptime, vlen);
+ it = item_alloc(key, nkey, of.client_flags, exptime, vlen);
if (it == 0) {
enum store_item_type status;
@@ -1784,7 +1810,7 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n
if (it != NULL) {
memcpy(ITEM_data(it), tmpbuf, vlen);
memcpy(ITEM_data(it) + vlen, "\r\n", 2);
- if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, CAS_NO_STALE)) {
+ if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, NULL, CAS_NO_STALE)) {
item_created = true;
} else {
// Not sure how we can get here if we're holding the lock.
diff --git a/proxy_internal.c b/proxy_internal.c
index b826181..5484dc7 100644
--- a/proxy_internal.c
+++ b/proxy_internal.c
@@ -389,7 +389,7 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re
return;
}
- int ret = store_item(it, comm, t, NULL, CAS_NO_STALE);
+ int ret = store_item(it, comm, t, NULL, NULL, CAS_NO_STALE);
switch (ret) {
case STORED:
pout_string(resp, "STORED");
@@ -1046,6 +1046,7 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
assert(t != NULL);
char *p = resp->wbuf;
int tlen = 0;
+ rel_time_t exptime = 0;
//WANT_TOKENS_MIN(ntokens, 3);
@@ -1080,10 +1081,20 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
comm = NREAD_ADD;
break;
case 'A': // Append.
- comm = NREAD_APPEND;
+ if (of.vivify) {
+ comm = NREAD_APPENDVIV;
+ exptime = of.autoviv_exptime;
+ } else {
+ comm = NREAD_APPEND;
+ }
break;
case 'P': // Prepend.
- comm = NREAD_PREPEND;
+ if (of.vivify) {
+ comm = NREAD_PREPENDVIV;
+ exptime = of.autoviv_exptime;
+ } else {
+ comm = NREAD_PREPEND;
+ }
break;
case 'R': // Replace.
comm = NREAD_REPLACE;
@@ -1105,7 +1116,7 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
comm = NREAD_CAS;
}
- it = item_alloc(key, nkey, of.client_flags, of.exptime, vlen);
+ it = item_alloc(key, nkey, of.client_flags, exptime, vlen);
if (it == 0) {
if (! item_size_ok(nkey, of.client_flags, vlen)) {
@@ -1160,7 +1171,8 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
}
uint64_t cas = 0;
- int ret = store_item(it, comm, t, &cas, set_stale);
+ int nbytes = 0;
+ int ret = store_item(it, comm, t, &nbytes, &cas, set_stale);
switch (ret) {
case STORED:
memcpy(p, "HD", 2);
@@ -1204,6 +1216,16 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
META_CHAR(p, 'c');
p = itoa_u64(cas, p);
break;
+ case 's':
+ // Get final item size, ie from append/prepend
+ META_CHAR(p, 's');
+ // If the size changed during append/prepend
+ if (nbytes != 0) {
+ p = itoa_u32(nbytes-2, p);
+ } else {
+ p = itoa_u32(it->nbytes-2, p);
+ }
+ break;
}
}
@@ -1436,7 +1458,7 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res
if (it != NULL) {
memcpy(ITEM_data(it), tmpbuf, vlen);
memcpy(ITEM_data(it) + vlen, "\r\n", 2);
- if (do_store_item(it, NREAD_ADD, t, hv, &cas, CAS_NO_STALE)) {
+ if (do_store_item(it, NREAD_ADD, t, hv, NULL, &cas, CAS_NO_STALE)) {
item_created = true;
} else {
// Not sure how we can get here if we're holding the lock.
diff --git a/t/metaget.t b/t/metaget.t
index 6c306f3..0f2b799 100644
--- a/t/metaget.t
+++ b/t/metaget.t
@@ -329,6 +329,32 @@ my $sock = $server->sock;
like(scalar <$sock>, qr/^CLIENT_ERROR /, "invalid mode");
}
+# Append tests
+{
+ print $sock "ms appendcas 2 MA C5000 T30\r\nhi\r\n";
+ is(scalar <$sock>, "NS\r\n", "ms append with bad cas");
+ print $sock "ms appendcas 2 MA T30\r\nhi\r\n";
+ is(scalar <$sock>, "NS\r\n", "ms append straight miss");
+ print $sock "ms appendcas 2 T30 c\r\nho\r\n";
+ my $res = <$sock>;
+ my $r = parse_res($res);
+ my $cas = get_flag($r, 'c');
+ print $sock "ms appendcas 2 MA C$cas T30\r\nhi\r\n";
+ is(scalar <$sock>, "HD\r\n", "ms append with good cas");
+
+ # Autovivify append.
+ print $sock "ms appendviv 2 MA N30\r\nmo\r\n";
+ is(scalar <$sock>, "HD\r\n", "ms append with autovivify");
+ mget_is({ sock => $sock,
+ flags => 's v',
+ eflags => 's2' },
+ 'appendviv', 'mo', "retrieved autoviv append");
+
+ # Test full size on append.
+ print $sock "ms appendviv 2 MA N30 s\r\nko\r\n";
+ is(scalar <$sock>, "HD s4\r\n", "got appended length");
+}
+
# lease-test, use two sockets? one socket should be fine, actually.
# - get a win on autovivify
# - get a loss on the same command
@@ -776,6 +802,17 @@ sub mget_res {
return \%r;
}
+sub parse_res {
+ my $resp = shift;
+ my %r = ();
+ if ($resp =~ m/^(\w\w)\s*([^\r]+)\r\n/gm) {
+ $r{status} = $1;
+ $r{flags} = $2;
+ }
+
+ return \%r;
+}
+
sub get_flag {
my $res = shift;
my $flag = shift;
diff --git a/thread.c b/thread.c
index 3ca1da3..ee120fa 100644
--- a/thread.c
+++ b/thread.c
@@ -917,13 +917,13 @@ enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
/*
* Stores an item in the cache (high level, obeys set/add/replace semantics)
*/
-enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale) {
+enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, bool cas_stale) {
enum store_item_type ret;
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);
item_lock(hv);
- ret = do_store_item(item, comm, t, hv, cas, cas_stale);
+ ret = do_store_item(item, comm, t, hv, nbytes, cas, cas_stale);
item_unlock(hv);
return ret;
}