From 1c94e12c3d7615e4859eddde88ed7e53cd127d32 Mon Sep 17 00:00:00 2001
From: dormando
Date: Sat, 18 Aug 2012 12:54:01 -0700
Subject: item locks now lock hash table buckets

expansion requires switching to a global lock temporarily, so all buckets
have a covered read lock.

slab rebalancer is paused during hash table expansion.

internal item "trylocks" are always issued, and tracked as the hash power
variable can change out from under it.
---
 assoc.c     |  30 +++++++++++---
 assoc.h     |   2 +-
 items.c     |  15 ++++---
 memcached.h |  12 +++++-
 slabs.c     |  16 +++++++-
 slabs.h     |   3 ++
 thread.c    | 131 ++++++++++++++++++++++++++++++++++++++++++++++++++++--------
 7 files changed, 178 insertions(+), 31 deletions(-)

diff --git a/assoc.c b/assoc.c
index bcaf6f2..0edce2b 100644
--- a/assoc.c
+++ b/assoc.c
@@ -32,7 +32,7 @@ typedef unsigned long int ub4;   /* unsigned 4-byte quantities */
 typedef unsigned char ub1;   /* unsigned 1-byte quantities */
 
 /* how many powers of 2's worth of buckets we use */
-static unsigned int hashpower = HASHPOWER_DEFAULT;
+unsigned int hashpower = HASHPOWER_DEFAULT;
 
 #define hashsize(n) ((ub4)1<<(n))
 #define hashmask(n) (hashsize(n)-1)
@@ -51,6 +51,7 @@ static unsigned int hash_items = 0;
 
 /* Flag: Are we in the middle of expanding now? */
 static bool expanding = false;
+static bool started_expanding = false;
 
 /*
  * During expansion we migrate values with bucket granularity; this is how
@@ -136,13 +137,19 @@ static void assoc_expand(void) {
         stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
         stats.hash_is_expanding = 1;
         STATS_UNLOCK();
-        pthread_cond_signal(&maintenance_cond);
     } else {
         primary_hashtable = old_hashtable;
         /* Bad news, but we can keep running. */
     }
 }
 
+static void assoc_start_expand(void) {
+    if (started_expanding)
+        return;
+    started_expanding = true;
+    pthread_cond_signal(&maintenance_cond);
+}
+
 /* Note: this isn't an assoc_update. The key must not already exist to call this */
 int assoc_insert(item *it, const uint32_t hv) {
     unsigned int oldbucket;
@@ -161,7 +168,7 @@ int assoc_insert(item *it, const uint32_t hv) {
 
     hash_items++;
     if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
-        assoc_expand();
+        assoc_start_expand();
     }
 
     MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
@@ -201,6 +208,7 @@ static void *assoc_maintenance_thread(void *arg) {
 
         /* Lock the cache, and bulk move multiple buckets to the new
          * hash table. */
+        item_lock_global();
         mutex_lock(&cache_lock);
 
         for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
@@ -230,12 +238,24 @@ static void *assoc_maintenance_thread(void *arg) {
             }
         }
 
+        mutex_unlock(&cache_lock);
+        item_unlock_global();
+
         if (!expanding) {
+            /* finished expanding. tell all threads to use fine-grained locks */
+            switch_item_lock_type(ITEM_LOCK_GRANULAR);
+            started_expanding = false;
+            slabs_rebalancer_resume();
             /* We are done expanding.. just wait for next invocation */
             pthread_cond_wait(&maintenance_cond, &cache_lock);
+            /* Before doing anything, tell threads to use a global lock */
+            mutex_unlock(&cache_lock);
+            slabs_rebalancer_pause();
+            switch_item_lock_type(ITEM_LOCK_GLOBAL);
+            mutex_lock(&cache_lock);
+            assoc_expand();
+            mutex_unlock(&cache_lock);
         }
-
-        mutex_unlock(&cache_lock);
     }
     return NULL;
 }
diff --git a/assoc.h b/assoc.h
index ccdfdd5..fbea1d3 100644
--- a/assoc.h
+++ b/assoc.h
@@ -6,4 +6,4 @@ void assoc_delete(const char *key, const size_t nkey, const uint32_t hv);
 void do_assoc_move_next_bucket(void);
 int start_assoc_maintenance_thread(void);
 void stop_assoc_maintenance_thread(void);
-
+extern unsigned int hashpower;
diff --git a/items.c b/items.c
index 3b26265..d8f4188 100644
--- a/items.c
+++ b/items.c
@@ -105,6 +105,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
     int tries = 5;
     int tried_alloc = 0;
     item *search;
+    void *hold_lock = NULL;
     rel_time_t oldest_live = settings.oldest_live;
 
     search = tails[id];
@@ -115,7 +116,8 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
         /* Attempt to hash item lock the "search" item. If locked, no
          * other callers can incr the refcount
          */
-        if (hv != cur_hv && item_trylock(hv) != 0)
+        /* FIXME: I think we need to mask the hv here for comparison? */
+        if (hv != cur_hv && (hold_lock = item_trylock(hv)) == NULL)
             continue;
         /* Now see if the item is refcount locked */
         if (refcount_incr(&search->refcount) != 2) {
@@ -128,7 +130,8 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
                 search->refcount = 1;
                 do_item_unlink_nolock(search, hv);
             }
-            item_unlock(hv);
+            if (hold_lock)
+                item_trylock_unlock(hold_lock);
             continue;
         }
 
@@ -187,7 +190,9 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
         }
 
         refcount_decr(&search->refcount);
-        item_unlock(hv);
+        /* If hash values were equal, we don't grab a second lock */
+        if (hold_lock)
+            item_trylock_unlock(hold_lock);
         break;
     }
 
@@ -506,7 +511,7 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c) {
 
 /** wrapper around assoc_find which does the lazy expiration logic */
 item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
-    mutex_lock(&cache_lock);
+    //mutex_lock(&cache_lock);
     item *it = assoc_find(key, nkey, hv);
     if (it != NULL) {
         refcount_incr(&it->refcount);
@@ -520,7 +525,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
             it = NULL;
         }
     }
-    mutex_unlock(&cache_lock);
+    //mutex_unlock(&cache_lock);
     int was_found = 0;
 
     if (settings.verbose > 2) {
diff --git a/memcached.h b/memcached.h
index 0517d4a..0594ab9 100644
--- a/memcached.h
+++ b/memcached.h
@@ -180,6 +180,11 @@ enum network_transport {
     udp_transport
 };
 
+enum item_lock_types {
+    ITEM_LOCK_GRANULAR = 0,
+    ITEM_LOCK_GLOBAL
+};
+
 #define IS_UDP(x) (x == udp_transport)
 
 #define NREAD_ADD 1
@@ -352,6 +357,7 @@ typedef struct {
     struct thread_stats stats;      /* Stats generated by this thread */
     struct conn_queue *new_conn_queue; /* queue of new connections to handle */
     cache_t *suffix_cache;      /* suffix cache */
+    uint8_t item_lock_type;     /* use fine-grained or global item lock */
 } LIBEVENT_THREAD;
 
 typedef struct {
@@ -531,9 +537,13 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
 void item_unlink(item *it);
 void item_update(item *it);
 
+void item_lock_global(void);
+void item_unlock_global(void);
 void item_lock(uint32_t hv);
-int item_trylock(uint32_t hv);
+void *item_trylock(uint32_t hv);
+void item_trylock_unlock(void *arg);
 void item_unlock(uint32_t hv);
+void switch_item_lock_type(enum item_lock_types type);
 unsigned short refcount_incr(unsigned short *refcount);
 unsigned short refcount_decr(unsigned short *refcount);
 void STATS_LOCK(void);
diff --git a/slabs.c b/slabs.c
index 71202fd..c61722f 100644
--- a/slabs.c
+++ b/slabs.c
@@ -522,8 +522,9 @@ static int slab_rebalance_move(void) {
         item *it = slab_rebal.slab_pos;
         status = MOVE_PASS;
         if (it->slabs_clsid != 255) {
+            void *hold_lock = NULL;
             uint32_t hv = hash(ITEM_key(it), it->nkey, 0);
-            if (item_trylock(hv) != 0) {
+            if ((hold_lock = item_trylock(hv)) == NULL) {
                 status = MOVE_LOCKED;
             } else {
                 refcount = refcount_incr(&it->refcount);
@@ -557,7 +558,7 @@ static int slab_rebalance_move(void) {
                 }
                 status = MOVE_BUSY;
             }
-            item_unlock(hv);
+            item_trylock_unlock(hold_lock);
         }
     }
 
@@ -738,6 +739,8 @@ static void *slab_maintenance_thread(void *arg) {
  */
static void *slab_rebalance_thread(void *arg) {
     int was_busy = 0;
+    /* So we first pass into cond_wait with the mutex held */
+    mutex_lock(&slabs_rebalance_lock);
 
     while (do_run_slab_rebalance_thread) {
         if (slab_rebalance_signal == 1) {
@@ -826,6 +829,15 @@ enum reassign_result_type slabs_reassign(int src, int dst) {
     return ret;
 }
 
+/* If we hold this lock, rebalancer can't wake up or move */
+void slabs_rebalancer_pause(void) {
+    pthread_mutex_lock(&slabs_rebalance_lock);
+}
+
+void slabs_rebalancer_resume(void) {
+    pthread_mutex_unlock(&slabs_rebalance_lock);
+}
+
 static pthread_t maintenance_tid;
 static pthread_t rebalance_tid;
 
diff --git a/slabs.h b/slabs.h
index 7c6140b..c649c06 100644
--- a/slabs.h
+++ b/slabs.h
@@ -43,4 +43,7 @@ enum reassign_result_type {
 
 enum reassign_result_type slabs_reassign(int src, int dst);
 
+void slabs_rebalancer_pause(void);
+void slabs_rebalancer_resume(void);
+
 #endif
diff --git a/thread.c b/thread.c
index 93867a5..086b783 100644
--- a/thread.c
+++ b/thread.c
@@ -57,8 +57,12 @@ static pthread_mutex_t cqi_freelist_lock;
 
 static pthread_mutex_t *item_locks;
 /* size of the item lock hash table */
 static uint32_t item_lock_count;
-/* size - 1 for lookup masking */
-static uint32_t item_lock_mask;
+#define hashsize(n) ((unsigned long int)1<<(n))
+#define hashmask(n) (hashsize(n)-1)
+/* this lock is temporarily engaged during a hash table expansion */
+static pthread_mutex_t item_global_lock;
+/* thread-specific variable for deeply finding the item lock type */
+static pthread_key_t item_lock_type_key;
 
 static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
@@ -108,16 +112,92 @@ unsigned short refcount_decr(unsigned short *refcount) {
 #endif
 }
 
+/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */
+void item_lock_global(void) {
+    mutex_lock(&item_global_lock);
+}
+
+void item_unlock_global(void) {
+    mutex_unlock(&item_global_lock);
+}
+
 void item_lock(uint32_t hv) {
-    mutex_lock(&item_locks[hv & item_lock_mask]);
+    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
+    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+        mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
+    } else {
+        mutex_lock(&item_global_lock);
+    }
 }
 
-int item_trylock(uint32_t hv) {
-    return pthread_mutex_trylock(&item_locks[hv & item_lock_mask]);
+/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
+ * no-op, as it's only called from within the item lock if necessary.
+ * However, we can't mix a no-op and threads which are still synchronizing to
+ * GLOBAL. So instead we just always try to lock. When in GLOBAL mode this
+ * turns into an effective no-op. Threads re-synchronize after the power level
+ * switch so it should stay safe.
+ */
+void *item_trylock(uint32_t hv) {
+    pthread_mutex_t *lock = &item_locks[(hv & hashmask(hashpower)) % item_lock_count];
+    if (pthread_mutex_trylock(lock) == 0) {
+        return lock;
+    }
+    return NULL;
+}
+
+void item_trylock_unlock(void *lock) {
+    mutex_unlock((pthread_mutex_t *) lock);
 }
 
 void item_unlock(uint32_t hv) {
-    mutex_unlock(&item_locks[hv & item_lock_mask]);
+    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
+    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+        mutex_unlock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
+    } else {
+        mutex_unlock(&item_global_lock);
+    }
+}
+
+static void wait_for_thread_registration(int nthreads) {
+    while (init_count < nthreads) {
+        pthread_cond_wait(&init_cond, &init_lock);
+    }
+}
+
+static void register_thread_initialized(void) {
+    pthread_mutex_lock(&init_lock);
+    init_count++;
+    pthread_cond_signal(&init_cond);
+    pthread_mutex_unlock(&init_lock);
+}
+
+void switch_item_lock_type(enum item_lock_types type) {
+    char buf[1];
+    int i;
+
+    switch (type) {
+        case ITEM_LOCK_GRANULAR:
+            buf[0] = 'l';
+            break;
+        case ITEM_LOCK_GLOBAL:
+            buf[0] = 'g';
+            break;
+        default:
+            fprintf(stderr, "Unknown lock type: %d\n", type);
+            assert(1 == 0);
+            break;
+    }
+
+    pthread_mutex_lock(&init_lock);
+    init_count = 0;
+    for (i = 0; i < settings.num_threads; i++) {
+        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
+            perror("Failed writing to notify pipe");
+            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+        }
+    }
+    wait_for_thread_registration(settings.num_threads);
+    pthread_mutex_unlock(&init_lock);
 }
 
 /*
@@ -282,7 +362,6 @@ static void setup_thread(LIBEVENT_THREAD *me) {
     }
 }
 
-
 /*
  * Worker thread: main event loop
  */
@@ -293,10 +372,14 @@ static void *worker_libevent(void *arg) {
      * all threads have finished initializing.
      */
 
-    pthread_mutex_lock(&init_lock);
-    init_count++;
-    pthread_cond_signal(&init_cond);
-    pthread_mutex_unlock(&init_lock);
+    /* set an indexable thread-specific memory item for the lock type.
+     * this could be unnecessary if we pass the conn *c struct through
+     * all item_lock calls...
+     */
+    me->item_lock_type = ITEM_LOCK_GRANULAR;
+    pthread_setspecific(item_lock_type_key, &me->item_lock_type);
+
+    register_thread_initialized();
 
     event_base_loop(me->base, 0);
     return NULL;
@@ -316,6 +399,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
         if (settings.verbose > 0)
             fprintf(stderr, "Can't read from libevent pipe\n");
 
+    switch (buf[0]) {
+    case 'c':
     item = cq_pop(me->new_conn_queue);
 
     if (NULL != item) {
@@ -337,6 +422,17 @@ static void thread_libevent_process(int fd, short which, void *arg) {
         }
         cqi_free(item);
     }
+        break;
+    /* we were told to flip the lock type and report in */
+    case 'l':
+        me->item_lock_type = ITEM_LOCK_GRANULAR;
+        register_thread_initialized();
+        break;
+    case 'g':
+        me->item_lock_type = ITEM_LOCK_GLOBAL;
+        register_thread_initialized();
+        break;
+    }
 }
 
 /* Which thread we assigned a connection to most recently. */
@@ -350,6 +446,7 @@ static int last_thread = -1;
 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                        int read_buffer_size, enum network_transport transport) {
     CQ_ITEM *item = cqi_new();
+    char buf[1];
     int tid = (last_thread + 1) % settings.num_threads;
 
     LIBEVENT_THREAD *thread = threads + tid;
@@ -365,7 +462,8 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
     cq_push(thread->new_conn_queue, item);
 
     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
-    if (write(thread->notify_send_fd, "", 1) != 1) {
+    buf[0] = 'c';
+    if (write(thread->notify_send_fd, buf, 1) != 1) {
         perror("Writing to thread notify pipe");
     }
 }
@@ -690,8 +788,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
         power = 13;
     }
 
-    item_lock_count = ((unsigned long int)1 << (power));
-    item_lock_mask = item_lock_count - 1;
+    item_lock_count = hashsize(power);
 
     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
     if (! item_locks) {
@@ -701,6 +798,8 @@ void thread_init(int nthreads, struct event_base *main_base) {
     for (i = 0; i < item_lock_count; i++) {
         pthread_mutex_init(&item_locks[i], NULL);
     }
+    pthread_key_create(&item_lock_type_key, NULL);
+    pthread_mutex_init(&item_global_lock, NULL);
 
     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
     if (! threads) {
@@ -733,9 +832,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
 
     /* Wait for all the threads to set themselves up before returning. */
     pthread_mutex_lock(&init_lock);
-    while (init_count < nthreads) {
-        pthread_cond_wait(&init_cond, &init_lock);
-    }
+    wait_for_thread_registration(nthreads);
     pthread_mutex_unlock(&init_lock);
 }
--
cgit v1.2.1
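
Note on the locking scheme (not part of the patch): after this change an item's bucket lock is chosen by masking the hash value with the *current* hashpower and then folding it onto the fixed-size item_locks table, and item_trylock() returns the mutex it actually acquired so the caller releases exactly that mutex even if hashpower grows in between. The standalone C sketch below illustrates that idea; toy_locks, toy_hashpower, toy_trylock and N_LOCKS are illustrative stand-ins, not memcached symbols.

/* toy_locking.c -- illustrative sketch only; build with: cc toy_locking.c -pthread */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define N_LOCKS 16                      /* stand-in for item_lock_count (a power of 2) */
static pthread_mutex_t toy_locks[N_LOCKS];
static unsigned int toy_hashpower = 16; /* stand-in for the live hashpower variable */

/* Mask by the current hash power, then fold onto the lock table; return the
 * mutex that was acquired so the caller can later unlock that same mutex. */
static pthread_mutex_t *toy_trylock(uint32_t hv) {
    pthread_mutex_t *lock =
        &toy_locks[(hv & ((1UL << toy_hashpower) - 1)) % N_LOCKS];
    return pthread_mutex_trylock(lock) == 0 ? lock : NULL;
}

int main(void) {
    for (int i = 0; i < N_LOCKS; i++)
        pthread_mutex_init(&toy_locks[i], NULL);

    uint32_t hv = 0xdeadbeef;           /* pretend hash of some key */
    pthread_mutex_t *held = toy_trylock(hv);
    if (held != NULL) {
        toy_hashpower++;                /* hash table expansion changes the mapping... */
        pthread_mutex_unlock(held);     /* ...but the originally acquired mutex is released */
        puts("locked and released the same bucket lock");
    }
    return 0;
}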