summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--assoc.c111
-rw-r--r--assoc.h3
-rw-r--r--memcached.c9
-rw-r--r--memcached.h1
-rw-r--r--thread.c14
5 files changed, 98 insertions, 40 deletions
diff --git a/assoc.c b/assoc.c
index cc31794..2fb4d53 100644
--- a/assoc.c
+++ b/assoc.c
@@ -25,6 +25,10 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
+#include <pthread.h>
+
+static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
+
/*
* Since the hash function does bit manipulation, it needs to know
@@ -543,39 +547,13 @@ static void assoc_expand(void) {
hashpower++;
expanding = true;
expand_bucket = 0;
- do_assoc_move_next_bucket();
+ pthread_cond_signal(&maintenance_cond);
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}
-/* migrates the next bucket to the primary hashtable if we're expanding. */
-void do_assoc_move_next_bucket(void) {
- item *it, *next;
- int bucket;
-
- if (expanding) {
- for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
- next = it->h_next;
-
- bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
- it->h_next = primary_hashtable[bucket];
- primary_hashtable[bucket] = it;
- }
-
- old_hashtable[expand_bucket] = NULL;
-
- expand_bucket++;
- if (expand_bucket == hashsize(hashpower - 1)) {
- expanding = false;
- free(old_hashtable);
- if (settings.verbose > 1)
- fprintf(stderr, "Hash table expansion done\n");
- }
- }
-}
-
/* Note: this isn't an assoc_update. The key must not already exist to call this */
int assoc_insert(item *it) {
uint32_t hv;
@@ -622,3 +600,82 @@ void assoc_delete(const char *key, const size_t nkey) {
they can't find. */
assert(*before != 0);
}
+
+
+static volatile int do_run_maintenance_thread = 1;
+
+extern pthread_mutex_t cache_lock;
+#define DEFAULT_HASH_BULK_MOVE 1
+int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
+
+static void *assoc_maintenance_thread(void *arg) {
+
+ while (do_run_maintenance_thread) {
+
+ /* Lock the cache, and bulk move multiple buckets to the new
+ * hash table. */
+ pthread_mutex_lock(&cache_lock);
+
+ for (int ii = 0; ii < hash_bulk_move && expanding; ++ii) {
+ item *it, *next;
+ int bucket;
+
+ for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
+ next = it->h_next;
+
+ bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
+ it->h_next = primary_hashtable[bucket];
+ primary_hashtable[bucket] = it;
+ }
+
+ old_hashtable[expand_bucket] = NULL;
+
+ expand_bucket++;
+ if (expand_bucket == hashsize(hashpower - 1)) {
+ expanding = false;
+ free(old_hashtable);
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion done\n");
+ }
+ }
+
+ if (!expanding) {
+ /* We are done expanding.. just wait for next invocation */
+ pthread_cond_wait(&maintenance_cond, &cache_lock);
+ }
+
+ pthread_mutex_unlock(&cache_lock);
+ }
+ return NULL;
+}
+
+static pthread_t maintenance_tid;
+
+int start_assoc_maintenance_thread() {
+ int ret;
+ char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
+ if (env != NULL) {
+ hash_bulk_move = atoi(env);
+ if (hash_bulk_move == 0) {
+ hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
+ }
+ }
+ if ((ret = pthread_create(&maintenance_tid, NULL,
+ assoc_maintenance_thread, NULL)) != 0) {
+ fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
+ return -1;
+ }
+ return 0;
+}
+
+void stop_assoc_maintenance_thread() {
+ pthread_mutex_lock(&cache_lock);
+ do_run_maintenance_thread = 0;
+ pthread_cond_signal(&maintenance_cond);
+ pthread_mutex_unlock(&cache_lock);
+
+ /* Wait for the maintenance thread to stop */
+ pthread_join(maintenance_tid, NULL);
+}
+
+
diff --git a/assoc.h b/assoc.h
index 3b32162..b8ae9c3 100644
--- a/assoc.h
+++ b/assoc.h
@@ -5,3 +5,6 @@ int assoc_insert(item *item);
void assoc_delete(const char *key, const size_t nkey);
void do_assoc_move_next_bucket(void);
uint32_t hash( const void *key, size_t length, const uint32_t initval);
+int start_assoc_maintenance_thread(void);
+void stop_assoc_maintenance_thread(void);
+
diff --git a/memcached.c b/memcached.c
index 0d80eaa..86245c0 100644
--- a/memcached.c
+++ b/memcached.c
@@ -3290,7 +3290,6 @@ static void drive_machine(conn *c) {
case conn_new_cmd:
reset_cmd_handler(c);
- assoc_move_next_bucket();
break;
case conn_nread:
@@ -4215,6 +4214,11 @@ int main (int argc, char **argv) {
thread_init(settings.num_threads, main_base);
/* save the PID in if we're a daemon, do this after thread_init due to
a file descriptor handling bug somewhere in libevent */
+
+ if (start_assoc_maintenance_thread() == -1) {
+ exit(EXIT_FAILURE);
+ }
+
if (do_daemonize)
save_pid(getpid(), pid_file);
/* initialise clock event */
@@ -4263,6 +4267,9 @@ int main (int argc, char **argv) {
/* enter the event loop */
event_base_loop(main_base, 0);
+
+ stop_assoc_maintenance_thread();
+
/* remove the PID file if we're a daemon */
if (do_daemonize)
remove_pidfile(pid_file);
diff --git a/memcached.h b/memcached.h
index 02a0e2a..c732633 100644
--- a/memcached.h
+++ b/memcached.h
@@ -323,7 +323,6 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, in
/* Lock wrappers for cache functions that are called from main loop. */
char *add_delta(conn *c, item *item, const int incr, const int64_t delta,
char *buf);
-void assoc_move_next_bucket(void);
conn *conn_from_freelist(void);
bool conn_add_to_freelist(conn *c);
char *suffix_from_freelist(void);
diff --git a/thread.c b/thread.c
index 60d9e5a..c670d5e 100644
--- a/thread.c
+++ b/thread.c
@@ -51,7 +51,7 @@ static pthread_mutex_t conn_lock;
static pthread_mutex_t suffix_lock;
/* Lock for cache operations (item_*, assoc_*) */
-static pthread_mutex_t cache_lock;
+pthread_mutex_t cache_lock;
/* Lock for slab allocator operations */
static pthread_mutex_t slabs_lock;
@@ -530,14 +530,6 @@ char *item_stats_sizes(uint32_t (*add_stats)(char *buf,
return ret;
}
-/****************************** HASHTABLE MODULE *****************************/
-
-void assoc_move_next_bucket() {
- pthread_mutex_lock(&cache_lock);
- do_assoc_move_next_bucket();
- pthread_mutex_unlock(&cache_lock);
-}
-
/******************************* SLAB ALLOCATOR ******************************/
void *slabs_alloc(size_t size, unsigned int id) {
@@ -580,11 +572,11 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) {
/******************************* GLOBAL STATS ******************************/
void STATS_LOCK() {
- pthread_mutex_lock(&stats_lock);
+ /* pthread_mutex_lock(&stats_lock); */
}
void STATS_UNLOCK() {
- pthread_mutex_unlock(&stats_lock);
+ /* pthread_mutex_unlock(&stats_lock); */
}
/*