diff options
Diffstat (limited to 'extstore.c')
-rw-r--r-- | extstore.c | 32 |
1 files changed, 28 insertions, 4 deletions
@@ -78,6 +78,7 @@ typedef struct { pthread_cond_t cond; obj_io *queue; store_engine *e; + unsigned int depth; // queue depth } store_io_thread; typedef struct { @@ -86,7 +87,6 @@ typedef struct { store_engine *e; } store_maint_thread; -/* TODO: Array of FDs for JBOD support */ struct store_engine { pthread_mutex_t mutex; /* covers internal stacks and variables */ store_page *pages; /* directly addressable page list */ @@ -126,10 +126,21 @@ static _store_wbuf *wbuf_new(size_t size) { } static store_io_thread *_get_io_thread(store_engine *e) { - int tid; + int tid = -1; + long long int low = LLONG_MAX; pthread_mutex_lock(&e->mutex); - tid = (e->last_io_thread + 1) % e->io_threadcount; - e->last_io_thread = tid; + // find smallest queue. ignoring lock since being wrong isn't fatal. + // TODO: if average queue depth can be quickly tracked, can break as soon + // as we see a thread that's less than average, and start from last_io_thread + for (int x = 0; x < e->io_threadcount; x++) { + if (e->io_threads[x].depth == 0) { + tid = x; + break; + } else if (e->io_threads[x].depth < low) { + tid = x; + low = e->io_threads[x].depth; + } + } pthread_mutex_unlock(&e->mutex); return &e->io_threads[tid]; @@ -154,6 +165,12 @@ void extstore_get_stats(void *ptr, struct extstore_stats *st) { st->pages_free = e->page_free; st->pages_used = e->page_count - e->page_free; pthread_mutex_unlock(&e->mutex); + st->io_queue = 0; + for (int x = 0; x < e->io_threadcount; x++) { + pthread_mutex_lock(&e->io_threads[x].mutex); + st->io_queue += e->io_threads[x].depth; + pthread_mutex_unlock(&e->io_threads[x].mutex); + } // calculate bytes_fragmented. // note that open and yet-filled pages count against fragmentation. st->bytes_fragmented = st->pages_used * e->page_size - @@ -579,6 +596,12 @@ int extstore_submit(void *ptr, obj_io *io) { } tmp->next = io; } + // TODO: extstore_submit(ptr, io, count) + obj_io *tio = io; + while (tio != NULL) { + t->depth++; + tio = tio->next; + } pthread_mutex_unlock(&t->mutex); //pthread_mutex_lock(&t->mutex); @@ -703,6 +726,7 @@ static void *extstore_io_thread(void *arg) { break; } } + me->depth -= i; me->queue = end->next; end->next = NULL; } |