summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2011-10-03 03:49:24 -0700
committerdormando <dormando@rydia.net>2011-10-05 00:34:18 -0700
commit8af75043cfc1b3a1e997cb4311538ac744a3cc0b (patch)
tree1299c2539a8fa39e183cfa6aa4af3ac4ba38e6a1
parent54f11de1e1e28418c22f0fceaf7237ea731ee342 (diff)
downloadmemcached-8af75043cfc1b3a1e997cb4311538ac744a3cc0b.tar.gz
use item partitioned lock for as much as possible
push cache_lock deeper into the abyss
-rw-r--r--items.c64
-rw-r--r--items.h2
-rw-r--r--memcached.c6
-rw-r--r--memcached.h2
-rw-r--r--thread.c59
5 files changed, 80 insertions, 53 deletions
diff --git a/items.c b/items.c
index 97efb9c..ea8852c 100644
--- a/items.c
+++ b/items.c
@@ -121,7 +121,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
it = search;
it->refcount = 1;
slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
- do_item_unlink(it, hash(ITEM_key(it), it->nkey, 0));
+ do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0));
/* Initialize the item block: */
it->slabs_clsid = 0;
it->refcount = 0;
@@ -151,7 +151,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
it = search;
it->refcount = 1;
slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
- do_item_unlink(it, hash(ITEM_key(it), it->nkey, 0));
+ do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0));
/* Initialize the item block: */
it->slabs_clsid = 0;
it->refcount = 0;
@@ -172,7 +172,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
search->time + TAIL_REPAIR_TIME < current_time) {
itemstats[id].tailrepairs++;
search->refcount = 0;
- do_item_unlink(search, hash(ITEM_key(search), search->nkey, 0));
+ do_item_unlink_nolock(search, hash(ITEM_key(search), search->nkey, 0));
}
return NULL;
}
@@ -275,7 +275,6 @@ int do_item_link(item *it, const uint32_t hv) {
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
- assoc_insert(it, hv);
STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
@@ -283,10 +282,12 @@ int do_item_link(item *it, const uint32_t hv) {
stats.total_items += 1;
STATS_UNLOCK();
+ mutex_lock(&cache_lock);
/* Allocate a new CAS ID on link. */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
-
+ assoc_insert(it, hv);
item_link_q(it);
+ pthread_mutex_unlock(&cache_lock);
return 1;
}
@@ -299,6 +300,23 @@ void do_item_unlink(item *it, const uint32_t hv) {
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
STATS_UNLOCK();
+ mutex_lock(&cache_lock);
+ assoc_delete(ITEM_key(it), it->nkey, hv);
+ item_unlink_q(it);
+ pthread_mutex_unlock(&cache_lock);
+ if (it->refcount == 0) item_free(it);
+ }
+}
+
+/* FIXME: Is it necessary to keep thsi copy/pasted code? */
+void do_item_unlink_nolock(item *it, const uint32_t hv) {
+ MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
+ if ((it->it_flags & ITEM_LINKED) != 0) {
+ it->it_flags &= ~ITEM_LINKED;
+ STATS_LOCK();
+ stats.curr_bytes -= ITEM_ntotal(it);
+ stats.curr_items -= 1;
+ STATS_UNLOCK();
assoc_delete(ITEM_key(it), it->nkey, hv);
item_unlink_q(it);
if (it->refcount == 0) item_free(it);
@@ -323,9 +341,11 @@ void do_item_update(item *it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
if ((it->it_flags & ITEM_LINKED) != 0) {
+ mutex_lock(&cache_lock);
item_unlink_q(it);
it->time = current_time;
item_link_q(it);
+ pthread_mutex_unlock(&cache_lock);
}
}
}
@@ -387,22 +407,6 @@ void do_item_stats(ADD_STAT add_stats, void *c) {
char key_str[STAT_KEY_LEN];
char val_str[STAT_VAL_LEN];
int klen = 0, vlen = 0;
- int search = 50;
- while (search > 0 &&
- tails[i] != NULL &&
- ((settings.oldest_live != 0 && /* Item flushd */
- settings.oldest_live <= current_time &&
- tails[i]->time <= settings.oldest_live) ||
- (tails[i]->exptime != 0 && /* and not expired */
- tails[i]->exptime < current_time))) {
- --search;
- if (tails[i]->refcount == 0) {
- do_item_unlink(tails[i], hash(ITEM_key(tails[i]),
- tails[i]->nkey, 0));
- } else {
- break;
- }
- }
if (tails[i] == NULL) {
/* We removed all of the items in this slab class */
continue;
@@ -470,7 +474,9 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c) {
/** wrapper around assoc_find which does the lazy expiration logic */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
+ mutex_lock(&cache_lock);
item *it = assoc_find(key, nkey, hv);
+ pthread_mutex_unlock(&cache_lock);
int was_found = 0;
if (settings.verbose > 2) {
@@ -484,7 +490,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= current_time &&
it->time <= settings.oldest_live) {
- do_item_unlink(it, hv); /* MTSAFE - cache_lock held */
+ do_item_unlink(it, hv); /* MTSAFE - item_lock held */
it = NULL;
}
@@ -494,7 +500,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
}
if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
- do_item_unlink(it, hv); /* MTSAFE - cache_lock held */
+ do_item_unlink(it, hv); /* MTSAFE - item_lock held */
it = NULL;
}
@@ -524,16 +530,6 @@ item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
return it;
}
-/** returns an item whether or not it's expired. */
-item *do_item_get_nocheck(const char *key, const size_t nkey, const uint32_t hv) {
- item *it = assoc_find(key, nkey, hv);
- if (it) {
- it->refcount++;
- DEBUG_REFCNT(it, '+');
- }
- return it;
-}
-
/* expires items that are more recent than the oldest_live setting. */
void do_item_flush_expired(void) {
int i;
@@ -550,7 +546,7 @@ void do_item_flush_expired(void) {
if (iter->time >= settings.oldest_live) {
next = iter->next;
if ((iter->it_flags & ITEM_SLABBED) == 0) {
- do_item_unlink(iter, hash(ITEM_key(iter), iter->nkey, 0));
+ do_item_unlink_nolock(iter, hash(ITEM_key(iter), iter->nkey, 0));
}
} else {
/* We've hit the first old item. Continue to the next queue. */
diff --git a/items.h b/items.h
index acc1ef5..fc7b85e 100644
--- a/items.h
+++ b/items.h
@@ -8,6 +8,7 @@ bool item_size_ok(const size_t nkey, const int flags, const int nbytes);
int do_item_link(item *it, const uint32_t hv); /** may fail if transgresses limits */
void do_item_unlink(item *it, const uint32_t hv);
+void do_item_unlink_nolock(item *it, const uint32_t hv);
void do_item_remove(item *it);
void do_item_update(item *it); /** update LRU time to current and reposition */
int do_item_replace(item *it, item *new_it, const uint32_t hv);
@@ -20,7 +21,6 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c);
void do_item_flush_expired(void);
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv);
-item *do_item_get_nocheck(const char *key, const size_t nkey, const uint32_t hv);
item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv);
void item_stats_reset(void);
extern pthread_mutex_t cache_lock;
diff --git a/memcached.c b/memcached.c
index d1be82f..2678b64 100644
--- a/memcached.c
+++ b/memcached.c
@@ -2310,7 +2310,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
- new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
+ new_it = item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
if (new_it == NULL) {
/* SERVER_ERROR out of memory */
@@ -3080,7 +3080,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
res = strlen(buf);
if (res + 2 > it->nbytes || it->refcount != 1) { /* need to realloc */
item *new_it;
- new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
+ new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
if (new_it == 0) {
do_item_remove(it);
return EOM;
@@ -3095,7 +3095,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
} else { /* replace in-place */
/* When changing the value without replacing the item, we
need to update the CAS on the existing item. */
+ mutex_lock(&cache_lock); /* FIXME */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
+ pthread_mutex_unlock(&cache_lock);
memcpy(ITEM_data(it), buf, res);
memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
diff --git a/memcached.h b/memcached.h
index b6c20f2..f626643 100644
--- a/memcached.h
+++ b/memcached.h
@@ -513,6 +513,8 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
void item_unlink(item *it);
void item_update(item *it);
+void item_lock(uint32_t hv);
+void item_unlock(uint32_t hv);
void STATS_LOCK(void);
void STATS_UNLOCK(void);
void threadlocal_stats_reset(void);
diff --git a/thread.c b/thread.c
index efcc739..c201c5d 100644
--- a/thread.c
+++ b/thread.c
@@ -46,6 +46,10 @@ static pthread_mutex_t stats_lock;
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
+static pthread_mutex_t *item_locks;
+/* TODO: Make this a function of the # of threads */
+#define ITEM_LOCKS 4000
+
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
/*
@@ -64,6 +68,14 @@ static pthread_cond_t init_cond;
static void thread_libevent_process(int fd, short which, void *arg);
+void item_lock(uint32_t hv) {
+ mutex_lock(&item_locks[hv % ITEM_LOCKS]);
+}
+
+void item_unlock(uint32_t hv) {
+ pthread_mutex_unlock(&item_locks[hv % ITEM_LOCKS]);
+}
+
/*
* Initializes a connection queue.
*/
@@ -342,9 +354,9 @@ item *item_get(const char *key, const size_t nkey) {
item *it;
uint32_t hv;
hv = hash(key, nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
it = do_item_get(key, nkey, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return it;
}
@@ -352,9 +364,9 @@ item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
item *it;
uint32_t hv;
hv = hash(key, nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
it = do_item_touch(key, nkey, exptime, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return it;
}
@@ -366,9 +378,9 @@ int item_link(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
ret = do_item_link(item, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return ret;
}
@@ -377,9 +389,12 @@ int item_link(item *item) {
* needed.
*/
void item_remove(item *item) {
- mutex_lock(&cache_lock);
+ uint32_t hv;
+ hv = hash(ITEM_key(item), item->nkey, 0);
+
+ item_lock(hv);
do_item_remove(item);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
}
/*
@@ -397,18 +412,21 @@ int item_replace(item *old_it, item *new_it, const uint32_t hv) {
void item_unlink(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
do_item_unlink(item, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
}
/*
* Moves an item to the back of the LRU queue.
*/
void item_update(item *item) {
- mutex_lock(&cache_lock);
+ uint32_t hv;
+ hv = hash(ITEM_key(item), item->nkey, 0);
+
+ item_lock(hv);
do_item_update(item);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
}
/*
@@ -422,9 +440,9 @@ enum delta_result_type add_delta(conn *c, const char *key,
uint32_t hv;
hv = hash(key, nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return ret;
}
@@ -436,9 +454,9 @@ enum store_item_type store_item(item *item, int comm, conn* c) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
ret = do_store_item(item, comm, c, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return ret;
}
@@ -616,6 +634,15 @@ void thread_init(int nthreads, struct event_base *main_base) {
pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
+ item_locks = calloc(ITEM_LOCKS, sizeof(pthread_mutex_t));
+ if (! item_locks) {
+ perror("Can't allocate item locks");
+ exit(1);
+ }
+ for (i = 0; i < ITEM_LOCKS; i++) {
+ pthread_mutex_init(&item_locks[i], NULL);
+ }
+
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror("Can't allocate thread descriptors");