summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--extstore.c32
-rw-r--r--extstore.h1
-rw-r--r--memcached.c1
3 files changed, 30 insertions, 4 deletions
diff --git a/extstore.c b/extstore.c
index 726435c..cbfd4d6 100644
--- a/extstore.c
+++ b/extstore.c
@@ -78,6 +78,7 @@ typedef struct {
pthread_cond_t cond;
obj_io *queue;
store_engine *e;
+ unsigned int depth; // queue depth
} store_io_thread;
typedef struct {
@@ -86,7 +87,6 @@ typedef struct {
store_engine *e;
} store_maint_thread;
-/* TODO: Array of FDs for JBOD support */
struct store_engine {
pthread_mutex_t mutex; /* covers internal stacks and variables */
store_page *pages; /* directly addressable page list */
@@ -126,10 +126,21 @@ static _store_wbuf *wbuf_new(size_t size) {
}
static store_io_thread *_get_io_thread(store_engine *e) {
- int tid;
+ int tid = -1;
+ long long int low = LLONG_MAX;
pthread_mutex_lock(&e->mutex);
- tid = (e->last_io_thread + 1) % e->io_threadcount;
- e->last_io_thread = tid;
+ // find smallest queue. ignoring lock since being wrong isn't fatal.
+ // TODO: if average queue depth can be quickly tracked, can break as soon
+ // as we see a thread that's less than average, and start from last_io_thread
+ for (int x = 0; x < e->io_threadcount; x++) {
+ if (e->io_threads[x].depth == 0) {
+ tid = x;
+ break;
+ } else if (e->io_threads[x].depth < low) {
+ tid = x;
+ low = e->io_threads[x].depth;
+ }
+ }
pthread_mutex_unlock(&e->mutex);
return &e->io_threads[tid];
@@ -154,6 +165,12 @@ void extstore_get_stats(void *ptr, struct extstore_stats *st) {
st->pages_free = e->page_free;
st->pages_used = e->page_count - e->page_free;
pthread_mutex_unlock(&e->mutex);
+ st->io_queue = 0;
+ for (int x = 0; x < e->io_threadcount; x++) {
+ pthread_mutex_lock(&e->io_threads[x].mutex);
+ st->io_queue += e->io_threads[x].depth;
+ pthread_mutex_unlock(&e->io_threads[x].mutex);
+ }
// calculate bytes_fragmented.
// note that open and yet-filled pages count against fragmentation.
st->bytes_fragmented = st->pages_used * e->page_size -
@@ -579,6 +596,12 @@ int extstore_submit(void *ptr, obj_io *io) {
}
tmp->next = io;
}
+ // TODO: extstore_submit(ptr, io, count)
+ obj_io *tio = io;
+ while (tio != NULL) {
+ t->depth++;
+ tio = tio->next;
+ }
pthread_mutex_unlock(&t->mutex);
//pthread_mutex_lock(&t->mutex);
@@ -703,6 +726,7 @@ static void *extstore_io_thread(void *arg) {
break;
}
}
+ me->depth -= i;
me->queue = end->next;
end->next = NULL;
}
diff --git a/extstore.h b/extstore.h
index 6814415..0a27d0c 100644
--- a/extstore.h
+++ b/extstore.h
@@ -34,6 +34,7 @@ struct extstore_stats {
uint64_t bytes_read; /* wbuf - read -> bytes read from storage */
uint64_t bytes_used; /* total number of bytes stored */
uint64_t bytes_fragmented; /* see above comment */
+ uint64_t io_queue;
struct extstore_page_data *page_data;
};
diff --git a/memcached.c b/memcached.c
index c77f28e..f7c4359 100644
--- a/memcached.c
+++ b/memcached.c
@@ -3214,6 +3214,7 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
+ APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
}
#endif
}