summaryrefslogtreecommitdiff
path: root/assoc.c
diff options
context:
space:
mode:
Diffstat (limited to 'assoc.c')
-rw-r--r--assoc.c111
1 files changed, 84 insertions, 27 deletions
diff --git a/assoc.c b/assoc.c
index cc31794..2fb4d53 100644
--- a/assoc.c
+++ b/assoc.c
@@ -25,6 +25,10 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
+#include <pthread.h>
+
+static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
+
/*
* Since the hash function does bit manipulation, it needs to know
@@ -543,39 +547,13 @@ static void assoc_expand(void) {
hashpower++;
expanding = true;
expand_bucket = 0;
- do_assoc_move_next_bucket();
+ pthread_cond_signal(&maintenance_cond);
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}
-/* migrates the next bucket to the primary hashtable if we're expanding. */
-void do_assoc_move_next_bucket(void) {
- item *it, *next;
- int bucket;
-
- if (expanding) {
- for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
- next = it->h_next;
-
- bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
- it->h_next = primary_hashtable[bucket];
- primary_hashtable[bucket] = it;
- }
-
- old_hashtable[expand_bucket] = NULL;
-
- expand_bucket++;
- if (expand_bucket == hashsize(hashpower - 1)) {
- expanding = false;
- free(old_hashtable);
- if (settings.verbose > 1)
- fprintf(stderr, "Hash table expansion done\n");
- }
- }
-}
-
/* Note: this isn't an assoc_update. The key must not already exist to call this */
int assoc_insert(item *it) {
uint32_t hv;
@@ -622,3 +600,82 @@ void assoc_delete(const char *key, const size_t nkey) {
they can't find. */
assert(*before != 0);
}
+
+
+static volatile int do_run_maintenance_thread = 1;
+
+extern pthread_mutex_t cache_lock;
+#define DEFAULT_HASH_BULK_MOVE 1
+int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
+
+static void *assoc_maintenance_thread(void *arg) {
+
+ while (do_run_maintenance_thread) {
+
+ /* Lock the cache, and bulk move multiple buckets to the new
+ * hash table. */
+ pthread_mutex_lock(&cache_lock);
+
+ for (int ii = 0; ii < hash_bulk_move && expanding; ++ii) {
+ item *it, *next;
+ int bucket;
+
+ for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
+ next = it->h_next;
+
+ bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
+ it->h_next = primary_hashtable[bucket];
+ primary_hashtable[bucket] = it;
+ }
+
+ old_hashtable[expand_bucket] = NULL;
+
+ expand_bucket++;
+ if (expand_bucket == hashsize(hashpower - 1)) {
+ expanding = false;
+ free(old_hashtable);
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion done\n");
+ }
+ }
+
+ if (!expanding) {
+ /* We are done expanding.. just wait for next invocation */
+ pthread_cond_wait(&maintenance_cond, &cache_lock);
+ }
+
+ pthread_mutex_unlock(&cache_lock);
+ }
+ return NULL;
+}
+
+static pthread_t maintenance_tid;
+
+int start_assoc_maintenance_thread() {
+ int ret;
+ char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
+ if (env != NULL) {
+ hash_bulk_move = atoi(env);
+ if (hash_bulk_move == 0) {
+ hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
+ }
+ }
+ if ((ret = pthread_create(&maintenance_tid, NULL,
+ assoc_maintenance_thread, NULL)) != 0) {
+ fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
+ return -1;
+ }
+ return 0;
+}
+
+void stop_assoc_maintenance_thread() {
+ pthread_mutex_lock(&cache_lock);
+ do_run_maintenance_thread = 0;
+ pthread_cond_signal(&maintenance_cond);
+ pthread_mutex_unlock(&cache_lock);
+
+ /* Wait for the maintenance thread to stop */
+ pthread_join(maintenance_tid, NULL);
+}
+
+