summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2011-09-27 00:57:06 -0700
committerdormando <dormando@rydia.net>2011-09-27 00:57:06 -0700
commitd87f568a95ed1abe468188476fa5ed9288799223 (patch)
treeae6717e84ccc88d5869bf9c95b225c3f3eb80543
parent51c8f31fc709d06c479de88bdc5e14e757faabc5 (diff)
downloadmemcached-d87f568a95ed1abe468188476fa5ed9288799223.tar.gz
Backport binary TOUCH/GAT/GATQ commands
Taken from the 1.6 branch, partly written by Trond. I hope the CAS handling is correct.
-rw-r--r--items.c8
-rw-r--r--items.h1
-rw-r--r--memcached.c91
-rw-r--r--memcached.h10
-rw-r--r--protocol_binary.h42
-rw-r--r--slabs.c3
-rwxr-xr-xt/binary.t43
-rwxr-xr-xt/stats.t2
-rw-r--r--testapp.c34
-rw-r--r--thread.c17
-rw-r--r--trace.h2
11 files changed, 249 insertions, 4 deletions
diff --git a/items.c b/items.c
index 12ffa29..288cdb8 100644
--- a/items.c
+++ b/items.c
@@ -531,6 +531,14 @@ item *do_item_get(const char *key, const size_t nkey) {
return it;
}
+item *do_item_touch(const char *key, size_t nkey, uint32_t exptime) {
+ item *it = do_item_get(key, nkey);
+ if (it != NULL) {
+ it->exptime = exptime;
+ }
+ return it;
+}
+
/** returns an item whether or not it's expired. */
item *do_item_get_nocheck(const char *key, const size_t nkey) {
item *it = assoc_find(key, nkey);
diff --git a/items.h b/items.h
index fee5f79..447aeed 100644
--- a/items.h
+++ b/items.h
@@ -21,5 +21,6 @@ void do_item_flush_expired(void);
item *do_item_get(const char *key, const size_t nkey);
item *do_item_get_nocheck(const char *key, const size_t nkey);
+item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime);
void item_stats_reset(void);
extern pthread_mutex_t cache_lock;
diff --git a/memcached.c b/memcached.c
index f52c32d..8ac4f51 100644
--- a/memcached.c
+++ b/memcached.c
@@ -166,6 +166,7 @@ static rel_time_t realtime(const time_t exptime) {
static void stats_init(void) {
stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = stats.reclaimed = 0;
+ stats.touch_cmds = stats.touch_misses = 0;
stats.curr_bytes = stats.listen_disabled_num = 0;
stats.accepting_conns = true; /* assuming we start in this state. */
@@ -1178,6 +1179,78 @@ static void complete_update_bin(conn *c) {
c->item = 0;
}
+static void process_bin_touch(conn *c) {
+ item *it;
+
+ protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
+ char* key = binary_get_key(c);
+ size_t nkey = c->binary_header.request.keylen;
+ protocol_binary_request_touch *t = (void *)&c->binary_header;
+ uint32_t exptime = ntohl(t->message.body.expiration);
+
+ if (settings.verbose > 1) {
+ int ii;
+ /* May be GAT/GATQ/etc */
+ fprintf(stderr, "<%d TOUCH ", c->sfd);
+ for (ii = 0; ii < nkey; ++ii) {
+ fprintf(stderr, "%c", key[ii]);
+ }
+ fprintf(stderr, "\n");
+ }
+
+ it = item_touch(key, nkey, exptime);
+
+ if (it) {
+ /* the length has two unnecessary bytes ("\r\n") */
+ uint16_t keylen = 0;
+ uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
+
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.touch_cmds++;
+ c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+
+ MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey,
+ it->nbytes, ITEM_get_cas(it));
+
+ if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) {
+ bodylen -= it->nbytes - 2;
+ }
+
+ add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
+ rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
+
+ // add the flags
+ rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
+ add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
+
+ /* Add the data minus the CRLF */
+ if (c->cmd != PROTOCOL_BINARY_CMD_TOUCH) {
+ add_iov(c, ITEM_data(it), it->nbytes - 2);
+ }
+ conn_set_state(c, conn_mwrite);
+ /* Remember this command so we can garbage collect it later */
+ c->item = it;
+ } else {
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.touch_cmds++;
+ c->thread->stats.touch_misses++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+
+ MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0);
+
+ if (c->noreply) {
+ conn_set_state(c, conn_new_cmd);
+ } else {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
+ }
+ }
+
+ if (settings.detail_enabled) {
+ stats_prefix_record_get(key, nkey, NULL != it);
+ }
+}
+
static void process_bin_get(conn *c) {
item *it;
@@ -1736,6 +1809,9 @@ static void dispatch_bin_command(conn *c) {
case PROTOCOL_BINARY_CMD_GETKQ:
c->cmd = PROTOCOL_BINARY_CMD_GETK;
break;
+ case PROTOCOL_BINARY_CMD_GATQ:
+ c->cmd = PROTOCOL_BINARY_CMD_GATQ;
+ break;
default:
c->noreply = false;
}
@@ -1837,6 +1913,15 @@ static void dispatch_bin_command(conn *c) {
protocol_error = 1;
}
break;
+ case PROTOCOL_BINARY_CMD_TOUCH:
+ case PROTOCOL_BINARY_CMD_GAT:
+ case PROTOCOL_BINARY_CMD_GATQ:
+ if (extlen == 4 && keylen != 0) {
+ bin_read_key(c, bin_reading_touch_key, 4);
+ } else {
+ protocol_error = 1;
+ }
+ break;
default:
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
}
@@ -2064,6 +2149,9 @@ static void complete_nread_binary(conn *c) {
case bin_reading_get_key:
process_bin_get(c);
break;
+ case bin_reading_touch_key:
+ process_bin_touch(c);
+ break;
case bin_reading_stat:
process_bin_stat(c);
break;
@@ -2413,6 +2501,7 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
+ APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
@@ -2424,6 +2513,8 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
+ APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
+ APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
diff --git a/memcached.h b/memcached.h
index 1146ecc..6aa0317 100644
--- a/memcached.h
+++ b/memcached.h
@@ -162,7 +162,8 @@ enum bin_substates {
bin_reading_incr_header,
bin_read_flush_exptime,
bin_reading_sasl_auth,
- bin_reading_sasl_auth_data
+ bin_reading_sasl_auth_data,
+ bin_reading_touch_key,
};
enum protocol {
@@ -201,6 +202,7 @@ typedef unsigned int rel_time_t;
struct slab_stats {
uint64_t set_cmds;
uint64_t get_hits;
+ uint64_t touch_hits;
uint64_t delete_hits;
uint64_t cas_hits;
uint64_t cas_badval;
@@ -215,6 +217,8 @@ struct thread_stats {
pthread_mutex_t mutex;
uint64_t get_cmds;
uint64_t get_misses;
+ uint64_t touch_cmds;
+ uint64_t touch_misses;
uint64_t delete_misses;
uint64_t incr_misses;
uint64_t decr_misses;
@@ -241,8 +245,11 @@ struct stats {
unsigned int conn_structs;
uint64_t get_cmds;
uint64_t set_cmds;
+ uint64_t touch_cmds;
uint64_t get_hits;
uint64_t get_misses;
+ uint64_t touch_hits;
+ uint64_t touch_misses;
uint64_t evictions;
uint64_t reclaimed;
time_t started; /* when the process was started */
@@ -476,6 +483,7 @@ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbyt
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
void item_flush_expired(void);
item *item_get(const char *key, const size_t nkey);
+item *item_touch(const char *key, const size_t nkey, uint32_t exptime);
int item_link(item *it);
void item_remove(item *it);
int item_replace(item *it, item *new_it);
diff --git a/protocol_binary.h b/protocol_binary.h
index aa5dcc4..4fd9df5 100644
--- a/protocol_binary.h
+++ b/protocol_binary.h
@@ -105,6 +105,9 @@ extern "C"
PROTOCOL_BINARY_CMD_FLUSHQ = 0x18,
PROTOCOL_BINARY_CMD_APPENDQ = 0x19,
PROTOCOL_BINARY_CMD_PREPENDQ = 0x1a,
+ PROTOCOL_BINARY_CMD_TOUCH = 0x1c,
+ PROTOCOL_BINARY_CMD_GAT = 0x1d,
+ PROTOCOL_BINARY_CMD_GATQ = 0x1e,
PROTOCOL_BINARY_CMD_SASL_LIST_MECHS = 0x20,
PROTOCOL_BINARY_CMD_SASL_AUTH = 0x21,
@@ -382,6 +385,45 @@ extern "C"
typedef protocol_binary_response_no_extras protocol_binary_response_stats;
/**
+ * Definition of the packet used by the touch command.
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ struct {
+ uint32_t expiration;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header) + 4];
+ } protocol_binary_request_touch;
+
+ /**
+ * Definition of the packet returned from the touch command
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_touch;
+
+ /**
+ * Definition of the packet used by the GAT(Q) command.
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ struct {
+ uint32_t expiration;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header) + 4];
+ } protocol_binary_request_gat;
+
+ typedef protocol_binary_request_gat protocol_binary_request_gatq;
+
+ /**
+ * Definition of the packet returned from the GAT(Q)
+ */
+ typedef protocol_binary_response_get protocol_binary_response_gat;
+ typedef protocol_binary_response_get protocol_binary_response_gatq;
+
+ /**
* Definition of a request for a range operation.
* See http://code.google.com/p/memcached/wiki/RangeOps
*
diff --git a/slabs.c b/slabs.c
index 069317f..48fbdb9 100644
--- a/slabs.c
+++ b/slabs.c
@@ -377,7 +377,8 @@ static void do_slabs_stats(ADD_STAT add_stats, void *c) {
(unsigned long long)thread_stats.slab_stats[i].cas_hits);
APPEND_NUM_STAT(i, "cas_badval", "%llu",
(unsigned long long)thread_stats.slab_stats[i].cas_badval);
-
+ APPEND_NUM_STAT(i, "touch_hits", "%llu",
+ (unsigned long long)thread_stats.slab_stats[i].touch_hits);
total++;
}
}
diff --git a/t/binary.t b/t/binary.t
index 68d8c00..349c5a9 100755
--- a/t/binary.t
+++ b/t/binary.t
@@ -2,7 +2,7 @@
use strict;
use warnings;
-use Test::More tests => 3376;
+use Test::More tests => 3435;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
@@ -41,6 +41,9 @@ use constant CMD_QUITQ => 0x17;
use constant CMD_FLUSHQ => 0x18;
use constant CMD_APPENDQ => 0x19;
use constant CMD_PREPENDQ => 0x1A;
+use constant CMD_TOUCH => 0x1C;
+use constant CMD_GAT => 0x1D;
+use constant CMD_GATQ => 0x1E;
# REQ and RES formats are divided even though they currently share
# the same format, since they _could_ differ in the future.
@@ -237,6 +240,23 @@ is($mc->decr("x", 211), 0, "Floor is zero");
}
}
+# diag "Touch commands";
+{
+ $mc->flush;
+ $mc->set("totouch", "toast", 0, 1);
+ my $res = $mc->touch("totouch", 10);
+ sleep 2;
+ $check->("totouch", 0, "toast");
+
+ $mc->set("totouch", "toast2", 0, 1);
+ my ($flags, $val, $i) = $mc->gat("totouch", 10);
+ is($val, "toast2", "GAT returned correct value");
+ sleep 2;
+ $check->("totouch", 0, "toast2");
+
+ # Test miss as well
+}
+
# diag "Silent set.";
$mc->silent_mutation(::CMD_SETQ, 'silentset', 'silentsetval');
@@ -681,6 +701,27 @@ sub get_multi {
return \%return;
}
+sub touch {
+ my $self = shift;
+ my ($key, $expire) = @_;
+ my $extra_header = pack "N", $expire;
+ my $cas = 0;
+ return $self->_do_command(::CMD_TOUCH, $key, '', $extra_header, $cas);
+}
+
+sub gat {
+ my $self = shift;
+ my $key = shift;
+ my $expire = shift;
+ my $extra_header = pack "N", $expire;
+ my ($rv, $cas) = $self->_do_command(::CMD_GAT, $key, '', $extra_header);
+
+ my $header = substr $rv, 0, 4, '';
+ my $flags = unpack("N", $header);
+
+ return ($flags, $rv, $cas);
+}
+
sub version {
my $self = shift;
return $self->_do_command(::CMD_VERSION, '', '');
diff --git a/t/stats.t b/t/stats.t
index de412cc..f47d6a4 100755
--- a/t/stats.t
+++ b/t/stats.t
@@ -58,7 +58,7 @@ my $sock = $server->sock;
my $stats = mem_stats($sock);
# Test number of keys
-is(scalar(keys(%$stats)), 39, "39 stats values");
+is(scalar(keys(%$stats)), 42, "42 stats values");
# Test initial state
foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
diff --git a/testapp.c b/testapp.c
index 5d52d18..769a618 100644
--- a/testapp.c
+++ b/testapp.c
@@ -823,6 +823,33 @@ static off_t flush_command(char* buf, size_t bufsz, uint8_t cmd, uint32_t exptim
return size;
}
+
+static off_t touch_command(char* buf,
+ size_t bufsz,
+ uint8_t cmd,
+ const void* key,
+ size_t keylen,
+ uint32_t exptime) {
+ protocol_binary_request_touch *request = (void*)buf;
+ assert(bufsz > sizeof(*request));
+
+ memset(request, 0, sizeof(*request));
+ request->message.header.request.magic = PROTOCOL_BINARY_REQ;
+ request->message.header.request.opcode = cmd;
+
+ request->message.header.request.keylen = htons(keylen);
+ request->message.header.request.extlen = 4;
+ request->message.body.expiration = htonl(exptime);
+ request->message.header.request.bodylen = htonl(keylen + 4);
+
+ request->message.header.request.opaque = 0xdeadbeef;
+
+ off_t key_offset = sizeof(protocol_binary_request_no_extras) + 4;
+
+ memcpy(buf + key_offset, key, keylen);
+ return sizeof(protocol_binary_request_no_extras) + 4 + keylen;
+}
+
static off_t arithmetic_command(char* buf,
size_t bufsz,
uint8_t cmd,
@@ -1674,6 +1701,13 @@ static enum test_return test_binary_pipeline_hickup_chunk(void *buffer, size_t b
key, keylen, NULL, 0);
break;
+ case PROTOCOL_BINARY_CMD_TOUCH:
+ case PROTOCOL_BINARY_CMD_GAT:
+ case PROTOCOL_BINARY_CMD_GATQ:
+ len = touch_command(command.bytes, sizeof(command.bytes), cmd,
+ key, keylen, 10);
+ break;
+
case PROTOCOL_BINARY_CMD_STAT:
len = raw_command(command.bytes, sizeof(command.bytes),
PROTOCOL_BINARY_CMD_STAT,
diff --git a/thread.c b/thread.c
index 2166821..904368f 100644
--- a/thread.c
+++ b/thread.c
@@ -346,6 +346,14 @@ item *item_get(const char *key, const size_t nkey) {
return it;
}
+item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
+ item *it;
+ pthread_mutex_lock(&cache_lock);
+ it = do_item_touch(key, nkey, exptime);
+ pthread_mutex_unlock(&cache_lock);
+ return it;
+}
+
/*
* Links an item into the LRU and hashtable.
*/
@@ -478,6 +486,8 @@ void threadlocal_stats_reset(void) {
threads[ii].stats.get_cmds = 0;
threads[ii].stats.get_misses = 0;
+ threads[ii].stats.touch_cmds = 0;
+ threads[ii].stats.touch_misses = 0;
threads[ii].stats.delete_misses = 0;
threads[ii].stats.incr_misses = 0;
threads[ii].stats.decr_misses = 0;
@@ -492,6 +502,7 @@ void threadlocal_stats_reset(void) {
for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
threads[ii].stats.slab_stats[sid].set_cmds = 0;
threads[ii].stats.slab_stats[sid].get_hits = 0;
+ threads[ii].stats.slab_stats[sid].touch_hits = 0;
threads[ii].stats.slab_stats[sid].delete_hits = 0;
threads[ii].stats.slab_stats[sid].incr_hits = 0;
threads[ii].stats.slab_stats[sid].decr_hits = 0;
@@ -515,6 +526,8 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) {
stats->get_cmds += threads[ii].stats.get_cmds;
stats->get_misses += threads[ii].stats.get_misses;
+ stats->touch_cmds += threads[ii].stats.touch_cmds;
+ stats->touch_misses += threads[ii].stats.touch_misses;
stats->delete_misses += threads[ii].stats.delete_misses;
stats->decr_misses += threads[ii].stats.decr_misses;
stats->incr_misses += threads[ii].stats.incr_misses;
@@ -531,6 +544,8 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) {
threads[ii].stats.slab_stats[sid].set_cmds;
stats->slab_stats[sid].get_hits +=
threads[ii].stats.slab_stats[sid].get_hits;
+ stats->slab_stats[sid].touch_hits +=
+ threads[ii].stats.slab_stats[sid].touch_hits;
stats->slab_stats[sid].delete_hits +=
threads[ii].stats.slab_stats[sid].delete_hits;
stats->slab_stats[sid].decr_hits +=
@@ -552,6 +567,7 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
out->set_cmds = 0;
out->get_hits = 0;
+ out->touch_hits = 0;
out->delete_hits = 0;
out->incr_hits = 0;
out->decr_hits = 0;
@@ -561,6 +577,7 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
out->set_cmds += stats->slab_stats[sid].set_cmds;
out->get_hits += stats->slab_stats[sid].get_hits;
+ out->touch_hits += stats->slab_stats[sid].touch_hits;
out->delete_hits += stats->slab_stats[sid].delete_hits;
out->decr_hits += stats->slab_stats[sid].decr_hits;
out->incr_hits += stats->slab_stats[sid].incr_hits;
diff --git a/trace.h b/trace.h
index ced30a3..dc792a0 100644
--- a/trace.h
+++ b/trace.h
@@ -22,6 +22,8 @@
#define MEMCACHED_COMMAND_DELETE_ENABLED() (0)
#define MEMCACHED_COMMAND_GET(arg0, arg1, arg2, arg3, arg4)
#define MEMCACHED_COMMAND_GET_ENABLED() (0)
+#define MEMCACHED_COMMAND_TOUCH(arg0, arg1, arg2, arg3, arg4)
+#define MEMCACHED_COMMAND_TOUCH_ENABLED() (0)
#define MEMCACHED_COMMAND_INCR(arg0, arg1, arg2, arg3)
#define MEMCACHED_COMMAND_INCR_ENABLED() (0)
#define MEMCACHED_COMMAND_PREPEND(arg0, arg1, arg2, arg3, arg4)