summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2015-01-01 23:48:28 -0800
committerdormando <dormando@rydia.net>2015-01-01 23:48:28 -0800
commit90593dca21bff56c17916497523273c0a5872a2a (patch)
tree728a3d3081d94872805fa0ccd72ac710d958c4c7
parent69d1c69997660e79755480b4e4085691ad45911e (diff)
downloadmemcached-90593dca21bff56c17916497523273c0a5872a2a.tar.gz
flush_all was not thread safe.
Unfortunately if you disable CAS, all items set in the same second as a flush_all will immediately expire. This is the old (2006ish) behavior. However, if CAS is enabled (as is the default), it will still be more or less exact. The locking issue is that if the LRU lock is held, you may not be able to modify an item if the item lock is also held. This means that some items may not be flushed if locking is done correctly. In the current code, it could lead to corruption as an item could be locked and in use while the expunging is happening.
-rw-r--r--items.c49
-rw-r--r--items.h1
-rw-r--r--memcached.c50
-rw-r--r--memcached.h2
-rwxr-xr-xt/flush-all.t30
-rw-r--r--thread.c9
6 files changed, 77 insertions, 64 deletions
diff --git a/items.c b/items.c
index 2f350df..5328a59 100644
--- a/items.c
+++ b/items.c
@@ -62,6 +62,19 @@ uint64_t get_cas_id(void) {
return next_id;
}
+static int is_flushed(item *it) {
+ rel_time_t oldest_live = settings.oldest_live;
+ uint64_t cas = ITEM_get_cas(it);
+ uint64_t oldest_cas = settings.oldest_cas;
+ if (oldest_live == 0 || oldest_live > current_time)
+ return 0;
+ if ((it->time <= oldest_live)
+ || (oldest_cas != 0 && cas != 0 && cas < oldest_cas)) {
+ return 1;
+ }
+ return 0;
+}
+
/* Enable this for reference-count debugging. */
#if 0
# define DEBUG_REFCNT(it,op) \
@@ -117,7 +130,6 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
item *search;
item *next_it;
void *hold_lock = NULL;
- rel_time_t oldest_live = settings.oldest_live;
search = tails[id];
/* We walk up *only* for locked items. Never searching for expired.
@@ -165,7 +177,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
/* Expired or flushed */
if ((search->exptime != 0 && search->exptime < current_time)
- || (search->time <= oldest_live && oldest_live <= current_time)) {
+ || is_flushed(search)) {
itemstats[id].reclaimed++;
if ((search->it_flags & ITEM_FETCHED) == 0) {
itemstats[id].expired_unfetched++;
@@ -617,8 +629,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
}
if (it != NULL) {
- if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
- it->time <= settings.oldest_live) {
+ if (is_flushed(it)) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
@@ -653,33 +664,6 @@ item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
return it;
}
-/* expires items that are more recent than the oldest_live setting. */
-void do_item_flush_expired(void) {
- int i;
- item *iter, *next;
- if (settings.oldest_live == 0)
- return;
- for (i = 0; i < LARGEST_ID; i++) {
- /* The LRU is sorted in decreasing time order, and an item's timestamp
- * is never newer than its last access time, so we only need to walk
- * back until we hit an item older than the oldest_live time.
- * The oldest_live checking will auto-expire the remaining items.
- */
- for (iter = heads[i]; iter != NULL; iter = next) {
- /* iter->time of 0 are magic objects. */
- if (iter->time != 0 && iter->time >= settings.oldest_live) {
- next = iter->next;
- if ((iter->it_flags & ITEM_SLABBED) == 0) {
- do_item_unlink_nolock(iter, hash(ITEM_key(iter), iter->nkey));
- }
- } else {
- /* We've hit the first old item. Continue to the next queue. */
- break;
- }
- }
- }
-}
-
static void crawler_link_q(item *it) { /* item is the new tail */
item **head, **tail;
assert(it->slabs_clsid < LARGEST_ID);
@@ -785,9 +769,8 @@ static item *crawler_crawl_q(item *it) {
* main thread's values too much. Should rethink again.
*/
static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
- rel_time_t oldest_live = settings.oldest_live;
if ((search->exptime != 0 && search->exptime < current_time)
- || (search->time <= oldest_live && oldest_live <= current_time)) {
+ || is_flushed(search)) {
itemstats[i].crawler_reclaimed++;
if (settings.verbose > 1) {
diff --git a/items.h b/items.h
index 51ae39e..b6ae612 100644
--- a/items.h
+++ b/items.h
@@ -20,7 +20,6 @@ void do_item_stats(ADD_STAT add_stats, void *c);
void do_item_stats_totals(ADD_STAT add_stats, void *c);
/*@null@*/
void do_item_stats_sizes(ADD_STAT add_stats, void *c);
-void do_item_flush_expired(void);
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv);
item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv);
diff --git a/memcached.c b/memcached.c
index b673f08..a731ea4 100644
--- a/memcached.c
+++ b/memcached.c
@@ -217,6 +217,7 @@ static void settings_init(void) {
settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
settings.verbose = 0;
settings.oldest_live = 0;
+ settings.oldest_cas = 0; /* supplements accuracy of oldest_live */
settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
settings.socketpath = NULL; /* by default, not using a unix socket */
settings.factor = 1.25;
@@ -2138,6 +2139,7 @@ static void process_bin_append_prepend(conn *c) {
static void process_bin_flush(conn *c) {
time_t exptime = 0;
protocol_binary_request_flush* req = binary_get_request(c);
+ rel_time_t new_oldest = 0;
if (!settings.flush_enabled) {
// flush_all is not allowed but we log it on stats
@@ -2150,11 +2152,17 @@ static void process_bin_flush(conn *c) {
}
if (exptime > 0) {
- settings.oldest_live = realtime(exptime) - 1;
+ new_oldest = realtime(exptime);
} else {
- settings.oldest_live = current_time - 1;
+ new_oldest = current_time;
+ }
+ if (settings.use_cas) {
+ settings.oldest_live = new_oldest - 1;
+ if (settings.oldest_live <= current_time)
+ settings.oldest_cas = get_cas_id();
+ } else {
+ settings.oldest_live = new_oldest;
}
- item_flush_expired();
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.flush_cmds++;
@@ -3469,6 +3477,7 @@ static void process_command(conn *c, char *command) {
} else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
time_t exptime = 0;
+ rel_time_t new_oldest = 0;
set_noreply_maybe(c, tokens, ntokens);
@@ -3482,17 +3491,12 @@ static void process_command(conn *c, char *command) {
return;
}
- if(ntokens == (c->noreply ? 3 : 2)) {
- settings.oldest_live = current_time - 1;
- item_flush_expired();
- out_string(c, "OK");
- return;
- }
-
- exptime = strtol(tokens[1].value, NULL, 10);
- if(errno == ERANGE) {
- out_string(c, "CLIENT_ERROR bad command line format");
- return;
+ if (ntokens != (c->noreply ? 3 : 2)) {
+ exptime = strtol(tokens[1].value, NULL, 10);
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
}
/*
@@ -3501,11 +3505,19 @@ static void process_command(conn *c, char *command) {
value. So we process exptime == 0 the same way we do when
no delay is given at all.
*/
- if (exptime > 0)
- settings.oldest_live = realtime(exptime) - 1;
- else /* exptime == 0 */
- settings.oldest_live = current_time - 1;
- item_flush_expired();
+ if (exptime > 0) {
+ new_oldest = realtime(exptime);
+ } else { /* exptime == 0 */
+ new_oldest = current_time;
+ }
+
+ if (settings.use_cas) {
+ settings.oldest_live = new_oldest - 1;
+ if (settings.oldest_live <= current_time)
+ settings.oldest_cas = get_cas_id();
+ } else {
+ settings.oldest_live = new_oldest;
+ }
out_string(c, "OK");
return;
diff --git a/memcached.h b/memcached.h
index 4d2d4b7..ee37c9e 100644
--- a/memcached.h
+++ b/memcached.h
@@ -299,6 +299,7 @@ struct settings {
char *inter;
int verbose;
rel_time_t oldest_live; /* ignore existing items older than this */
+ uint64_t oldest_cas; /* ignore existing items with CAS values lower than this */
int evict_to_free;
char *socketpath; /* path to unix socket if using local socket */
int access; /* access mask (a la chmod) for unix domain socket */
@@ -565,7 +566,6 @@ bool conn_add_to_freelist(conn *c);
int is_listen_thread(void);
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
-void item_flush_expired(void);
item *item_get(const char *key, const size_t nkey);
item *item_touch(const char *key, const size_t nkey, uint32_t exptime);
int item_link(item *it);
diff --git a/t/flush-all.t b/t/flush-all.t
index 4f14389..6df6d34 100755
--- a/t/flush-all.t
+++ b/t/flush-all.t
@@ -1,7 +1,7 @@
#!/usr/bin/perl
use strict;
-use Test::More tests => 25;
+use Test::More tests => 26;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
@@ -67,3 +67,31 @@ print $sock "flush_all\r\n";
is(scalar <$sock>, "CLIENT_ERROR flush_all not allowed\r\n", "flush_all was not allowed");
mem_get_is($sock, "foo", "fooval2");
+# Test that disabling CAS makes flush_all less accurate.
+# Due to lock ordering issues we can no longer evict items newer than
+# oldest_live, so we rely on the CAS counter for an exact cliff. So disabling
+# CAS now means all items set in the same second will fail to set.
+$server = new_memcached('-C');
+$sock = $server->sock;
+
+my $failed_nocas = 0;
+# Running this 100,000 times failed the test a handful of times. 50 tries
+# should be enough.
+for (1..50) {
+ print $sock "flush_all 0\r\n";
+ my $foo = scalar <$sock>;
+ print $sock "set foo 0 0 3\r\nnew\r\n";
+ $foo = scalar <$sock>;
+ print $sock "get foo\r\n";
+ my $line = scalar <$sock>;
+ if ($line =~ /^VALUE/) {
+ $line = scalar <$sock>;
+ $line = scalar <$sock>;
+ print STDERR "Succeeded!!!\n";
+ next;
+ } elsif ($line =~ /^END/) {
+ $failed_nocas++;
+ next;
+ }
+}
+is($failed_nocas, 1, "failed to set value after flush with no CAS at least once");
diff --git a/thread.c b/thread.c
index 69e00a5..c449f71 100644
--- a/thread.c
+++ b/thread.c
@@ -598,15 +598,6 @@ enum store_item_type store_item(item *item, int comm, conn* c) {
}
/*
- * Flushes expired items after a flush_all call
- */
-void item_flush_expired() {
- mutex_lock(&cache_lock);
- do_item_flush_expired();
- mutex_unlock(&cache_lock);
-}
-
-/*
* Dumps part of the cache
*/
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {