summaryrefslogtreecommitdiff
path: root/extstore.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2018-07-16 21:58:43 -0700
committerdormando <dormando@rydia.net>2018-08-06 15:11:04 -0700
commit89bf7ab1cfea2c24d08b9de697215ac7f61a0362 (patch)
tree25bdbe36326abb864f02caec690c4f4893d65326 /extstore.c
parent954f4e044b3f1641da66910e4564cd91dfb83712 (diff)
downloadmemcached-89bf7ab1cfea2c24d08b9de697215ac7f61a0362.tar.gz
extstore JBOD support
Just a Bunch Of Devices :P code exists for routing specific devices to specific buckets (lowttl/compact/etc), but enabling it requires significant fixes to compaction algorithm. Thus it is disabled as of this writing. code cleanups and future work: - pedantically freeing memory and closing fd's on exit - unify and flatten the free_bucket code - defines for free buckets - page eviction adjustment (force min-free per free bucket) - fix default calculation for compact_under and drop_under - might require forcing this value only on default bucket
Diffstat (limited to 'extstore.c')
-rw-r--r--extstore.c124
1 files changed, 93 insertions, 31 deletions
diff --git a/extstore.c b/extstore.c
index 23f0f14..726435c 100644
--- a/extstore.c
+++ b/extstore.c
@@ -62,6 +62,7 @@ typedef struct _store_page {
unsigned int allocated;
unsigned int written; /* item offsets can be past written if wbuf not flushed */
unsigned int bucket; /* which bucket the page is linked into */
+ unsigned int free_bucket; /* which bucket this page returns to when freed */
int fd;
unsigned short id;
bool active; /* actively being written to */
@@ -95,6 +96,7 @@ struct store_engine {
store_maint_thread *maint_thread;
store_page *page_freelist;
store_page **page_buckets; /* stack of pages currently allocated to each bucket */
+ store_page **free_page_buckets; /* stack of use-case isolated free pages */
size_t page_size;
unsigned int version; /* global version counter */
unsigned int last_io_thread; /* round robin the IO threads */
@@ -102,6 +104,7 @@ struct store_engine {
unsigned int page_count;
unsigned int page_free; /* unallocated pages */
unsigned int page_bucketcount; /* count of potential page buckets */
+ unsigned int free_page_bucketcount; /* count of free page buckets */
unsigned int io_depth; /* FIXME: Might cache into thr struct */
pthread_mutex_t stats_mutex;
struct extstore_stats stats;
@@ -192,11 +195,11 @@ const char *extstore_err(enum extstore_res res) {
return rv;
}
-void *extstore_init(char *fn, struct extstore_conf *cf,
+// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
+void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
enum extstore_res *res) {
int i;
- int fd;
- uint64_t offset = 0;
+ struct extstore_conf_file *f = NULL;
pthread_t thread;
if (cf->page_size % cf->wbuf_size != 0) {
@@ -227,43 +230,72 @@ void *extstore_init(char *fn, struct extstore_conf *cf,
}
e->page_size = cf->page_size;
- fd = open(fn, O_RDWR | O_CREAT | O_TRUNC, 0644);
- if (fd < 0) {
- *res = EXTSTORE_INIT_OPEN_FAIL;
+ for (f = fh; f != NULL; f = f->next) {
+ f->fd = open(f->file, O_RDWR | O_CREAT | O_TRUNC, 0644);
+ if (f->fd < 0) {
+ *res = EXTSTORE_INIT_OPEN_FAIL;
#ifdef EXTSTORE_DEBUG
- perror("open");
+ perror("open");
#endif
- free(e);
- return NULL;
+ free(e);
+ return NULL;
+ }
+ e->page_count += f->page_count;
+ f->offset = 0;
}
- e->pages = calloc(cf->page_count, sizeof(store_page));
+ e->pages = calloc(e->page_count, sizeof(store_page));
if (e->pages == NULL) {
*res = EXTSTORE_INIT_OOM;
- close(fd);
+ // FIXME: loop-close. make error label
free(e);
return NULL;
}
- for (i = 0; i < cf->page_count; i++) {
+ // interleave the pages between devices
+ f = NULL; // start at the first device.
+ for (i = 0; i < e->page_count; i++) {
+ // find next device with available pages
+ while (1) {
+ // restart the loop
+ if (f == NULL || f->next == NULL) {
+ f = fh;
+ } else {
+ f = f->next;
+ }
+ if (f->page_count) {
+ f->page_count--;
+ break;
+ }
+ }
pthread_mutex_init(&e->pages[i].mutex, NULL);
e->pages[i].id = i;
- e->pages[i].fd = fd;
- e->pages[i].offset = offset;
+ e->pages[i].fd = f->fd;
+ e->pages[i].free_bucket = f->free_bucket;
+ e->pages[i].offset = f->offset;
e->pages[i].free = true;
- offset += e->page_size;
+ f->offset += e->page_size;
}
- for (i = cf->page_count-1; i > 0; i--) {
- e->pages[i].next = e->page_freelist;
- e->page_freelist = &e->pages[i];
+ // free page buckets allows the app to organize devices by use case
+ e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
+ e->page_bucketcount = cf->page_buckets;
+
+ for (i = e->page_count-1; i > 0; i--) {
e->page_free++;
+ if (e->pages[i].free_bucket == 0) {
+ e->pages[i].next = e->page_freelist;
+ e->page_freelist = &e->pages[i];
+ } else {
+ int fb = e->pages[i].free_bucket;
+ e->pages[i].next = e->free_page_buckets[fb];
+ e->free_page_buckets[fb] = &e->pages[i];
+ }
}
// 0 is magic "page is freed" version
e->version = 1;
- e->page_count = cf->page_count;
// scratch data for stats. TODO: malloc failure handle
e->stats.page_data =
calloc(e->page_count, sizeof(struct extstore_page_data));
@@ -309,6 +341,8 @@ void *extstore_init(char *fn, struct extstore_conf *cf,
pthread_cond_init(&e->maint_thread->cond, NULL);
pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread);
+ extstore_run_maint(e);
+
return (void *)e;
}
@@ -318,13 +352,25 @@ void extstore_run_maint(void *ptr) {
}
// call with *e locked
-static store_page *_allocate_page(store_engine *e, unsigned int bucket) {
+static store_page *_allocate_page(store_engine *e, unsigned int bucket,
+ unsigned int free_bucket) {
assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
- store_page *tmp = e->page_freelist;
- E_DEBUG("EXTSTORE: allocating new page\n");
- if (e->page_free > 0) {
- assert(e->page_freelist != NULL);
+ store_page *tmp = NULL;
+ // if a specific free bucket was requested, check there first
+ if (free_bucket != 0 && e->free_page_buckets[free_bucket] != NULL) {
+ assert(e->page_free > 0);
+ tmp = e->free_page_buckets[free_bucket];
+ e->free_page_buckets[free_bucket] = tmp->next;
+ }
+ // failing that, try the global list.
+ if (tmp == NULL && e->page_freelist != NULL) {
+ tmp = e->page_freelist;
e->page_freelist = tmp->next;
+ }
+ E_DEBUG("EXTSTORE: allocating new page\n");
+ // page_freelist can be empty if the only free pages are specialized and
+ // we didn't just request one.
+ if (e->page_free > 0 && tmp != NULL) {
tmp->next = e->page_buckets[bucket];
e->page_buckets[bucket] = tmp;
tmp->active = true;
@@ -434,7 +480,8 @@ static void _submit_wbuf(store_engine *e, store_page *p) {
* new page. best if used from a background thread that can harmlessly retry.
*/
-int extstore_write_request(void *ptr, unsigned int bucket, obj_io *io) {
+int extstore_write_request(void *ptr, unsigned int bucket,
+ unsigned int free_bucket, obj_io *io) {
store_engine *e = (store_engine *)ptr;
store_page *p;
int ret = -1;
@@ -444,7 +491,7 @@ int extstore_write_request(void *ptr, unsigned int bucket, obj_io *io) {
pthread_mutex_lock(&e->mutex);
p = e->page_buckets[bucket];
if (!p) {
- p = _allocate_page(e, bucket);
+ p = _allocate_page(e, bucket, free_bucket);
}
pthread_mutex_unlock(&e->mutex);
if (!p)
@@ -458,7 +505,7 @@ int extstore_write_request(void *ptr, unsigned int bucket, obj_io *io) {
((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
pthread_mutex_unlock(&p->mutex);
pthread_mutex_lock(&e->mutex);
- _allocate_page(e, bucket);
+ _allocate_page(e, bucket, free_bucket);
pthread_mutex_unlock(&e->mutex);
return ret;
}
@@ -764,8 +811,14 @@ static void _free_page(store_engine *e, store_page *p) {
p->closed = false;
p->free = true;
// add to page stack
- p->next = e->page_freelist;
- e->page_freelist = p;
+ // TODO: free_page_buckets first class and remove redundancy?
+ if (p->free_bucket != 0) {
+ p->next = e->free_page_buckets[p->free_bucket];
+ e->free_page_buckets[p->free_bucket] = p;
+ } else {
+ p->next = e->page_freelist;
+ e->page_freelist = p;
+ }
e->page_free++;
pthread_mutex_unlock(&e->mutex);
}
@@ -797,7 +850,9 @@ static void *extstore_maint_thread(void *arg) {
pthread_cond_wait(&me->cond, &me->mutex);
pthread_mutex_lock(&e->mutex);
- if (e->page_free == 0) {
+ // default freelist requires at least one page free.
+ // specialized freelists fall back to default once full.
+ if (e->page_free == 0 || e->page_freelist == NULL) {
do_evict = true;
}
pthread_mutex_unlock(&e->mutex);
@@ -806,6 +861,7 @@ static void *extstore_maint_thread(void *arg) {
for (i = 0; i < e->page_count; i++) {
store_page *p = &e->pages[i];
pthread_mutex_lock(&p->mutex);
+ pd[p->id].free_bucket = p->free_bucket;
if (p->active || p->free) {
pthread_mutex_unlock(&p->mutex);
continue;
@@ -814,7 +870,13 @@ static void *extstore_maint_thread(void *arg) {
pd[p->id].version = p->version;
pd[p->id].bytes_used = p->bytes_used;
pd[p->id].bucket = p->bucket;
- if (p->version < low_version) {
+ // low_version/low_page are only used in the eviction
+ // scenario. when we evict, it's only to fill the default page
+ // bucket again.
+ // TODO: experiment with allowing evicting up to a single page
+ // for any specific free bucket. this is *probably* required
+ // since it could cause a load bias on default-only devices?
+ if (p->free_bucket == 0 && p->version < low_version) {
low_version = p->version;
low_page = i;
}