diff options
-rw-r--r-- | assoc.c | 35 | ||||
-rw-r--r-- | items.c | 9 | ||||
-rw-r--r-- | items.h | 2 | ||||
-rw-r--r-- | memcached.h | 13 | ||||
-rw-r--r-- | thread.c | 85 |
5 files changed, 73 insertions, 71 deletions
@@ -26,7 +26,7 @@ #include <pthread.h> static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER; - +static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER; typedef unsigned long int ub4; /* unsigned 4-byte quantities */ typedef unsigned char ub1; /* unsigned 1-byte quantities */ @@ -147,11 +147,6 @@ static void assoc_start_expand(void) { if (started_expanding) return; - /*With this condition, we can expanding holding only one item lock, - * and it should always be false*/ - if (item_lock_hashpower >= hashpower) - return; - started_expanding = true; pthread_cond_signal(&maintenance_cond); } @@ -209,11 +204,11 @@ int hash_bulk_move = DEFAULT_HASH_BULK_MOVE; static void *assoc_maintenance_thread(void *arg) { + mutex_lock(&maintenance_lock); while (do_run_maintenance_thread) { int ii = 0; - /* As there is only one thread process expanding, and we hold the item - * lock, it seems not necessary to hold the cache_lock . */ + /* There is only one expansion thread, so no need to global lock. */ for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { item *it, *next; int bucket; @@ -223,7 +218,6 @@ static void *assoc_maintenance_thread(void *arg) { * is the lowest N bits of the hv, and the bucket of item_locks is * also the lowest M bits of hv, and N is greater than M. * So we can process expanding with only one item_lock. cool! */ - /*Get item lock for the slot in old hashtable*/ if ((item_lock = item_trylock(expand_bucket))) { for (it = old_hashtable[expand_bucket]; NULL != it; it = next) { next = it->h_next; @@ -247,9 +241,7 @@ static void *assoc_maintenance_thread(void *arg) { } } else { - /*wait for 100ms. since only one expanding thread, it's not - * necessary to sleep a random value*/ - usleep(100*1000); + usleep(10*1000); } if (item_lock) { @@ -260,11 +252,18 @@ static void *assoc_maintenance_thread(void *arg) { if (!expanding) { /* We are done expanding.. just wait for next invocation */ - mutex_lock(&cache_lock); started_expanding = false; - pthread_cond_wait(&maintenance_cond, &cache_lock); + pthread_cond_wait(&maintenance_cond, &maintenance_lock); + /* assoc_expand() swaps out the hash table entirely, so we need + * all threads to not hold any references related to the hash + * table while this happens. + * This is instead of a more complex, possibly slower algorithm to + * allow dynamic hash table expansion without causing significant + * wait times. + */ + pause_threads(PAUSE_ALL_THREADS); assoc_expand(); - mutex_unlock(&cache_lock); + pause_threads(RESUME_ALL_THREADS); } } return NULL; @@ -281,6 +280,7 @@ int start_assoc_maintenance_thread() { hash_bulk_move = DEFAULT_HASH_BULK_MOVE; } } + pthread_mutex_init(&maintenance_lock, NULL); if ((ret = pthread_create(&maintenance_tid, NULL, assoc_maintenance_thread, NULL)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); @@ -290,13 +290,12 @@ int start_assoc_maintenance_thread() { } void stop_assoc_maintenance_thread() { - mutex_lock(&cache_lock); + mutex_lock(&maintenance_lock); do_run_maintenance_thread = 0; pthread_cond_signal(&maintenance_cond); - mutex_unlock(&cache_lock); + mutex_unlock(&maintenance_lock); /* Wait for the maintenance thread to stop */ pthread_join(maintenance_tid, NULL); } - @@ -958,6 +958,15 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) { return CRAWLER_OK; } +/* If we hold this lock, crawler can't wake up or move */ +void lru_crawler_pause(void) { + pthread_mutex_lock(&lru_crawler_lock); +} + +void lru_crawler_resume(void) { + pthread_mutex_unlock(&lru_crawler_lock); +} + int init_lru_crawler(void) { if (lru_crawler_initialized == 0) { if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) { @@ -36,3 +36,5 @@ int start_item_crawler_thread(void); int stop_item_crawler_thread(void); int init_lru_crawler(void); enum crawler_result_type lru_crawler_crawl(char *slabs); +void lru_crawler_pause(void); +void lru_crawler_resume(void); diff --git a/memcached.h b/memcached.h index 15595f8..36b3e65 100644 --- a/memcached.h +++ b/memcached.h @@ -188,9 +188,11 @@ enum network_transport { udp_transport }; -enum item_lock_types { - ITEM_LOCK_GRANULAR = 0, - ITEM_LOCK_GLOBAL +enum pause_thread_types { + PAUSE_WORKER_THREADS = 0, + PAUSE_ALL_THREADS, + RESUME_ALL_THREADS, + RESUME_WORKER_THREADS }; #define IS_UDP(x) (x == udp_transport) @@ -389,7 +391,6 @@ typedef struct { struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ - uint8_t item_lock_type; /* use fine-grained or global item lock */ } LIBEVENT_THREAD; typedef struct { @@ -574,13 +575,11 @@ void item_stats_sizes(ADD_STAT add_stats, void *c); void item_unlink(item *it); void item_update(item *it); -void item_lock_global(void); -void item_unlock_global(void); void item_lock(uint32_t hv); void *item_trylock(uint32_t hv); void item_trylock_unlock(void *arg); void item_unlock(uint32_t hv); -void switch_item_lock_type(enum item_lock_types type); +void pause_threads(enum pause_thread_types type); unsigned short refcount_incr(unsigned short *refcount); unsigned short refcount_decr(unsigned short *refcount); void STATS_LOCK(void); @@ -49,6 +49,9 @@ pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER; /* Lock for global stats */ static pthread_mutex_t stats_lock; +/* Lock to cause worker threads to hang up after being woken */ +static pthread_mutex_t worker_hang_lock; + /* Free list of CQ_ITEM structs */ static CQ_ITEM *cqi_freelist; static pthread_mutex_t cqi_freelist_lock; @@ -59,10 +62,6 @@ static uint32_t item_lock_count; unsigned int item_lock_hashpower; #define hashsize(n) ((unsigned long int)1<<(n)) #define hashmask(n) (hashsize(n)-1) -/* this lock is temporarily engaged during a hash table expansion */ -static pthread_mutex_t item_global_lock; -/* thread-specific variable for deeply finding the item lock type */ -static pthread_key_t item_lock_type_key; static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; @@ -112,22 +111,8 @@ unsigned short refcount_decr(unsigned short *refcount) { #endif } -/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */ -void item_lock_global(void) { - mutex_lock(&item_global_lock); -} - -void item_unlock_global(void) { - mutex_unlock(&item_global_lock); -} - void item_lock(uint32_t hv) { - uint8_t *lock_type = pthread_getspecific(item_lock_type_key); - if (likely(*lock_type == ITEM_LOCK_GRANULAR)) { - mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]); - } else { - mutex_lock(&item_global_lock); - } + mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]); } /* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a @@ -150,12 +135,7 @@ void item_trylock_unlock(void *lock) { } void item_unlock(uint32_t hv) { - uint8_t *lock_type = pthread_getspecific(item_lock_type_key); - if (likely(*lock_type == ITEM_LOCK_GRANULAR)) { - mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]); - } else { - mutex_unlock(&item_global_lock); - } + mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]); } static void wait_for_thread_registration(int nthreads) { @@ -169,18 +149,32 @@ static void register_thread_initialized(void) { init_count++; pthread_cond_signal(&init_cond); pthread_mutex_unlock(&init_lock); + /* Force worker threads to pile up if someone wants us to */ + pthread_mutex_lock(&worker_hang_lock); + pthread_mutex_unlock(&worker_hang_lock); } -void switch_item_lock_type(enum item_lock_types type) { +/* Must not be called with any deeper locks held: + * item locks, cache_lock, stats_lock, etc + */ +void pause_threads(enum pause_thread_types type) { char buf[1]; int i; + buf[0] = 0; switch (type) { - case ITEM_LOCK_GRANULAR: - buf[0] = 'l'; + case PAUSE_ALL_THREADS: + slabs_rebalancer_pause(); + lru_crawler_pause(); + case PAUSE_WORKER_THREADS: + buf[0] = 'p'; + pthread_mutex_lock(&worker_hang_lock); break; - case ITEM_LOCK_GLOBAL: - buf[0] = 'g'; + case RESUME_ALL_THREADS: + slabs_rebalancer_resume(); + lru_crawler_resume(); + case RESUME_WORKER_THREADS: + pthread_mutex_unlock(&worker_hang_lock); break; default: fprintf(stderr, "Unknown lock type: %d\n", type); @@ -188,6 +182,11 @@ void switch_item_lock_type(enum item_lock_types type) { break; } + /* Only send a message if we have one. */ + if (buf[0] == 0) { + return; + } + pthread_mutex_lock(&init_lock); init_count = 0; for (i = 0; i < settings.num_threads; i++) { @@ -374,13 +373,6 @@ static void *worker_libevent(void *arg) { * all threads have finished initializing. */ - /* set an indexable thread-specific memory item for the lock type. - * this could be unnecessary if we pass the conn *c struct through - * all item_lock calls... - */ - me->item_lock_type = ITEM_LOCK_GRANULAR; - pthread_setspecific(item_lock_type_key, &me->item_lock_type); - register_thread_initialized(); event_base_loop(me->base, 0); @@ -425,13 +417,8 @@ static void thread_libevent_process(int fd, short which, void *arg) { cqi_free(item); } break; - /* we were told to flip the lock type and report in */ - case 'l': - me->item_lock_type = ITEM_LOCK_GRANULAR; - register_thread_initialized(); - break; - case 'g': - me->item_lock_type = ITEM_LOCK_GLOBAL; + /* we were told to pause and report in */ + case 'p': register_thread_initialized(); break; } @@ -784,6 +771,7 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) { pthread_mutex_init(&cache_lock, NULL); pthread_mutex_init(&stats_lock, NULL); + pthread_mutex_init(&worker_hang_lock, NULL); pthread_mutex_init(&init_lock, NULL); pthread_cond_init(&init_cond, NULL); @@ -803,6 +791,13 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) { power = 13; } + if (power >= hashpower) { + fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power); + fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n"); + fprintf(stderr, "Hash table grows with `-o hashpower=N` \n"); + exit(1); + } + item_lock_count = hashsize(power); item_lock_hashpower = power; @@ -814,8 +809,6 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) { for (i = 0; i < item_lock_count; i++) { pthread_mutex_init(&item_locks[i], NULL); } - pthread_key_create(&item_lock_type_key, NULL); - pthread_mutex_init(&item_global_lock, NULL); threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { |