diff options
-rw-r--r-- | doc/new_lru.txt | 2 | ||||
-rw-r--r-- | items.c | 161 | ||||
-rw-r--r-- | items.h | 4 | ||||
-rw-r--r-- | memcached.h | 1 | ||||
-rw-r--r-- | thread.c | 3 |
5 files changed, 163 insertions, 8 deletions
diff --git a/doc/new_lru.txt b/doc/new_lru.txt index 38f6ff3..a944671 100644 --- a/doc/new_lru.txt +++ b/doc/new_lru.txt @@ -18,7 +18,7 @@ Now, enabling `-o lru_maintainer` changes all of the behavior below: is uncapped (by default, as of this writing). * Items flow from HOT/WARM into COLD. * A background thread exists which shuffles items between/within the LRU's as - limits are reached. + limits are reached. This includes moves from COLD to WARM. * The background thread can also control the lru_crawler, if enabled. The primary goal is to better protect active items from "scanning". Items @@ -1,5 +1,6 @@ /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ #include "memcached.h" +#include "bipbuffer.h" #include <sys/stat.h> #include <sys/socket.h> #include <sys/resource.h> @@ -79,6 +80,28 @@ void do_item_stats_add_crawl(const int i, const uint64_t reclaimed, static int lru_pull_tail(const int orig_id, const int cur_lru, const uint64_t total_bytes, const uint8_t flags, const rel_time_t max_age); +typedef struct _lru_bump_buf { + struct _lru_bump_buf *prev; + struct _lru_bump_buf *next; + pthread_mutex_t mutex; + bipbuf_t *buf; + uint64_t dropped; +} lru_bump_buf; + +typedef struct { + item *it; + uint32_t hv; +} lru_bump_entry; + +static lru_bump_buf *bump_buf_head = NULL; +static lru_bump_buf *bump_buf_tail = NULL; +static pthread_mutex_t bump_buf_lock = PTHREAD_MUTEX_INITIALIZER; +/* TODO: tunable? Need bench results */ +#define LRU_BUMP_BUF_SIZE 8192 + +static bool lru_bump_async(lru_bump_buf *b, item *it, uint32_t hv); +static uint64_t lru_total_bumps_dropped(void); + /* Get the next CAS id for a new item. */ /* TODO: refactor some atomics for this. */ uint64_t get_cas_id(void) { @@ -589,6 +612,8 @@ void item_stats_totals(ADD_STAT add_stats, void *c) { (unsigned long long)totals.moves_within_lru); APPEND_STAT("direct_reclaims", "%llu", (unsigned long long)totals.direct_reclaims); + APPEND_STAT("lru_bumps_dropped", "%llu", + (unsigned long long)lru_total_bumps_dropped()); } } @@ -860,14 +885,24 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c * afterward. * FETCHED tells if an item has ever been active. */ - if ((it->it_flags & ITEM_ACTIVE) == 0) { - if ((it->it_flags & ITEM_FETCHED) == 0) { - it->it_flags |= ITEM_FETCHED; - } else { - it->it_flags |= ITEM_ACTIVE; + if (settings.lru_maintainer_thread) { + if ((it->it_flags & ITEM_ACTIVE) == 0) { + if ((it->it_flags & ITEM_FETCHED) == 0) { + it->it_flags |= ITEM_FETCHED; + } else { + it->it_flags |= ITEM_ACTIVE; + if (ITEM_lruid(it) != COLD_LRU) { + do_item_update(it); // bump LA time + } else if (!lru_bump_async(c->thread->lru_bump_buf, it, hv)) { + // add flag before async bump to avoid race. + it->it_flags &= ~ITEM_ACTIVE; + } + } } + } else { + it->it_flags |= ITEM_FETCHED; + do_item_update(it); } - do_item_update(it); } DEBUG_REFCNT(it, '+'); } @@ -1060,6 +1095,113 @@ static int lru_pull_tail(const int orig_id, const int cur_lru, return removed; } + +/* TODO: Third place this code needs to be deduped */ +static void lru_bump_buf_link_q(lru_bump_buf *b) { + pthread_mutex_lock(&bump_buf_lock); + assert(b != bump_buf_head); + + b->prev = 0; + b->next = bump_buf_head; + if (b->next) b->next->prev = b; + bump_buf_head = b; + if (bump_buf_tail == 0) bump_buf_tail = b; + pthread_mutex_unlock(&bump_buf_lock); + return; +} + +void *item_lru_bump_buf_create(void) { + lru_bump_buf *b = calloc(1, sizeof(lru_bump_buf)); + if (b == NULL) { + return NULL; + } + + b->buf = bipbuf_new(sizeof(lru_bump_entry) * LRU_BUMP_BUF_SIZE); + if (b->buf == NULL) { + free(b); + return NULL; + } + + pthread_mutex_init(&b->mutex, NULL); + + lru_bump_buf_link_q(b); + return b; +} + +static bool lru_bump_async(lru_bump_buf *b, item *it, uint32_t hv) { + bool ret = true; + refcount_incr(it); + pthread_mutex_lock(&b->mutex); + lru_bump_entry *be = (lru_bump_entry *) bipbuf_request(b->buf, sizeof(lru_bump_entry)); + if (be != NULL) { + be->it = it; + be->hv = hv; + if (bipbuf_push(b->buf, sizeof(lru_bump_entry)) == 0) { + ret = false; + b->dropped++; + } + } else { + ret = false; + b->dropped++; + } + pthread_mutex_unlock(&b->mutex); + return ret; +} + +/* TODO: Might be worth a micro-optimization of having bump buffers link + * themselves back into the central queue when queue goes from zero to + * non-zero, then remove from list if zero more than N times. + * If very few hits on cold this would avoid extra memory barriers from LRU + * maintainer thread. If many hits, they'll just stay in the list. + */ +static bool lru_maintainer_bumps(void) { + lru_bump_buf *b; + lru_bump_entry *be; + unsigned int size; + unsigned int todo; + bool bumped = false; + pthread_mutex_lock(&bump_buf_lock); + for (b = bump_buf_head; b != NULL; b=b->next) { + pthread_mutex_lock(&b->mutex); + be = (lru_bump_entry *) bipbuf_peek_all(b->buf, &size); + pthread_mutex_unlock(&b->mutex); + + if (be == NULL) { + continue; + } + todo = size; + bumped = true; + + while (todo) { + item_lock(be->hv); + do_item_update(be->it); + do_item_remove(be->it); + item_unlock(be->hv); + be++; + todo -= sizeof(lru_bump_entry); + } + + pthread_mutex_lock(&b->mutex); + be = (lru_bump_entry *) bipbuf_poll(b->buf, size); + pthread_mutex_unlock(&b->mutex); + } + pthread_mutex_unlock(&bump_buf_lock); + return bumped; +} + +static uint64_t lru_total_bumps_dropped(void) { + uint64_t total = 0; + lru_bump_buf *b; + pthread_mutex_lock(&bump_buf_lock); + for (b = bump_buf_head; b != NULL; b=b->next) { + pthread_mutex_lock(&b->mutex); + total += b->dropped; + pthread_mutex_unlock(&b->mutex); + } + pthread_mutex_unlock(&bump_buf_lock); + return total; +} + /* Loop up to N times: * If too many items are in HOT_LRU, push to COLD_LRU * If too many items are in WARM_LRU, push to COLD_LRU @@ -1241,6 +1383,7 @@ static void *lru_maintainer_thread(void *arg) { STATS_LOCK(); stats.lru_maintainer_juggles++; STATS_UNLOCK(); + /* Each slab class gets its own sleep to avoid hammering locks */ for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) { next_juggles[i] = next_juggles[i] > last_sleep ? next_juggles[i] - last_sleep : 0; @@ -1264,6 +1407,12 @@ static void *lru_maintainer_thread(void *arg) { } next_juggles[i] = backoff_juggles[i]; } + + /* Minimize the sleep if we had async LRU bumps to process */ + if (lru_maintainer_bumps() && to_sleep > 1000) { + to_sleep = 1000; + } + /* Once per second at most */ if (settings.lru_crawler && last_crawler_check != current_time) { lru_maintainer_crawler_check(&cdata, l); @@ -27,6 +27,8 @@ void do_item_linktail_q(item *it); void do_item_unlinktail_q(item *it); item *do_item_crawl_q(item *it); +void *item_lru_bump_buf_create(void); + /*@null@*/ char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes); void item_stats(ADD_STAT add_stats, void *c); @@ -52,3 +54,5 @@ int stop_lru_maintainer_thread(void); int init_lru_maintainer(void); void lru_maintainer_pause(void); void lru_maintainer_resume(void); + +void *lru_bump_buf_create(void); diff --git a/memcached.h b/memcached.h index 4698f64..82ecba8 100644 --- a/memcached.h +++ b/memcached.h @@ -474,6 +474,7 @@ typedef struct { struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ logger *l; /* logger buffer */ + void *lru_bump_buf; /* async LRU bump buffer */ } LIBEVENT_THREAD; /** @@ -341,7 +341,8 @@ static void *worker_libevent(void *arg) { * all threads have finished initializing. */ me->l = logger_create(); - if (me->l == NULL) { + me->lru_bump_buf = item_lru_bump_buf_create(); + if (me->l == NULL || me->lru_bump_buf == NULL) { abort(); } |