diff options
author | Trond Norbye <Trond.Norbye@sun.com> | 2009-01-27 12:57:48 +0100 |
---|---|---|
committer | Dustin Sallings <dustin@spy.net> | 2009-01-27 11:42:44 -0800 |
commit | 7f09e20ba53830d3df7cfad4f560819a5f9d7b90 (patch) | |
tree | da40eecd40ea4868545e02084f2d45284dcbdfc1 | |
parent | 1c0285a0aa2c707283c3c07d3f729317dbe13bca (diff) | |
download | memcached-7f09e20ba53830d3df7cfad4f560819a5f9d7b90.tar.gz |
Do hash expansion in its own thread
Previously we tried to migrate one bucket over to the new hash table before
we started a new command for a client, and we tried to lock the cache in order
to determine if we should move an item or not. This resulted in extra contention
on an already hot mutex...
-rw-r--r-- | assoc.c | 111 | ||||
-rw-r--r-- | assoc.h | 3 | ||||
-rw-r--r-- | memcached.c | 9 | ||||
-rw-r--r-- | memcached.h | 1 | ||||
-rw-r--r-- | thread.c | 14 |
5 files changed, 98 insertions, 40 deletions
@@ -25,6 +25,10 @@ #include <stdio.h> #include <string.h> #include <assert.h> +#include <pthread.h> + +static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER; + /* * Since the hash function does bit manipulation, it needs to know @@ -543,39 +547,13 @@ static void assoc_expand(void) { hashpower++; expanding = true; expand_bucket = 0; - do_assoc_move_next_bucket(); + pthread_cond_signal(&maintenance_cond); } else { primary_hashtable = old_hashtable; /* Bad news, but we can keep running. */ } } -/* migrates the next bucket to the primary hashtable if we're expanding. */ -void do_assoc_move_next_bucket(void) { - item *it, *next; - int bucket; - - if (expanding) { - for (it = old_hashtable[expand_bucket]; NULL != it; it = next) { - next = it->h_next; - - bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower); - it->h_next = primary_hashtable[bucket]; - primary_hashtable[bucket] = it; - } - - old_hashtable[expand_bucket] = NULL; - - expand_bucket++; - if (expand_bucket == hashsize(hashpower - 1)) { - expanding = false; - free(old_hashtable); - if (settings.verbose > 1) - fprintf(stderr, "Hash table expansion done\n"); - } - } -} - /* Note: this isn't an assoc_update. The key must not already exist to call this */ int assoc_insert(item *it) { uint32_t hv; @@ -622,3 +600,82 @@ void assoc_delete(const char *key, const size_t nkey) { they can't find. */ assert(*before != 0); } + + +static volatile int do_run_maintenance_thread = 1; + +extern pthread_mutex_t cache_lock; +#define DEFAULT_HASH_BULK_MOVE 1 +int hash_bulk_move = DEFAULT_HASH_BULK_MOVE; + +static void *assoc_maintenance_thread(void *arg) { + + while (do_run_maintenance_thread) { + + /* Lock the cache, and bulk move multiple buckets to the new + * hash table. 
*/ + pthread_mutex_lock(&cache_lock); + + for (int ii = 0; ii < hash_bulk_move && expanding; ++ii) { + item *it, *next; + int bucket; + + for (it = old_hashtable[expand_bucket]; NULL != it; it = next) { + next = it->h_next; + + bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower); + it->h_next = primary_hashtable[bucket]; + primary_hashtable[bucket] = it; + } + + old_hashtable[expand_bucket] = NULL; + + expand_bucket++; + if (expand_bucket == hashsize(hashpower - 1)) { + expanding = false; + free(old_hashtable); + if (settings.verbose > 1) + fprintf(stderr, "Hash table expansion done\n"); + } + } + + if (!expanding) { + /* We are done expanding.. just wait for next invocation */ + pthread_cond_wait(&maintenance_cond, &cache_lock); + } + + pthread_mutex_unlock(&cache_lock); + } + return NULL; +} + +static pthread_t maintenance_tid; + +int start_assoc_maintenance_thread() { + int ret; + char *env = getenv("MEMCACHED_HASH_BULK_MOVE"); + if (env != NULL) { + hash_bulk_move = atoi(env); + if (hash_bulk_move == 0) { + hash_bulk_move = DEFAULT_HASH_BULK_MOVE; + } + } + if ((ret = pthread_create(&maintenance_tid, NULL, + assoc_maintenance_thread, NULL)) != 0) { + fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); + return -1; + } + return 0; +} + +void stop_assoc_maintenance_thread() { + pthread_mutex_lock(&cache_lock); + do_run_maintenance_thread = 0; + pthread_cond_signal(&maintenance_cond); + pthread_mutex_unlock(&cache_lock); + + /* Wait for the maintenance thread to stop */ + pthread_join(maintenance_tid, NULL); +} + + @@ -5,3 +5,6 @@ int assoc_insert(item *item); void assoc_delete(const char *key, const size_t nkey); void do_assoc_move_next_bucket(void); uint32_t hash( const void *key, size_t length, const uint32_t initval); +int start_assoc_maintenance_thread(void); +void stop_assoc_maintenance_thread(void); + diff --git a/memcached.c b/memcached.c index 0d80eaa..86245c0 100644 --- a/memcached.c +++ b/memcached.c @@ -3290,7 +3290,6 @@ static void 
drive_machine(conn *c) { case conn_new_cmd: reset_cmd_handler(c); - assoc_move_next_bucket(); break; case conn_nread: @@ -4215,6 +4214,11 @@ int main (int argc, char **argv) { thread_init(settings.num_threads, main_base); /* save the PID in if we're a daemon, do this after thread_init due to a file descriptor handling bug somewhere in libevent */ + + if (start_assoc_maintenance_thread() == -1) { + exit(EXIT_FAILURE); + } + if (do_daemonize) save_pid(getpid(), pid_file); /* initialise clock event */ @@ -4263,6 +4267,9 @@ int main (int argc, char **argv) { /* enter the event loop */ event_base_loop(main_base, 0); + + stop_assoc_maintenance_thread(); + /* remove the PID file if we're a daemon */ if (do_daemonize) remove_pidfile(pid_file); diff --git a/memcached.h b/memcached.h index 02a0e2a..c732633 100644 --- a/memcached.h +++ b/memcached.h @@ -323,7 +323,6 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, in /* Lock wrappers for cache functions that are called from main loop. 
*/ char *add_delta(conn *c, item *item, const int incr, const int64_t delta, char *buf); -void assoc_move_next_bucket(void); conn *conn_from_freelist(void); bool conn_add_to_freelist(conn *c); char *suffix_from_freelist(void); @@ -51,7 +51,7 @@ static pthread_mutex_t conn_lock; static pthread_mutex_t suffix_lock; /* Lock for cache operations (item_*, assoc_*) */ -static pthread_mutex_t cache_lock; +pthread_mutex_t cache_lock; /* Lock for slab allocator operations */ static pthread_mutex_t slabs_lock; @@ -530,14 +530,6 @@ char *item_stats_sizes(uint32_t (*add_stats)(char *buf, return ret; } -/****************************** HASHTABLE MODULE *****************************/ - -void assoc_move_next_bucket() { - pthread_mutex_lock(&cache_lock); - do_assoc_move_next_bucket(); - pthread_mutex_unlock(&cache_lock); -} - /******************************* SLAB ALLOCATOR ******************************/ void *slabs_alloc(size_t size, unsigned int id) { @@ -580,11 +572,11 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) { /******************************* GLOBAL STATS ******************************/ void STATS_LOCK() { - pthread_mutex_lock(&stats_lock); + /* pthread_mutex_lock(&stats_lock); */ } void STATS_UNLOCK() { - pthread_mutex_unlock(&stats_lock); + /* pthread_mutex_unlock(&stats_lock); */ } /* |