-rw-r--r--   doc/new_lru.txt     2
-rw-r--r--   items.c           161
-rw-r--r--   items.h             4
-rw-r--r--   memcached.h         1
-rw-r--r--   thread.c            3
5 files changed, 163 insertions, 8 deletions
diff --git a/doc/new_lru.txt b/doc/new_lru.txt
index 38f6ff3..a944671 100644
--- a/doc/new_lru.txt
+++ b/doc/new_lru.txt
@@ -18,7 +18,7 @@ Now, enabling `-o lru_maintainer` changes all of the behavior below:
is uncapped (by default, as of this writing).
* Items flow from HOT/WARM into COLD.
* A background thread exists which shuffles items between/within the LRU's as
- limits are reached.
+ limits are reached. This includes moves from COLD to WARM.
* The background thread can also control the lru_crawler, if enabled.
The primary goal is to better protect active items from "scanning". Items
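The doc change above is the user-visible summary: with -o lru_maintainer enabled, a COLD item that becomes active again is no longer relinked on the worker's read path; the hit is recorded and the background maintainer thread performs the COLD -> WARM move later. The rest of this commit is the plumbing for that. Reduced to its core, the mechanism is a small per-worker queue of (item pointer, hash value) pairs that the maintainer drains. The following is a minimal, self-contained sketch of that idea; the names are hypothetical, and the real code further down uses a bipbuffer plus memcached's item locks and refcounts rather than this toy ring:

    /* Toy single-lock ring queue illustrating the async-bump pattern. */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdint.h>

    struct bump_entry { void *it; uint32_t hv; };

    #define QCAP 8192
    struct bump_queue {
        pthread_mutex_t lock;
        struct bump_entry e[QCAP];
        unsigned int head, tail;      /* head == tail means empty */
        uint64_t dropped;
    };

    /* Worker side: O(1) and never takes an LRU lock. */
    static bool bump_enqueue(struct bump_queue *q, void *it, uint32_t hv) {
        bool ok = true;
        pthread_mutex_lock(&q->lock);
        if (q->tail - q->head < QCAP) {
            q->e[q->tail % QCAP].it = it;
            q->e[q->tail % QCAP].hv = hv;
            q->tail++;
        } else {
            q->dropped++;             /* full: drop the bump, the item stays COLD */
            ok = false;
        }
        pthread_mutex_unlock(&q->lock);
        return ok;
    }

    /* Maintainer side: pop entries and do the expensive relink off the hot path. */
    static void bump_drain(struct bump_queue *q, void (*do_bump)(void *it, uint32_t hv)) {
        for (;;) {
            struct bump_entry be;
            pthread_mutex_lock(&q->lock);
            if (q->head == q->tail) {
                pthread_mutex_unlock(&q->lock);
                return;
            }
            be = q->e[q->head % QCAP];
            q->head++;
            pthread_mutex_unlock(&q->lock);
            do_bump(be.it, be.hv);    /* COLD -> WARM move happens here */
        }
    }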
diff --git a/items.c b/items.c
index 2f0a7a7..1c4685e 100644
--- a/items.c
+++ b/items.c
@@ -1,5 +1,6 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
+#include "bipbuffer.h"
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/resource.h>
@@ -79,6 +80,28 @@ void do_item_stats_add_crawl(const int i, const uint64_t reclaimed,
static int lru_pull_tail(const int orig_id, const int cur_lru,
const uint64_t total_bytes, const uint8_t flags, const rel_time_t max_age);
+typedef struct _lru_bump_buf {
+ struct _lru_bump_buf *prev;
+ struct _lru_bump_buf *next;
+ pthread_mutex_t mutex;
+ bipbuf_t *buf;
+ uint64_t dropped;
+} lru_bump_buf;
+
+typedef struct {
+ item *it;
+ uint32_t hv;
+} lru_bump_entry;
+
+static lru_bump_buf *bump_buf_head = NULL;
+static lru_bump_buf *bump_buf_tail = NULL;
+static pthread_mutex_t bump_buf_lock = PTHREAD_MUTEX_INITIALIZER;
+/* TODO: tunable? Need bench results */
+#define LRU_BUMP_BUF_SIZE 8192
+
+static bool lru_bump_async(lru_bump_buf *b, item *it, uint32_t hv);
+static uint64_t lru_total_bumps_dropped(void);
+
/* Get the next CAS id for a new item. */
/* TODO: refactor some atomics for this. */
uint64_t get_cas_id(void) {
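Each worker thread owns one lru_bump_buf, and every buffer is also linked onto a global list (bump_buf_head/bump_buf_tail) so the maintainer thread can walk all of them. The buffer body is a bipbuffer sized for LRU_BUMP_BUF_SIZE fixed-size lru_bump_entry records; on a typical LP64 build an entry is a pointer plus a uint32_t padded to 16 bytes, so this works out to roughly 8192 * 16 = 128 KiB per worker thread. Writes follow a reserve-then-commit protocol; below is a sketch of that protocol using only the bipbuffer calls that appear later in this commit (see the vendored bipbuffer.h for the exact signatures):

    /* Reserve-then-commit write, mirroring lru_bump_async() below:
     * bipbuf_request() returns a contiguous region the caller fills in
     * place, bipbuf_push() then publishes it. A NULL from request() or a
     * 0 return from push() means there was no room and the record is
     * dropped (counted via b->dropped in the real code). */
    static bool enqueue_entry(bipbuf_t *buf, item *it, uint32_t hv) {
        lru_bump_entry *be = (lru_bump_entry *) bipbuf_request(buf, sizeof(lru_bump_entry));
        if (be == NULL)
            return false;
        be->it = it;
        be->hv = hv;
        return bipbuf_push(buf, sizeof(lru_bump_entry)) != 0;
    }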
@@ -589,6 +612,8 @@ void item_stats_totals(ADD_STAT add_stats, void *c) {
(unsigned long long)totals.moves_within_lru);
APPEND_STAT("direct_reclaims", "%llu",
(unsigned long long)totals.direct_reclaims);
+ APPEND_STAT("lru_bumps_dropped", "%llu",
+ (unsigned long long)lru_total_bumps_dropped());
}
}
@@ -860,14 +885,24 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
* afterward.
* FETCHED tells if an item has ever been active.
*/
- if ((it->it_flags & ITEM_ACTIVE) == 0) {
- if ((it->it_flags & ITEM_FETCHED) == 0) {
- it->it_flags |= ITEM_FETCHED;
- } else {
- it->it_flags |= ITEM_ACTIVE;
+ if (settings.lru_maintainer_thread) {
+ if ((it->it_flags & ITEM_ACTIVE) == 0) {
+ if ((it->it_flags & ITEM_FETCHED) == 0) {
+ it->it_flags |= ITEM_FETCHED;
+ } else {
+ it->it_flags |= ITEM_ACTIVE;
+ if (ITEM_lruid(it) != COLD_LRU) {
+ do_item_update(it); // bump LA time
+ } else if (!lru_bump_async(c->thread->lru_bump_buf, it, hv)) {
+ // add flag before async bump to avoid race.
+ it->it_flags &= ~ITEM_ACTIVE;
+ }
+ }
}
+ } else {
+ it->it_flags |= ITEM_FETCHED;
+ do_item_update(it);
}
- do_item_update(it);
}
DEBUG_REFCNT(it, '+');
}
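Read in sequence, the new do_item_get() logic gives each item a two-hit promotion when the maintainer thread is enabled, roughly:

    1st GET:     ITEM_FETCHED is set; no LRU movement.
    2nd GET:     ITEM_ACTIVE is set; a HOT/WARM item is bumped inline via
                 do_item_update(), a COLD item is queued with lru_bump_async()
                 for the maintainer to move later.
    queue full:  lru_bump_async() returns false and ITEM_ACTIVE is cleared
                 again, so a later GET can retry the bump.
    no -o lru_maintainer: every GET sets ITEM_FETCHED and calls
                 do_item_update(), with no ITEM_ACTIVE handling.

The reason COLD is special-cased is that bumping a COLD item means relinking it into a different LRU under the LRU lock; deferring that work to the maintainer keeps the worker's read path down to flag updates and a short per-thread buffer lock.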
@@ -1060,6 +1095,113 @@ static int lru_pull_tail(const int orig_id, const int cur_lru,
return removed;
}
+
+/* TODO: Third place this code needs to be deduped */
+static void lru_bump_buf_link_q(lru_bump_buf *b) {
+ pthread_mutex_lock(&bump_buf_lock);
+ assert(b != bump_buf_head);
+
+ b->prev = 0;
+ b->next = bump_buf_head;
+ if (b->next) b->next->prev = b;
+ bump_buf_head = b;
+ if (bump_buf_tail == 0) bump_buf_tail = b;
+ pthread_mutex_unlock(&bump_buf_lock);
+ return;
+}
+
+void *item_lru_bump_buf_create(void) {
+ lru_bump_buf *b = calloc(1, sizeof(lru_bump_buf));
+ if (b == NULL) {
+ return NULL;
+ }
+
+ b->buf = bipbuf_new(sizeof(lru_bump_entry) * LRU_BUMP_BUF_SIZE);
+ if (b->buf == NULL) {
+ free(b);
+ return NULL;
+ }
+
+ pthread_mutex_init(&b->mutex, NULL);
+
+ lru_bump_buf_link_q(b);
+ return b;
+}
+
+static bool lru_bump_async(lru_bump_buf *b, item *it, uint32_t hv) {
+ bool ret = true;
+ refcount_incr(it);
+ pthread_mutex_lock(&b->mutex);
+ lru_bump_entry *be = (lru_bump_entry *) bipbuf_request(b->buf, sizeof(lru_bump_entry));
+ if (be != NULL) {
+ be->it = it;
+ be->hv = hv;
+ if (bipbuf_push(b->buf, sizeof(lru_bump_entry)) == 0) {
+ ret = false;
+ b->dropped++;
+ }
+ } else {
+ ret = false;
+ b->dropped++;
+ }
+ pthread_mutex_unlock(&b->mutex);
+ return ret;
+}
+
+/* TODO: Might be worth a micro-optimization of having bump buffers link
+ * themselves back into the central queue when queue goes from zero to
+ * non-zero, then remove from list if zero more than N times.
+ * If very few hits on cold this would avoid extra memory barriers from LRU
+ * maintainer thread. If many hits, they'll just stay in the list.
+ */
+static bool lru_maintainer_bumps(void) {
+ lru_bump_buf *b;
+ lru_bump_entry *be;
+ unsigned int size;
+ unsigned int todo;
+ bool bumped = false;
+ pthread_mutex_lock(&bump_buf_lock);
+ for (b = bump_buf_head; b != NULL; b=b->next) {
+ pthread_mutex_lock(&b->mutex);
+ be = (lru_bump_entry *) bipbuf_peek_all(b->buf, &size);
+ pthread_mutex_unlock(&b->mutex);
+
+ if (be == NULL) {
+ continue;
+ }
+ todo = size;
+ bumped = true;
+
+ while (todo) {
+ item_lock(be->hv);
+ do_item_update(be->it);
+ do_item_remove(be->it);
+ item_unlock(be->hv);
+ be++;
+ todo -= sizeof(lru_bump_entry);
+ }
+
+ pthread_mutex_lock(&b->mutex);
+ be = (lru_bump_entry *) bipbuf_poll(b->buf, size);
+ pthread_mutex_unlock(&b->mutex);
+ }
+ pthread_mutex_unlock(&bump_buf_lock);
+ return bumped;
+}
+
+static uint64_t lru_total_bumps_dropped(void) {
+ uint64_t total = 0;
+ lru_bump_buf *b;
+ pthread_mutex_lock(&bump_buf_lock);
+ for (b = bump_buf_head; b != NULL; b=b->next) {
+ pthread_mutex_lock(&b->mutex);
+ total += b->dropped;
+ pthread_mutex_unlock(&b->mutex);
+ }
+ pthread_mutex_unlock(&bump_buf_lock);
+ return total;
+}
+
/* Loop up to N times:
* If too many items are in HOT_LRU, push to COLD_LRU
* If too many items are in WARM_LRU, push to COLD_LRU
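A few notes on the drain side above: lru_bump_async() takes a reference on the item before publishing its pointer, and lru_maintainer_bumps() drops it again with do_item_remove() once the bump is done, so a queued item cannot be freed out from under the maintainer. In lru_maintainer_bumps(), size and todo count bytes rather than entries, which is why the loop advances by sizeof(lru_bump_entry). Finally, the peek/poll split means the buffer mutex is not held while items are being bumped; entries that workers commit after the peek simply stay in the buffer and are picked up on the maintainer's next pass.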
@@ -1241,6 +1383,7 @@ static void *lru_maintainer_thread(void *arg) {
STATS_LOCK();
stats.lru_maintainer_juggles++;
STATS_UNLOCK();
+
/* Each slab class gets its own sleep to avoid hammering locks */
for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
next_juggles[i] = next_juggles[i] > last_sleep ? next_juggles[i] - last_sleep : 0;
@@ -1264,6 +1407,12 @@ static void *lru_maintainer_thread(void *arg) {
}
next_juggles[i] = backoff_juggles[i];
}
+
+ /* Minimize the sleep if we had async LRU bumps to process */
+ if (lru_maintainer_bumps() && to_sleep > 1000) {
+ to_sleep = 1000;
+ }
+
/* Once per second at most */
if (settings.lru_crawler && last_crawler_check != current_time) {
lru_maintainer_crawler_check(&cdata, l);
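The externally visible addition from the items.c changes is the lru_bumps_dropped counter, which item_stats_totals() reports via lru_total_bumps_dropped(), summed across every worker's bump buffer. It shows up in the general stats output as a line of the form:

    STAT lru_bumps_dropped 0

A value that keeps growing suggests the per-thread buffers are filling faster than the maintainer drains them, i.e. the maintainer is falling behind or LRU_BUMP_BUF_SIZE is too small for the workload. A dropped bump is not an error; the item simply stays in COLD until a later hit retries the promotion.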
diff --git a/items.h b/items.h
index 1487801..fd1d485 100644
--- a/items.h
+++ b/items.h
@@ -27,6 +27,8 @@ void do_item_linktail_q(item *it);
void do_item_unlinktail_q(item *it);
item *do_item_crawl_q(item *it);
+void *item_lru_bump_buf_create(void);
+
/*@null@*/
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
void item_stats(ADD_STAT add_stats, void *c);
@@ -52,3 +54,5 @@ int stop_lru_maintainer_thread(void);
int init_lru_maintainer(void);
void lru_maintainer_pause(void);
void lru_maintainer_resume(void);
+
+void *lru_bump_buf_create(void);
diff --git a/memcached.h b/memcached.h
index 4698f64..82ecba8 100644
--- a/memcached.h
+++ b/memcached.h
@@ -474,6 +474,7 @@ typedef struct {
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
logger *l; /* logger buffer */
+ void *lru_bump_buf; /* async LRU bump buffer */
} LIBEVENT_THREAD;
/**
diff --git a/thread.c b/thread.c
index baae274..45b82ac 100644
--- a/thread.c
+++ b/thread.c
@@ -341,7 +341,8 @@ static void *worker_libevent(void *arg) {
* all threads have finished initializing.
*/
me->l = logger_create();
- if (me->l == NULL) {
+ me->lru_bump_buf = item_lru_bump_buf_create();
+ if (me->l == NULL || me->lru_bump_buf == NULL) {
abort();
}