From 8af75043cfc1b3a1e997cb4311538ac744a3cc0b Mon Sep 17 00:00:00 2001 From: dormando Date: Mon, 3 Oct 2011 03:49:24 -0700 Subject: use item partitioned lock for as much as possible push cache_lock deeper into the abyss --- items.c | 64 +++++++++++++++++++++++++++++-------------------------------- items.h | 2 +- memcached.c | 6 ++++-- memcached.h | 2 ++ thread.c | 59 ++++++++++++++++++++++++++++++++++++++++---------------- 5 files changed, 80 insertions(+), 53 deletions(-) diff --git a/items.c b/items.c index 97efb9c..ea8852c 100644 --- a/items.c +++ b/items.c @@ -121,7 +121,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim it = search; it->refcount = 1; slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); - do_item_unlink(it, hash(ITEM_key(it), it->nkey, 0)); + do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0)); /* Initialize the item block: */ it->slabs_clsid = 0; it->refcount = 0; @@ -151,7 +151,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim it = search; it->refcount = 1; slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); - do_item_unlink(it, hash(ITEM_key(it), it->nkey, 0)); + do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0)); /* Initialize the item block: */ it->slabs_clsid = 0; it->refcount = 0; @@ -172,7 +172,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim search->time + TAIL_REPAIR_TIME < current_time) { itemstats[id].tailrepairs++; search->refcount = 0; - do_item_unlink(search, hash(ITEM_key(search), search->nkey, 0)); + do_item_unlink_nolock(search, hash(ITEM_key(search), search->nkey, 0)); } return NULL; } @@ -275,7 +275,6 @@ int do_item_link(item *it, const uint32_t hv) { assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0); it->it_flags |= ITEM_LINKED; it->time = current_time; - assoc_insert(it, hv); STATS_LOCK(); stats.curr_bytes += ITEM_ntotal(it); @@ -283,15 +282,34 @@ int do_item_link(item *it, const uint32_t hv) { stats.total_items += 1; STATS_UNLOCK(); + mutex_lock(&cache_lock); /* Allocate a new CAS ID on link. */ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); - + assoc_insert(it, hv); item_link_q(it); + pthread_mutex_unlock(&cache_lock); return 1; } void do_item_unlink(item *it, const uint32_t hv) { + MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes); + if ((it->it_flags & ITEM_LINKED) != 0) { + it->it_flags &= ~ITEM_LINKED; + STATS_LOCK(); + stats.curr_bytes -= ITEM_ntotal(it); + stats.curr_items -= 1; + STATS_UNLOCK(); + mutex_lock(&cache_lock); + assoc_delete(ITEM_key(it), it->nkey, hv); + item_unlink_q(it); + pthread_mutex_unlock(&cache_lock); + if (it->refcount == 0) item_free(it); + } +} + +/* FIXME: Is it necessary to keep thsi copy/pasted code? */ +void do_item_unlink_nolock(item *it, const uint32_t hv) { MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes); if ((it->it_flags & ITEM_LINKED) != 0) { it->it_flags &= ~ITEM_LINKED; @@ -323,9 +341,11 @@ void do_item_update(item *it) { assert((it->it_flags & ITEM_SLABBED) == 0); if ((it->it_flags & ITEM_LINKED) != 0) { + mutex_lock(&cache_lock); item_unlink_q(it); it->time = current_time; item_link_q(it); + pthread_mutex_unlock(&cache_lock); } } } @@ -387,22 +407,6 @@ void do_item_stats(ADD_STAT add_stats, void *c) { char key_str[STAT_KEY_LEN]; char val_str[STAT_VAL_LEN]; int klen = 0, vlen = 0; - int search = 50; - while (search > 0 && - tails[i] != NULL && - ((settings.oldest_live != 0 && /* Item flushd */ - settings.oldest_live <= current_time && - tails[i]->time <= settings.oldest_live) || - (tails[i]->exptime != 0 && /* and not expired */ - tails[i]->exptime < current_time))) { - --search; - if (tails[i]->refcount == 0) { - do_item_unlink(tails[i], hash(ITEM_key(tails[i]), - tails[i]->nkey, 0)); - } else { - break; - } - } if (tails[i] == NULL) { /* We removed all of the items in this slab class */ continue; @@ -470,7 +474,9 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c) { /** wrapper around assoc_find which does the lazy expiration logic */ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) { + mutex_lock(&cache_lock); item *it = assoc_find(key, nkey, hv); + pthread_mutex_unlock(&cache_lock); int was_found = 0; if (settings.verbose > 2) { @@ -484,7 +490,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) { if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= current_time && it->time <= settings.oldest_live) { - do_item_unlink(it, hv); /* MTSAFE - cache_lock held */ + do_item_unlink(it, hv); /* MTSAFE - item_lock held */ it = NULL; } @@ -494,7 +500,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) { } if (it != NULL && it->exptime != 0 && it->exptime <= current_time) { - do_item_unlink(it, hv); /* MTSAFE - cache_lock held */ + do_item_unlink(it, hv); /* MTSAFE - item_lock held */ it = NULL; } @@ -524,16 +530,6 @@ item *do_item_touch(const char *key, size_t nkey, uint32_t exptime, return it; } -/** returns an item whether or not it's expired. */ -item *do_item_get_nocheck(const char *key, const size_t nkey, const uint32_t hv) { - item *it = assoc_find(key, nkey, hv); - if (it) { - it->refcount++; - DEBUG_REFCNT(it, '+'); - } - return it; -} - /* expires items that are more recent than the oldest_live setting. */ void do_item_flush_expired(void) { int i; @@ -550,7 +546,7 @@ void do_item_flush_expired(void) { if (iter->time >= settings.oldest_live) { next = iter->next; if ((iter->it_flags & ITEM_SLABBED) == 0) { - do_item_unlink(iter, hash(ITEM_key(iter), iter->nkey, 0)); + do_item_unlink_nolock(iter, hash(ITEM_key(iter), iter->nkey, 0)); } } else { /* We've hit the first old item. Continue to the next queue. */ diff --git a/items.h b/items.h index acc1ef5..fc7b85e 100644 --- a/items.h +++ b/items.h @@ -8,6 +8,7 @@ bool item_size_ok(const size_t nkey, const int flags, const int nbytes); int do_item_link(item *it, const uint32_t hv); /** may fail if transgresses limits */ void do_item_unlink(item *it, const uint32_t hv); +void do_item_unlink_nolock(item *it, const uint32_t hv); void do_item_remove(item *it); void do_item_update(item *it); /** update LRU time to current and reposition */ int do_item_replace(item *it, item *new_it, const uint32_t hv); @@ -20,7 +21,6 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c); void do_item_flush_expired(void); item *do_item_get(const char *key, const size_t nkey, const uint32_t hv); -item *do_item_get_nocheck(const char *key, const size_t nkey, const uint32_t hv); item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv); void item_stats_reset(void); extern pthread_mutex_t cache_lock; diff --git a/memcached.c b/memcached.c index d1be82f..2678b64 100644 --- a/memcached.c +++ b/memcached.c @@ -2310,7 +2310,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10); - new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */); + new_it = item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */); if (new_it == NULL) { /* SERVER_ERROR out of memory */ @@ -3080,7 +3080,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, res = strlen(buf); if (res + 2 > it->nbytes || it->refcount != 1) { /* need to realloc */ item *new_it; - new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); + new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); if (new_it == 0) { do_item_remove(it); return EOM; @@ -3095,7 +3095,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, } else { /* replace in-place */ /* When changing the value without replacing the item, we need to update the CAS on the existing item. */ + mutex_lock(&cache_lock); /* FIXME */ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); + pthread_mutex_unlock(&cache_lock); memcpy(ITEM_data(it), buf, res); memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2); diff --git a/memcached.h b/memcached.h index b6c20f2..f626643 100644 --- a/memcached.h +++ b/memcached.h @@ -513,6 +513,8 @@ void item_stats_sizes(ADD_STAT add_stats, void *c); void item_unlink(item *it); void item_update(item *it); +void item_lock(uint32_t hv); +void item_unlock(uint32_t hv); void STATS_LOCK(void); void STATS_UNLOCK(void); void threadlocal_stats_reset(void); diff --git a/thread.c b/thread.c index efcc739..c201c5d 100644 --- a/thread.c +++ b/thread.c @@ -46,6 +46,10 @@ static pthread_mutex_t stats_lock; static CQ_ITEM *cqi_freelist; static pthread_mutex_t cqi_freelist_lock; +static pthread_mutex_t *item_locks; +/* TODO: Make this a function of the # of threads */ +#define ITEM_LOCKS 4000 + static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; /* @@ -64,6 +68,14 @@ static pthread_cond_t init_cond; static void thread_libevent_process(int fd, short which, void *arg); +void item_lock(uint32_t hv) { + mutex_lock(&item_locks[hv % ITEM_LOCKS]); +} + +void item_unlock(uint32_t hv) { + pthread_mutex_unlock(&item_locks[hv % ITEM_LOCKS]); +} + /* * Initializes a connection queue. */ @@ -342,9 +354,9 @@ item *item_get(const char *key, const size_t nkey) { item *it; uint32_t hv; hv = hash(key, nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); it = do_item_get(key, nkey, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return it; } @@ -352,9 +364,9 @@ item *item_touch(const char *key, size_t nkey, uint32_t exptime) { item *it; uint32_t hv; hv = hash(key, nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); it = do_item_touch(key, nkey, exptime, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return it; } @@ -366,9 +378,9 @@ int item_link(item *item) { uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); ret = do_item_link(item, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return ret; } @@ -377,9 +389,12 @@ int item_link(item *item) { * needed. */ void item_remove(item *item) { - mutex_lock(&cache_lock); + uint32_t hv; + hv = hash(ITEM_key(item), item->nkey, 0); + + item_lock(hv); do_item_remove(item); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); } /* @@ -397,18 +412,21 @@ int item_replace(item *old_it, item *new_it, const uint32_t hv) { void item_unlink(item *item) { uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); do_item_unlink(item, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); } /* * Moves an item to the back of the LRU queue. */ void item_update(item *item) { - mutex_lock(&cache_lock); + uint32_t hv; + hv = hash(ITEM_key(item), item->nkey, 0); + + item_lock(hv); do_item_update(item); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); } /* @@ -422,9 +440,9 @@ enum delta_result_type add_delta(conn *c, const char *key, uint32_t hv; hv = hash(key, nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return ret; } @@ -436,9 +454,9 @@ enum store_item_type store_item(item *item, int comm, conn* c) { uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); ret = do_store_item(item, comm, c, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return ret; } @@ -616,6 +634,15 @@ void thread_init(int nthreads, struct event_base *main_base) { pthread_mutex_init(&cqi_freelist_lock, NULL); cqi_freelist = NULL; + item_locks = calloc(ITEM_LOCKS, sizeof(pthread_mutex_t)); + if (! item_locks) { + perror("Can't allocate item locks"); + exit(1); + } + for (i = 0; i < ITEM_LOCKS; i++) { + pthread_mutex_init(&item_locks[i], NULL); + } + threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { perror("Can't allocate thread descriptors"); -- cgit v1.2.1