diff options
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 131 |
1 files changed, 114 insertions, 17 deletions
@@ -57,8 +57,12 @@ static pthread_mutex_t cqi_freelist_lock; static pthread_mutex_t *item_locks; /* size of the item lock hash table */ static uint32_t item_lock_count; -/* size - 1 for lookup masking */ -static uint32_t item_lock_mask; +#define hashsize(n) ((unsigned long int)1<<(n)) +#define hashmask(n) (hashsize(n)-1) +/* this lock is temporarily engaged during a hash table expansion */ +static pthread_mutex_t item_global_lock; +/* thread-specific variable for deeply finding the item lock type */ +static pthread_key_t item_lock_type_key; static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; @@ -108,16 +112,92 @@ unsigned short refcount_decr(unsigned short *refcount) { #endif } +/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */ +void item_lock_global(void) { + mutex_lock(&item_global_lock); +} + +void item_unlock_global(void) { + mutex_unlock(&item_global_lock); +} + void item_lock(uint32_t hv) { - mutex_lock(&item_locks[hv & item_lock_mask]); + uint8_t *lock_type = pthread_getspecific(item_lock_type_key); + if (likely(*lock_type == ITEM_LOCK_GRANULAR)) { + mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]); + } else { + mutex_lock(&item_global_lock); + } } -int item_trylock(uint32_t hv) { - return pthread_mutex_trylock(&item_locks[hv & item_lock_mask]); +/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a + * no-op, as it's only called from within the item lock if necessary. + * However, we can't mix a no-op and threads which are still synchronizing to + * GLOBAL. So instead we just always try to lock. When in GLOBAL mode this + * turns into an effective no-op. Threads re-synchronize after the power level + * switch so it should stay safe. + */ +void *item_trylock(uint32_t hv) { + pthread_mutex_t *lock = &item_locks[(hv & hashmask(hashpower)) % item_lock_count]; + if (pthread_mutex_trylock(lock) == 0) { + return lock; + } + return NULL; +} + +void item_trylock_unlock(void *lock) { + mutex_unlock((pthread_mutex_t *) lock); } void item_unlock(uint32_t hv) { - mutex_unlock(&item_locks[hv & item_lock_mask]); + uint8_t *lock_type = pthread_getspecific(item_lock_type_key); + if (likely(*lock_type == ITEM_LOCK_GRANULAR)) { + mutex_unlock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]); + } else { + mutex_unlock(&item_global_lock); + } +} + +static void wait_for_thread_registration(int nthreads) { + while (init_count < nthreads) { + pthread_cond_wait(&init_cond, &init_lock); + } +} + +static void register_thread_initialized(void) { + pthread_mutex_lock(&init_lock); + init_count++; + pthread_cond_signal(&init_cond); + pthread_mutex_unlock(&init_lock); +} + +void switch_item_lock_type(enum item_lock_types type) { + char buf[1]; + int i; + + switch (type) { + case ITEM_LOCK_GRANULAR: + buf[0] = 'l'; + break; + case ITEM_LOCK_GLOBAL: + buf[0] = 'g'; + break; + default: + fprintf(stderr, "Unknown lock type: %d\n", type); + assert(1 == 0); + break; + } + + pthread_mutex_lock(&init_lock); + init_count = 0; + for (i = 0; i < settings.num_threads; i++) { + if (write(threads[i].notify_send_fd, buf, 1) != 1) { + perror("Failed writing to notify pipe"); + /* TODO: This is a fatal problem. Can it ever happen temporarily? */ + } + } + wait_for_thread_registration(settings.num_threads); + pthread_mutex_unlock(&init_lock); } /* @@ -282,7 +362,6 @@ static void setup_thread(LIBEVENT_THREAD *me) { } } - /* * Worker thread: main event loop */ @@ -293,10 +372,14 @@ static void *worker_libevent(void *arg) { * all threads have finished initializing. */ - pthread_mutex_lock(&init_lock); - init_count++; - pthread_cond_signal(&init_cond); - pthread_mutex_unlock(&init_lock); + /* set an indexable thread-specific memory item for the lock type. + * this could be unnecessary if we pass the conn *c struct through + * all item_lock calls... + */ + me->item_lock_type = ITEM_LOCK_GRANULAR; + pthread_setspecific(item_lock_type_key, &me->item_lock_type); + + register_thread_initialized(); event_base_loop(me->base, 0); return NULL; @@ -316,6 +399,8 @@ static void thread_libevent_process(int fd, short which, void *arg) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); + switch (buf[0]) { + case 'c': item = cq_pop(me->new_conn_queue); if (NULL != item) { @@ -337,6 +422,17 @@ static void thread_libevent_process(int fd, short which, void *arg) { } cqi_free(item); } + break; + /* we were told to flip the lock type and report in */ + case 'l': + me->item_lock_type = ITEM_LOCK_GRANULAR; + register_thread_initialized(); + break; + case 'g': + me->item_lock_type = ITEM_LOCK_GLOBAL; + register_thread_initialized(); + break; + } } /* Which thread we assigned a connection to most recently. */ @@ -350,6 +446,7 @@ static int last_thread = -1; void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new(); + char buf[1]; int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; @@ -365,7 +462,8 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); - if (write(thread->notify_send_fd, "", 1) != 1) { + buf[0] = 'c'; + if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } } @@ -690,8 +788,7 @@ void thread_init(int nthreads, struct event_base *main_base) { power = 13; } - item_lock_count = ((unsigned long int)1 << (power)); - item_lock_mask = item_lock_count - 1; + item_lock_count = hashsize(power); item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t)); if (! item_locks) { @@ -701,6 +798,8 @@ void thread_init(int nthreads, struct event_base *main_base) { for (i = 0; i < item_lock_count; i++) { pthread_mutex_init(&item_locks[i], NULL); } + pthread_key_create(&item_lock_type_key, NULL); + pthread_mutex_init(&item_global_lock, NULL); threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { @@ -733,9 +832,7 @@ void thread_init(int nthreads, struct event_base *main_base) { /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); - while (init_count < nthreads) { - pthread_cond_wait(&init_cond, &init_lock); - } + wait_for_thread_registration(nthreads); pthread_mutex_unlock(&init_lock); } |