diff options
author | dormando <dormando@rydia.net> | 2018-07-16 21:58:43 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2018-08-06 15:11:04 -0700 |
commit | 89bf7ab1cfea2c24d08b9de697215ac7f61a0362 (patch) | |
tree | 25bdbe36326abb864f02caec690c4f4893d65326 /extstore.c | |
parent | 954f4e044b3f1641da66910e4564cd91dfb83712 (diff) | |
download | memcached-89bf7ab1cfea2c24d08b9de697215ac7f61a0362.tar.gz |
extstore JBOD support
Just a Bunch Of Devices :P
code exists for routing specific devices to specific buckets
(lowttl/compact/etc), but enabling it requires significant fixes to
compaction algorithm. Thus it is disabled as of this writing.
code cleanups and future work:
- pedantically freeing memory and closing fd's on exit
- unify and flatten the free_bucket code
- defines for free buckets
- page eviction adjustment (force min-free per free bucket)
- fix default calculation for compact_under and drop_under
- might require forcing this value only on default bucket
Diffstat (limited to 'extstore.c')
-rw-r--r-- | extstore.c | 124 |
1 files changed, 93 insertions, 31 deletions
@@ -62,6 +62,7 @@ typedef struct _store_page { unsigned int allocated; unsigned int written; /* item offsets can be past written if wbuf not flushed */ unsigned int bucket; /* which bucket the page is linked into */ + unsigned int free_bucket; /* which bucket this page returns to when freed */ int fd; unsigned short id; bool active; /* actively being written to */ @@ -95,6 +96,7 @@ struct store_engine { store_maint_thread *maint_thread; store_page *page_freelist; store_page **page_buckets; /* stack of pages currently allocated to each bucket */ + store_page **free_page_buckets; /* stack of use-case isolated free pages */ size_t page_size; unsigned int version; /* global version counter */ unsigned int last_io_thread; /* round robin the IO threads */ @@ -102,6 +104,7 @@ struct store_engine { unsigned int page_count; unsigned int page_free; /* unallocated pages */ unsigned int page_bucketcount; /* count of potential page buckets */ + unsigned int free_page_bucketcount; /* count of free page buckets */ unsigned int io_depth; /* FIXME: Might cache into thr struct */ pthread_mutex_t stats_mutex; struct extstore_stats stats; @@ -192,11 +195,11 @@ const char *extstore_err(enum extstore_res res) { return rv; } -void *extstore_init(char *fn, struct extstore_conf *cf, +// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc +void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf, enum extstore_res *res) { int i; - int fd; - uint64_t offset = 0; + struct extstore_conf_file *f = NULL; pthread_t thread; if (cf->page_size % cf->wbuf_size != 0) { @@ -227,43 +230,72 @@ void *extstore_init(char *fn, struct extstore_conf *cf, } e->page_size = cf->page_size; - fd = open(fn, O_RDWR | O_CREAT | O_TRUNC, 0644); - if (fd < 0) { - *res = EXTSTORE_INIT_OPEN_FAIL; + for (f = fh; f != NULL; f = f->next) { + f->fd = open(f->file, O_RDWR | O_CREAT | O_TRUNC, 0644); + if (f->fd < 0) { + *res = EXTSTORE_INIT_OPEN_FAIL; #ifdef EXTSTORE_DEBUG - perror("open"); + perror("open"); #endif - free(e); - return NULL; + free(e); + return NULL; + } + e->page_count += f->page_count; + f->offset = 0; } - e->pages = calloc(cf->page_count, sizeof(store_page)); + e->pages = calloc(e->page_count, sizeof(store_page)); if (e->pages == NULL) { *res = EXTSTORE_INIT_OOM; - close(fd); + // FIXME: loop-close. make error label free(e); return NULL; } - for (i = 0; i < cf->page_count; i++) { + // interleave the pages between devices + f = NULL; // start at the first device. + for (i = 0; i < e->page_count; i++) { + // find next device with available pages + while (1) { + // restart the loop + if (f == NULL || f->next == NULL) { + f = fh; + } else { + f = f->next; + } + if (f->page_count) { + f->page_count--; + break; + } + } pthread_mutex_init(&e->pages[i].mutex, NULL); e->pages[i].id = i; - e->pages[i].fd = fd; - e->pages[i].offset = offset; + e->pages[i].fd = f->fd; + e->pages[i].free_bucket = f->free_bucket; + e->pages[i].offset = f->offset; e->pages[i].free = true; - offset += e->page_size; + f->offset += e->page_size; } - for (i = cf->page_count-1; i > 0; i--) { - e->pages[i].next = e->page_freelist; - e->page_freelist = &e->pages[i]; + // free page buckets allows the app to organize devices by use case + e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *)); + e->page_bucketcount = cf->page_buckets; + + for (i = e->page_count-1; i > 0; i--) { e->page_free++; + if (e->pages[i].free_bucket == 0) { + e->pages[i].next = e->page_freelist; + e->page_freelist = &e->pages[i]; + } else { + int fb = e->pages[i].free_bucket; + e->pages[i].next = e->free_page_buckets[fb]; + e->free_page_buckets[fb] = &e->pages[i]; + } } // 0 is magic "page is freed" version e->version = 1; - e->page_count = cf->page_count; // scratch data for stats. TODO: malloc failure handle e->stats.page_data = calloc(e->page_count, sizeof(struct extstore_page_data)); @@ -309,6 +341,8 @@ void *extstore_init(char *fn, struct extstore_conf *cf, pthread_cond_init(&e->maint_thread->cond, NULL); pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread); + extstore_run_maint(e); + return (void *)e; } @@ -318,13 +352,25 @@ void extstore_run_maint(void *ptr) { } // call with *e locked -static store_page *_allocate_page(store_engine *e, unsigned int bucket) { +static store_page *_allocate_page(store_engine *e, unsigned int bucket, + unsigned int free_bucket) { assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size); - store_page *tmp = e->page_freelist; - E_DEBUG("EXTSTORE: allocating new page\n"); - if (e->page_free > 0) { - assert(e->page_freelist != NULL); + store_page *tmp = NULL; + // if a specific free bucket was requested, check there first + if (free_bucket != 0 && e->free_page_buckets[free_bucket] != NULL) { + assert(e->page_free > 0); + tmp = e->free_page_buckets[free_bucket]; + e->free_page_buckets[free_bucket] = tmp->next; + } + // failing that, try the global list. + if (tmp == NULL && e->page_freelist != NULL) { + tmp = e->page_freelist; e->page_freelist = tmp->next; + } + E_DEBUG("EXTSTORE: allocating new page\n"); + // page_freelist can be empty if the only free pages are specialized and + // we didn't just request one. + if (e->page_free > 0 && tmp != NULL) { tmp->next = e->page_buckets[bucket]; e->page_buckets[bucket] = tmp; tmp->active = true; @@ -434,7 +480,8 @@ static void _submit_wbuf(store_engine *e, store_page *p) { * new page. best if used from a background thread that can harmlessly retry. */ -int extstore_write_request(void *ptr, unsigned int bucket, obj_io *io) { +int extstore_write_request(void *ptr, unsigned int bucket, + unsigned int free_bucket, obj_io *io) { store_engine *e = (store_engine *)ptr; store_page *p; int ret = -1; @@ -444,7 +491,7 @@ int extstore_write_request(void *ptr, unsigned int bucket, obj_io *io) { pthread_mutex_lock(&e->mutex); p = e->page_buckets[bucket]; if (!p) { - p = _allocate_page(e, bucket); + p = _allocate_page(e, bucket, free_bucket); } pthread_mutex_unlock(&e->mutex); if (!p) @@ -458,7 +505,7 @@ int extstore_write_request(void *ptr, unsigned int bucket, obj_io *io) { ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) { pthread_mutex_unlock(&p->mutex); pthread_mutex_lock(&e->mutex); - _allocate_page(e, bucket); + _allocate_page(e, bucket, free_bucket); pthread_mutex_unlock(&e->mutex); return ret; } @@ -764,8 +811,14 @@ static void _free_page(store_engine *e, store_page *p) { p->closed = false; p->free = true; // add to page stack - p->next = e->page_freelist; - e->page_freelist = p; + // TODO: free_page_buckets first class and remove redundancy? + if (p->free_bucket != 0) { + p->next = e->free_page_buckets[p->free_bucket]; + e->free_page_buckets[p->free_bucket] = p; + } else { + p->next = e->page_freelist; + e->page_freelist = p; + } e->page_free++; pthread_mutex_unlock(&e->mutex); } @@ -797,7 +850,9 @@ static void *extstore_maint_thread(void *arg) { pthread_cond_wait(&me->cond, &me->mutex); pthread_mutex_lock(&e->mutex); - if (e->page_free == 0) { + // default freelist requires at least one page free. + // specialized freelists fall back to default once full. + if (e->page_free == 0 || e->page_freelist == NULL) { do_evict = true; } pthread_mutex_unlock(&e->mutex); @@ -806,6 +861,7 @@ static void *extstore_maint_thread(void *arg) { for (i = 0; i < e->page_count; i++) { store_page *p = &e->pages[i]; pthread_mutex_lock(&p->mutex); + pd[p->id].free_bucket = p->free_bucket; if (p->active || p->free) { pthread_mutex_unlock(&p->mutex); continue; @@ -814,7 +870,13 @@ static void *extstore_maint_thread(void *arg) { pd[p->id].version = p->version; pd[p->id].bytes_used = p->bytes_used; pd[p->id].bucket = p->bucket; - if (p->version < low_version) { + // low_version/low_page are only used in the eviction + // scenario. when we evict, it's only to fill the default page + // bucket again. + // TODO: experiment with allowing evicting up to a single page + // for any specific free bucket. this is *probably* required + // since it could cause a load bias on default-only devices? + if (p->free_bucket == 0 && p->version < low_version) { low_version = p->version; low_page = i; } |