author     dormando <dormando@rydia.net>    2012-08-18 12:54:01 -0700
committer  dormando <dormando@rydia.net>    2012-09-03 00:35:54 -0700
commit     1c94e12c3d7615e4859eddde88ed7e53cd127d32 (patch)
tree       fdb6063550b11adf9620de885a85a5e47db527a8
parent     2db1bf462c67c66323850272acd0f2b60d6e62ec (diff)
download   memcached-1c94e12c3d7615e4859eddde88ed7e53cd127d32.tar.gz
item locks now lock hash table buckets
Expansion requires temporarily switching to a global lock, so that every bucket is covered by a single lock while the table is rebuilt. The slab rebalancer is paused during hash table expansion. Internal item "trylocks" are always issued, and the acquired lock is tracked, since the hash power variable can change out from under the caller.
-rw-r--r--  assoc.c       30
-rw-r--r--  assoc.h        2
-rw-r--r--  items.c       15
-rw-r--r--  memcached.h   12
-rw-r--r--  slabs.c       16
-rw-r--r--  slabs.h        3
-rw-r--r--  thread.c     131
7 files changed, 178 insertions, 31 deletions
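
The core of the change is that an item lock is now selected from the live hashpower rather than a fixed mask, with a single global mutex substituted while the table is resizing. A minimal standalone sketch of that selection logic follows (illustrative only; in the patch the lock type lives in thread-local storage via item_lock_type_key, and lock_for() is not a real function in the tree):

#include <pthread.h>
#include <stdint.h>

#define hashsize(n) ((unsigned long int)1 << (n))
#define hashmask(n) (hashsize(n) - 1)

enum item_lock_types { ITEM_LOCK_GRANULAR = 0, ITEM_LOCK_GLOBAL };

static unsigned int hashpower = 16;                 /* grows when the hash table expands */
static uint32_t item_lock_count = hashsize(13);     /* power-of-two sized lock array */
static pthread_mutex_t *item_locks;                 /* one mutex per group of buckets */
static pthread_mutex_t item_global_lock = PTHREAD_MUTEX_INITIALIZER;

/* Map an item's hash value to the mutex covering its bucket. While the
 * table is expanding, every worker is switched to ITEM_LOCK_GLOBAL and
 * funnels through one mutex, so a changing hashpower can never be
 * observed under a granular lock. */
static pthread_mutex_t *lock_for(uint32_t hv, uint8_t lock_type) {
    if (lock_type == ITEM_LOCK_GRANULAR)
        return &item_locks[(hv & hashmask(hashpower)) % item_lock_count];
    return &item_global_lock;
}
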
diff --git a/assoc.c b/assoc.c
index bcaf6f2..0edce2b 100644
--- a/assoc.c
+++ b/assoc.c
@@ -32,7 +32,7 @@ typedef unsigned long int ub4; /* unsigned 4-byte quantities */
typedef unsigned char ub1; /* unsigned 1-byte quantities */
/* how many powers of 2's worth of buckets we use */
-static unsigned int hashpower = HASHPOWER_DEFAULT;
+unsigned int hashpower = HASHPOWER_DEFAULT;
#define hashsize(n) ((ub4)1<<(n))
#define hashmask(n) (hashsize(n)-1)
@@ -51,6 +51,7 @@ static unsigned int hash_items = 0;
/* Flag: Are we in the middle of expanding now? */
static bool expanding = false;
+static bool started_expanding = false;
/*
* During expansion we migrate values with bucket granularity; this is how
@@ -136,13 +137,19 @@ static void assoc_expand(void) {
stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
stats.hash_is_expanding = 1;
STATS_UNLOCK();
- pthread_cond_signal(&maintenance_cond);
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}
+static void assoc_start_expand(void) {
+ if (started_expanding)
+ return;
+ started_expanding = true;
+ pthread_cond_signal(&maintenance_cond);
+}
+
/* Note: this isn't an assoc_update. The key must not already exist to call this */
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;
@@ -161,7 +168,7 @@ int assoc_insert(item *it, const uint32_t hv) {
hash_items++;
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
- assoc_expand();
+ assoc_start_expand();
}
MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
@@ -201,6 +208,7 @@ static void *assoc_maintenance_thread(void *arg) {
/* Lock the cache, and bulk move multiple buckets to the new
* hash table. */
+ item_lock_global();
mutex_lock(&cache_lock);
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
@@ -230,12 +238,24 @@ static void *assoc_maintenance_thread(void *arg) {
}
}
+ mutex_unlock(&cache_lock);
+ item_unlock_global();
+
if (!expanding) {
+ /* finished expanding. tell all threads to use fine-grained locks */
+ switch_item_lock_type(ITEM_LOCK_GRANULAR);
+ started_expanding = false;
+ slabs_rebalancer_resume();
/* We are done expanding.. just wait for next invocation */
pthread_cond_wait(&maintenance_cond, &cache_lock);
+ /* Before doing anything, tell threads to use a global lock */
+ mutex_unlock(&cache_lock);
+ slabs_rebalancer_pause();
+ switch_item_lock_type(ITEM_LOCK_GLOBAL);
+ mutex_lock(&cache_lock);
+ assoc_expand();
+ mutex_unlock(&cache_lock);
}
-
- mutex_unlock(&cache_lock);
}
return NULL;
}
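
For orientation, the per-expansion hand-off the maintenance thread performs after this change looks roughly like the following. This is a condensed sketch of the loop above, not the literal code; error handling and the bulk-move inner loop are elided:

/* Sketch of one assoc_maintenance_thread cycle after this patch. */
for (;;) {
    mutex_lock(&cache_lock);
    /* sleep until assoc_start_expand() signals that the table should grow */
    pthread_cond_wait(&maintenance_cond, &cache_lock);

    /* before touching the table, move every worker onto the global lock */
    mutex_unlock(&cache_lock);
    slabs_rebalancer_pause();                   /* rebalancer can't move items now */
    switch_item_lock_type(ITEM_LOCK_GLOBAL);    /* workers funnel through one mutex */
    mutex_lock(&cache_lock);
    assoc_expand();                             /* allocate the larger table */
    mutex_unlock(&cache_lock);

    /* ... bulk-move buckets while holding item_lock_global() + cache_lock ... */

    switch_item_lock_type(ITEM_LOCK_GRANULAR);  /* back to per-bucket locks */
    slabs_rebalancer_resume();
}
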
diff --git a/assoc.h b/assoc.h
index ccdfdd5..fbea1d3 100644
--- a/assoc.h
+++ b/assoc.h
@@ -6,4 +6,4 @@ void assoc_delete(const char *key, const size_t nkey, const uint32_t hv);
void do_assoc_move_next_bucket(void);
int start_assoc_maintenance_thread(void);
void stop_assoc_maintenance_thread(void);
-
+extern unsigned int hashpower;
diff --git a/items.c b/items.c
index 3b26265..d8f4188 100644
--- a/items.c
+++ b/items.c
@@ -105,6 +105,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
int tries = 5;
int tried_alloc = 0;
item *search;
+ void *hold_lock = NULL;
rel_time_t oldest_live = settings.oldest_live;
search = tails[id];
@@ -115,7 +116,8 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
/* Attempt to hash item lock the "search" item. If locked, no
* other callers can incr the refcount
*/
- if (hv != cur_hv && item_trylock(hv) != 0)
+ /* FIXME: I think we need to mask the hv here for comparison? */
+ if (hv != cur_hv && (hold_lock = item_trylock(hv)) == NULL)
continue;
/* Now see if the item is refcount locked */
if (refcount_incr(&search->refcount) != 2) {
@@ -128,7 +130,8 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
search->refcount = 1;
do_item_unlink_nolock(search, hv);
}
- item_unlock(hv);
+ if (hold_lock)
+ item_trylock_unlock(hold_lock);
continue;
}
@@ -187,7 +190,9 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
}
refcount_decr(&search->refcount);
- item_unlock(hv);
+ /* If hash values were equal, we don't grab a second lock */
+ if (hold_lock)
+ item_trylock_unlock(hold_lock);
break;
}
@@ -506,7 +511,7 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c) {
/** wrapper around assoc_find which does the lazy expiration logic */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
- mutex_lock(&cache_lock);
+ //mutex_lock(&cache_lock);
item *it = assoc_find(key, nkey, hv);
if (it != NULL) {
refcount_incr(&it->refcount);
@@ -520,7 +525,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
it = NULL;
}
}
- mutex_unlock(&cache_lock);
+ //mutex_unlock(&cache_lock);
int was_found = 0;
if (settings.verbose > 2) {
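
The hold_lock bookkeeping above is what the commit message means by trylocks being "always issued, and tracked": the returned mutex pointer is kept so the unlock hits exactly the lock that was taken, even if hashpower grows in between. A sketch of the intended usage (probe_candidate() is a hypothetical caller; the declarations match the new prototypes in memcached.h):

#include <stdint.h>
#include <stddef.h>

void *item_trylock(uint32_t hv);          /* returns the taken lock, or NULL */
void item_trylock_unlock(void *lock);     /* unlocks exactly that lock */

/* Illustrative eviction probe: skip the candidate if its bucket is busy,
 * and release only the lock that was actually acquired. */
static int probe_candidate(uint32_t hv) {
    void *hold_lock = item_trylock(hv);
    if (hold_lock == NULL)
        return 0;                         /* bucket contended; try another item */
    /* ... the candidate's bucket is now safe to inspect ... */
    item_trylock_unlock(hold_lock);
    return 1;
}
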
diff --git a/memcached.h b/memcached.h
index 0517d4a..0594ab9 100644
--- a/memcached.h
+++ b/memcached.h
@@ -180,6 +180,11 @@ enum network_transport {
udp_transport
};
+enum item_lock_types {
+ ITEM_LOCK_GRANULAR = 0,
+ ITEM_LOCK_GLOBAL
+};
+
#define IS_UDP(x) (x == udp_transport)
#define NREAD_ADD 1
@@ -352,6 +357,7 @@ typedef struct {
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
+ uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
typedef struct {
@@ -531,9 +537,13 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
void item_unlink(item *it);
void item_update(item *it);
+void item_lock_global(void);
+void item_unlock_global(void);
void item_lock(uint32_t hv);
-int item_trylock(uint32_t hv);
+void *item_trylock(uint32_t hv);
+void item_trylock_unlock(void *arg);
void item_unlock(uint32_t hv);
+void switch_item_lock_type(enum item_lock_types type);
unsigned short refcount_incr(unsigned short *refcount);
unsigned short refcount_decr(unsigned short *refcount);
void STATS_LOCK(void);
diff --git a/slabs.c b/slabs.c
index 71202fd..c61722f 100644
--- a/slabs.c
+++ b/slabs.c
@@ -522,8 +522,9 @@ static int slab_rebalance_move(void) {
item *it = slab_rebal.slab_pos;
status = MOVE_PASS;
if (it->slabs_clsid != 255) {
+ void *hold_lock = NULL;
uint32_t hv = hash(ITEM_key(it), it->nkey, 0);
- if (item_trylock(hv) != 0) {
+ if ((hold_lock = item_trylock(hv)) == NULL) {
status = MOVE_LOCKED;
} else {
refcount = refcount_incr(&it->refcount);
@@ -557,7 +558,7 @@ static int slab_rebalance_move(void) {
}
status = MOVE_BUSY;
}
- item_unlock(hv);
+ item_trylock_unlock(hold_lock);
}
}
@@ -738,6 +739,8 @@ static void *slab_maintenance_thread(void *arg) {
*/
static void *slab_rebalance_thread(void *arg) {
int was_busy = 0;
+ /* So we first pass into cond_wait with the mutex held */
+ mutex_lock(&slabs_rebalance_lock);
while (do_run_slab_rebalance_thread) {
if (slab_rebalance_signal == 1) {
@@ -826,6 +829,15 @@ enum reassign_result_type slabs_reassign(int src, int dst) {
return ret;
}
+/* If we hold this lock, rebalancer can't wake up or move */
+void slabs_rebalancer_pause(void) {
+ pthread_mutex_lock(&slabs_rebalance_lock);
+}
+
+void slabs_rebalancer_resume(void) {
+ pthread_mutex_unlock(&slabs_rebalance_lock);
+}
+
static pthread_t maintenance_tid;
static pthread_t rebalance_tid;
diff --git a/slabs.h b/slabs.h
index 7c6140b..c649c06 100644
--- a/slabs.h
+++ b/slabs.h
@@ -43,4 +43,7 @@ enum reassign_result_type {
enum reassign_result_type slabs_reassign(int src, int dst);
+void slabs_rebalancer_pause(void);
+void slabs_rebalancer_resume(void);
+
#endif
diff --git a/thread.c b/thread.c
index 93867a5..086b783 100644
--- a/thread.c
+++ b/thread.c
@@ -57,8 +57,12 @@ static pthread_mutex_t cqi_freelist_lock;
static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
-/* size - 1 for lookup masking */
-static uint32_t item_lock_mask;
+#define hashsize(n) ((unsigned long int)1<<(n))
+#define hashmask(n) (hashsize(n)-1)
+/* this lock is temporarily engaged during a hash table expansion */
+static pthread_mutex_t item_global_lock;
+/* thread-specific variable for deeply finding the item lock type */
+static pthread_key_t item_lock_type_key;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
@@ -108,16 +112,92 @@ unsigned short refcount_decr(unsigned short *refcount) {
#endif
}
+/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */
+void item_lock_global(void) {
+ mutex_lock(&item_global_lock);
+}
+
+void item_unlock_global(void) {
+ mutex_unlock(&item_global_lock);
+}
+
void item_lock(uint32_t hv) {
- mutex_lock(&item_locks[hv & item_lock_mask]);
+ uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
+ if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+ mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
+ } else {
+ mutex_lock(&item_global_lock);
+ }
}
-int item_trylock(uint32_t hv) {
- return pthread_mutex_trylock(&item_locks[hv & item_lock_mask]);
+/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
+ * no-op, as it's only called from within the item lock if necessary.
+ * However, we can't mix a no-op and threads which are still synchronizing to
+ * GLOBAL. So instead we just always try to lock. When in GLOBAL mode this
+ * turns into an effective no-op. Threads re-synchronize after the power level
+ * switch so it should stay safe.
+ */
+void *item_trylock(uint32_t hv) {
+ pthread_mutex_t *lock = &item_locks[(hv & hashmask(hashpower)) % item_lock_count];
+ if (pthread_mutex_trylock(lock) == 0) {
+ return lock;
+ }
+ return NULL;
+}
+
+void item_trylock_unlock(void *lock) {
+ mutex_unlock((pthread_mutex_t *) lock);
}
void item_unlock(uint32_t hv) {
- mutex_unlock(&item_locks[hv & item_lock_mask]);
+ uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
+ if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+ mutex_unlock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
+ } else {
+ mutex_unlock(&item_global_lock);
+ }
+}
+
+static void wait_for_thread_registration(int nthreads) {
+ while (init_count < nthreads) {
+ pthread_cond_wait(&init_cond, &init_lock);
+ }
+}
+
+static void register_thread_initialized(void) {
+ pthread_mutex_lock(&init_lock);
+ init_count++;
+ pthread_cond_signal(&init_cond);
+ pthread_mutex_unlock(&init_lock);
+}
+
+void switch_item_lock_type(enum item_lock_types type) {
+ char buf[1];
+ int i;
+
+ switch (type) {
+ case ITEM_LOCK_GRANULAR:
+ buf[0] = 'l';
+ break;
+ case ITEM_LOCK_GLOBAL:
+ buf[0] = 'g';
+ break;
+ default:
+ fprintf(stderr, "Unknown lock type: %d\n", type);
+ assert(1 == 0);
+ break;
+ }
+
+ pthread_mutex_lock(&init_lock);
+ init_count = 0;
+ for (i = 0; i < settings.num_threads; i++) {
+ if (write(threads[i].notify_send_fd, buf, 1) != 1) {
+ perror("Failed writing to notify pipe");
+ /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+ }
+ }
+ wait_for_thread_registration(settings.num_threads);
+ pthread_mutex_unlock(&init_lock);
}
/*
@@ -282,7 +362,6 @@ static void setup_thread(LIBEVENT_THREAD *me) {
}
}
-
/*
* Worker thread: main event loop
*/
@@ -293,10 +372,14 @@ static void *worker_libevent(void *arg) {
* all threads have finished initializing.
*/
- pthread_mutex_lock(&init_lock);
- init_count++;
- pthread_cond_signal(&init_cond);
- pthread_mutex_unlock(&init_lock);
+ /* set an indexable thread-specific memory item for the lock type.
+ * this could be unnecessary if we pass the conn *c struct through
+ * all item_lock calls...
+ */
+ me->item_lock_type = ITEM_LOCK_GRANULAR;
+ pthread_setspecific(item_lock_type_key, &me->item_lock_type);
+
+ register_thread_initialized();
event_base_loop(me->base, 0);
return NULL;
@@ -316,6 +399,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
+ switch (buf[0]) {
+ case 'c':
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
@@ -337,6 +422,17 @@ static void thread_libevent_process(int fd, short which, void *arg) {
}
cqi_free(item);
}
+ break;
+ /* we were told to flip the lock type and report in */
+ case 'l':
+ me->item_lock_type = ITEM_LOCK_GRANULAR;
+ register_thread_initialized();
+ break;
+ case 'g':
+ me->item_lock_type = ITEM_LOCK_GLOBAL;
+ register_thread_initialized();
+ break;
+ }
}
/* Which thread we assigned a connection to most recently. */
@@ -350,6 +446,7 @@ static int last_thread = -1;
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
+ char buf[1];
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
@@ -365,7 +462,8 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
- if (write(thread->notify_send_fd, "", 1) != 1) {
+ buf[0] = 'c';
+ if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
@@ -690,8 +788,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
power = 13;
}
- item_lock_count = ((unsigned long int)1 << (power));
- item_lock_mask = item_lock_count - 1;
+ item_lock_count = hashsize(power);
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
@@ -701,6 +798,8 @@ void thread_init(int nthreads, struct event_base *main_base) {
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
+ pthread_key_create(&item_lock_type_key, NULL);
+ pthread_mutex_init(&item_global_lock, NULL);
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
@@ -733,9 +832,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
- while (init_count < nthreads) {
- pthread_cond_wait(&init_cond, &init_lock);
- }
+ wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}