From d87f568a95ed1abe468188476fa5ed9288799223 Mon Sep 17 00:00:00 2001 From: dormando Date: Tue, 27 Sep 2011 00:57:06 -0700 Subject: Backport binary TOUCH/GAT/GATQ commands Taken from the 1.6 branch, partly written by Trond. I hope the CAS handling is correct. --- items.c | 8 +++++ items.h | 1 + memcached.c | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ memcached.h | 10 +++++- protocol_binary.h | 42 +++++++++++++++++++++++++ slabs.c | 3 +- t/binary.t | 43 +++++++++++++++++++++++++- t/stats.t | 2 +- testapp.c | 34 +++++++++++++++++++++ thread.c | 17 +++++++++++ trace.h | 2 ++ 11 files changed, 249 insertions(+), 4 deletions(-) diff --git a/items.c b/items.c index 12ffa29..288cdb8 100644 --- a/items.c +++ b/items.c @@ -531,6 +531,14 @@ item *do_item_get(const char *key, const size_t nkey) { return it; } +item *do_item_touch(const char *key, size_t nkey, uint32_t exptime) { + item *it = do_item_get(key, nkey); + if (it != NULL) { + it->exptime = exptime; + } + return it; +} + /** returns an item whether or not it's expired. */ item *do_item_get_nocheck(const char *key, const size_t nkey) { item *it = assoc_find(key, nkey); diff --git a/items.h b/items.h index fee5f79..447aeed 100644 --- a/items.h +++ b/items.h @@ -21,5 +21,6 @@ void do_item_flush_expired(void); item *do_item_get(const char *key, const size_t nkey); item *do_item_get_nocheck(const char *key, const size_t nkey); +item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime); void item_stats_reset(void); extern pthread_mutex_t cache_lock; diff --git a/memcached.c b/memcached.c index f52c32d..8ac4f51 100644 --- a/memcached.c +++ b/memcached.c @@ -166,6 +166,7 @@ static rel_time_t realtime(const time_t exptime) { static void stats_init(void) { stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0; stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = stats.reclaimed = 0; + stats.touch_cmds = stats.touch_misses = 0; stats.curr_bytes = stats.listen_disabled_num = 0; stats.accepting_conns = true; /* assuming we start in this state. */ @@ -1178,6 +1179,78 @@ static void complete_update_bin(conn *c) { c->item = 0; } +static void process_bin_touch(conn *c) { + item *it; + + protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf; + char* key = binary_get_key(c); + size_t nkey = c->binary_header.request.keylen; + protocol_binary_request_touch *t = (void *)&c->binary_header; + uint32_t exptime = ntohl(t->message.body.expiration); + + if (settings.verbose > 1) { + int ii; + /* May be GAT/GATQ/etc */ + fprintf(stderr, "<%d TOUCH ", c->sfd); + for (ii = 0; ii < nkey; ++ii) { + fprintf(stderr, "%c", key[ii]); + } + fprintf(stderr, "\n"); + } + + it = item_touch(key, nkey, exptime); + + if (it) { + /* the length has two unnecessary bytes ("\r\n") */ + uint16_t keylen = 0; + uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2); + + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.touch_cmds++; + c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey, + it->nbytes, ITEM_get_cas(it)); + + if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) { + bodylen -= it->nbytes - 2; + } + + add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen); + rsp->message.header.response.cas = htonll(ITEM_get_cas(it)); + + // add the flags + rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10)); + add_iov(c, &rsp->message.body, sizeof(rsp->message.body)); + + /* Add the data minus the CRLF */ + if (c->cmd != PROTOCOL_BINARY_CMD_TOUCH) { + add_iov(c, ITEM_data(it), it->nbytes - 2); + } + conn_set_state(c, conn_mwrite); + /* Remember this command so we can garbage collect it later */ + c->item = it; + } else { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.touch_cmds++; + c->thread->stats.touch_misses++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0); + + if (c->noreply) { + conn_set_state(c, conn_new_cmd); + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0); + } + } + + if (settings.detail_enabled) { + stats_prefix_record_get(key, nkey, NULL != it); + } +} + static void process_bin_get(conn *c) { item *it; @@ -1736,6 +1809,9 @@ static void dispatch_bin_command(conn *c) { case PROTOCOL_BINARY_CMD_GETKQ: c->cmd = PROTOCOL_BINARY_CMD_GETK; break; + case PROTOCOL_BINARY_CMD_GATQ: + c->cmd = PROTOCOL_BINARY_CMD_GATQ; + break; default: c->noreply = false; } @@ -1837,6 +1913,15 @@ static void dispatch_bin_command(conn *c) { protocol_error = 1; } break; + case PROTOCOL_BINARY_CMD_TOUCH: + case PROTOCOL_BINARY_CMD_GAT: + case PROTOCOL_BINARY_CMD_GATQ: + if (extlen == 4 && keylen != 0) { + bin_read_key(c, bin_reading_touch_key, 4); + } else { + protocol_error = 1; + } + break; default: write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen); } @@ -2064,6 +2149,9 @@ static void complete_nread_binary(conn *c) { case bin_reading_get_key: process_bin_get(c); break; + case bin_reading_touch_key: + process_bin_touch(c); + break; case bin_reading_stat: process_bin_stat(c); break; @@ -2413,6 +2501,7 @@ static void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds); APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds); APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds); + APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds); APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits); APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses); APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses); @@ -2424,6 +2513,8 @@ static void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses); APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits); APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval); + APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits); + APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses); APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds); APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors); APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read); diff --git a/memcached.h b/memcached.h index 1146ecc..6aa0317 100644 --- a/memcached.h +++ b/memcached.h @@ -162,7 +162,8 @@ enum bin_substates { bin_reading_incr_header, bin_read_flush_exptime, bin_reading_sasl_auth, - bin_reading_sasl_auth_data + bin_reading_sasl_auth_data, + bin_reading_touch_key, }; enum protocol { @@ -201,6 +202,7 @@ typedef unsigned int rel_time_t; struct slab_stats { uint64_t set_cmds; uint64_t get_hits; + uint64_t touch_hits; uint64_t delete_hits; uint64_t cas_hits; uint64_t cas_badval; @@ -215,6 +217,8 @@ struct thread_stats { pthread_mutex_t mutex; uint64_t get_cmds; uint64_t get_misses; + uint64_t touch_cmds; + uint64_t touch_misses; uint64_t delete_misses; uint64_t incr_misses; uint64_t decr_misses; @@ -241,8 +245,11 @@ struct stats { unsigned int conn_structs; uint64_t get_cmds; uint64_t set_cmds; + uint64_t touch_cmds; uint64_t get_hits; uint64_t get_misses; + uint64_t touch_hits; + uint64_t touch_misses; uint64_t evictions; uint64_t reclaimed; time_t started; /* when the process was started */ @@ -476,6 +483,7 @@ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbyt char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes); void item_flush_expired(void); item *item_get(const char *key, const size_t nkey); +item *item_touch(const char *key, const size_t nkey, uint32_t exptime); int item_link(item *it); void item_remove(item *it); int item_replace(item *it, item *new_it); diff --git a/protocol_binary.h b/protocol_binary.h index aa5dcc4..4fd9df5 100644 --- a/protocol_binary.h +++ b/protocol_binary.h @@ -105,6 +105,9 @@ extern "C" PROTOCOL_BINARY_CMD_FLUSHQ = 0x18, PROTOCOL_BINARY_CMD_APPENDQ = 0x19, PROTOCOL_BINARY_CMD_PREPENDQ = 0x1a, + PROTOCOL_BINARY_CMD_TOUCH = 0x1c, + PROTOCOL_BINARY_CMD_GAT = 0x1d, + PROTOCOL_BINARY_CMD_GATQ = 0x1e, PROTOCOL_BINARY_CMD_SASL_LIST_MECHS = 0x20, PROTOCOL_BINARY_CMD_SASL_AUTH = 0x21, @@ -381,6 +384,45 @@ extern "C" */ typedef protocol_binary_response_no_extras protocol_binary_response_stats; + /** + * Definition of the packet used by the touch command. + */ + typedef union { + struct { + protocol_binary_request_header header; + struct { + uint32_t expiration; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 4]; + } protocol_binary_request_touch; + + /** + * Definition of the packet returned from the touch command + */ + typedef protocol_binary_response_no_extras protocol_binary_response_touch; + + /** + * Definition of the packet used by the GAT(Q) command. + */ + typedef union { + struct { + protocol_binary_request_header header; + struct { + uint32_t expiration; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 4]; + } protocol_binary_request_gat; + + typedef protocol_binary_request_gat protocol_binary_request_gatq; + + /** + * Definition of the packet returned from the GAT(Q) + */ + typedef protocol_binary_response_get protocol_binary_response_gat; + typedef protocol_binary_response_get protocol_binary_response_gatq; + /** * Definition of a request for a range operation. * See http://code.google.com/p/memcached/wiki/RangeOps diff --git a/slabs.c b/slabs.c index 069317f..48fbdb9 100644 --- a/slabs.c +++ b/slabs.c @@ -377,7 +377,8 @@ static void do_slabs_stats(ADD_STAT add_stats, void *c) { (unsigned long long)thread_stats.slab_stats[i].cas_hits); APPEND_NUM_STAT(i, "cas_badval", "%llu", (unsigned long long)thread_stats.slab_stats[i].cas_badval); - + APPEND_NUM_STAT(i, "touch_hits", "%llu", + (unsigned long long)thread_stats.slab_stats[i].touch_hits); total++; } } diff --git a/t/binary.t b/t/binary.t index 68d8c00..349c5a9 100755 --- a/t/binary.t +++ b/t/binary.t @@ -2,7 +2,7 @@ use strict; use warnings; -use Test::More tests => 3376; +use Test::More tests => 3435; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; @@ -41,6 +41,9 @@ use constant CMD_QUITQ => 0x17; use constant CMD_FLUSHQ => 0x18; use constant CMD_APPENDQ => 0x19; use constant CMD_PREPENDQ => 0x1A; +use constant CMD_TOUCH => 0x1C; +use constant CMD_GAT => 0x1D; +use constant CMD_GATQ => 0x1E; # REQ and RES formats are divided even though they currently share # the same format, since they _could_ differ in the future. @@ -237,6 +240,23 @@ is($mc->decr("x", 211), 0, "Floor is zero"); } } +# diag "Touch commands"; +{ + $mc->flush; + $mc->set("totouch", "toast", 0, 1); + my $res = $mc->touch("totouch", 10); + sleep 2; + $check->("totouch", 0, "toast"); + + $mc->set("totouch", "toast2", 0, 1); + my ($flags, $val, $i) = $mc->gat("totouch", 10); + is($val, "toast2", "GAT returned correct value"); + sleep 2; + $check->("totouch", 0, "toast2"); + + # Test miss as well +} + # diag "Silent set."; $mc->silent_mutation(::CMD_SETQ, 'silentset', 'silentsetval'); @@ -681,6 +701,27 @@ sub get_multi { return \%return; } +sub touch { + my $self = shift; + my ($key, $expire) = @_; + my $extra_header = pack "N", $expire; + my $cas = 0; + return $self->_do_command(::CMD_TOUCH, $key, '', $extra_header, $cas); +} + +sub gat { + my $self = shift; + my $key = shift; + my $expire = shift; + my $extra_header = pack "N", $expire; + my ($rv, $cas) = $self->_do_command(::CMD_GAT, $key, '', $extra_header); + + my $header = substr $rv, 0, 4, ''; + my $flags = unpack("N", $header); + + return ($flags, $rv, $cas); +} + sub version { my $self = shift; return $self->_do_command(::CMD_VERSION, '', ''); diff --git a/t/stats.t b/t/stats.t index de412cc..f47d6a4 100755 --- a/t/stats.t +++ b/t/stats.t @@ -58,7 +58,7 @@ my $sock = $server->sock; my $stats = mem_stats($sock); # Test number of keys -is(scalar(keys(%$stats)), 39, "39 stats values"); +is(scalar(keys(%$stats)), 42, "42 stats values"); # Test initial state foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses diff --git a/testapp.c b/testapp.c index 5d52d18..769a618 100644 --- a/testapp.c +++ b/testapp.c @@ -823,6 +823,33 @@ static off_t flush_command(char* buf, size_t bufsz, uint8_t cmd, uint32_t exptim return size; } + +static off_t touch_command(char* buf, + size_t bufsz, + uint8_t cmd, + const void* key, + size_t keylen, + uint32_t exptime) { + protocol_binary_request_touch *request = (void*)buf; + assert(bufsz > sizeof(*request)); + + memset(request, 0, sizeof(*request)); + request->message.header.request.magic = PROTOCOL_BINARY_REQ; + request->message.header.request.opcode = cmd; + + request->message.header.request.keylen = htons(keylen); + request->message.header.request.extlen = 4; + request->message.body.expiration = htonl(exptime); + request->message.header.request.bodylen = htonl(keylen + 4); + + request->message.header.request.opaque = 0xdeadbeef; + + off_t key_offset = sizeof(protocol_binary_request_no_extras) + 4; + + memcpy(buf + key_offset, key, keylen); + return sizeof(protocol_binary_request_no_extras) + 4 + keylen; +} + static off_t arithmetic_command(char* buf, size_t bufsz, uint8_t cmd, @@ -1674,6 +1701,13 @@ static enum test_return test_binary_pipeline_hickup_chunk(void *buffer, size_t b key, keylen, NULL, 0); break; + case PROTOCOL_BINARY_CMD_TOUCH: + case PROTOCOL_BINARY_CMD_GAT: + case PROTOCOL_BINARY_CMD_GATQ: + len = touch_command(command.bytes, sizeof(command.bytes), cmd, + key, keylen, 10); + break; + case PROTOCOL_BINARY_CMD_STAT: len = raw_command(command.bytes, sizeof(command.bytes), PROTOCOL_BINARY_CMD_STAT, diff --git a/thread.c b/thread.c index 2166821..904368f 100644 --- a/thread.c +++ b/thread.c @@ -346,6 +346,14 @@ item *item_get(const char *key, const size_t nkey) { return it; } +item *item_touch(const char *key, size_t nkey, uint32_t exptime) { + item *it; + pthread_mutex_lock(&cache_lock); + it = do_item_touch(key, nkey, exptime); + pthread_mutex_unlock(&cache_lock); + return it; +} + /* * Links an item into the LRU and hashtable. */ @@ -478,6 +486,8 @@ void threadlocal_stats_reset(void) { threads[ii].stats.get_cmds = 0; threads[ii].stats.get_misses = 0; + threads[ii].stats.touch_cmds = 0; + threads[ii].stats.touch_misses = 0; threads[ii].stats.delete_misses = 0; threads[ii].stats.incr_misses = 0; threads[ii].stats.decr_misses = 0; @@ -492,6 +502,7 @@ void threadlocal_stats_reset(void) { for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { threads[ii].stats.slab_stats[sid].set_cmds = 0; threads[ii].stats.slab_stats[sid].get_hits = 0; + threads[ii].stats.slab_stats[sid].touch_hits = 0; threads[ii].stats.slab_stats[sid].delete_hits = 0; threads[ii].stats.slab_stats[sid].incr_hits = 0; threads[ii].stats.slab_stats[sid].decr_hits = 0; @@ -515,6 +526,8 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) { stats->get_cmds += threads[ii].stats.get_cmds; stats->get_misses += threads[ii].stats.get_misses; + stats->touch_cmds += threads[ii].stats.touch_cmds; + stats->touch_misses += threads[ii].stats.touch_misses; stats->delete_misses += threads[ii].stats.delete_misses; stats->decr_misses += threads[ii].stats.decr_misses; stats->incr_misses += threads[ii].stats.incr_misses; @@ -531,6 +544,8 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) { threads[ii].stats.slab_stats[sid].set_cmds; stats->slab_stats[sid].get_hits += threads[ii].stats.slab_stats[sid].get_hits; + stats->slab_stats[sid].touch_hits += + threads[ii].stats.slab_stats[sid].touch_hits; stats->slab_stats[sid].delete_hits += threads[ii].stats.slab_stats[sid].delete_hits; stats->slab_stats[sid].decr_hits += @@ -552,6 +567,7 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) { out->set_cmds = 0; out->get_hits = 0; + out->touch_hits = 0; out->delete_hits = 0; out->incr_hits = 0; out->decr_hits = 0; @@ -561,6 +577,7 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) { for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { out->set_cmds += stats->slab_stats[sid].set_cmds; out->get_hits += stats->slab_stats[sid].get_hits; + out->touch_hits += stats->slab_stats[sid].touch_hits; out->delete_hits += stats->slab_stats[sid].delete_hits; out->decr_hits += stats->slab_stats[sid].decr_hits; out->incr_hits += stats->slab_stats[sid].incr_hits; diff --git a/trace.h b/trace.h index ced30a3..dc792a0 100644 --- a/trace.h +++ b/trace.h @@ -22,6 +22,8 @@ #define MEMCACHED_COMMAND_DELETE_ENABLED() (0) #define MEMCACHED_COMMAND_GET(arg0, arg1, arg2, arg3, arg4) #define MEMCACHED_COMMAND_GET_ENABLED() (0) +#define MEMCACHED_COMMAND_TOUCH(arg0, arg1, arg2, arg3, arg4) +#define MEMCACHED_COMMAND_TOUCH_ENABLED() (0) #define MEMCACHED_COMMAND_INCR(arg0, arg1, arg2, arg3) #define MEMCACHED_COMMAND_INCR_ENABLED() (0) #define MEMCACHED_COMMAND_PREPEND(arg0, arg1, arg2, arg3, arg4) -- cgit v1.2.1