summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--memcached.c25
-rw-r--r--memcached.h14
-rw-r--r--storage.c11
-rw-r--r--storage.h6
-rw-r--r--thread.c32
5 files changed, 67 insertions, 21 deletions
diff --git a/memcached.c b/memcached.c
index a029061..0daee12 100644
--- a/memcached.c
+++ b/memcached.c
@@ -558,7 +558,7 @@ void conn_worker_readd(conn *c) {
}
}
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb) {
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
q++;
@@ -569,6 +569,7 @@ void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu
q->submit_cb = cb;
q->complete_cb = com_cb;
q->finalize_cb = fin_cb;
+ q->return_cb = ret_cb;
return;
}
@@ -589,17 +590,27 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
static void conn_io_queue_complete(conn *c) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
- // Reuse the same submit stack. We zero it out first so callbacks can
- // queue new IO's if necessary.
if (q->stack_ctx) {
- void *tmp = q->stack_ctx;
- q->stack_ctx = NULL;
- q->complete_cb(q->ctx, tmp);
+ q->complete_cb(q);
}
q++;
}
}
+// called to return a single IO object to the original worker thread.
+void conn_io_queue_return(io_pending_t *io) {
+ io_queue_t *q = io->q;
+ int ret = q->return_cb(io);
+ // An IO may or may not count against the pending responses.
+ if (ret) {
+ q->count--;
+ if (q->count == 0) {
+ conn_worker_readd(io->c);
+ }
+ }
+ return;
+}
+
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
@@ -3227,7 +3238,7 @@ static void drive_machine(conn *c) {
if (q->count != 0) {
assert(q->stack_ctx != NULL);
hit = true;
- q->submit_cb(q->ctx, q->stack_ctx);
+ q->submit_cb(q);
c->io_queues_submitted++;
}
}
diff --git a/memcached.h b/memcached.h
index aab21cd..e8ed48b 100644
--- a/memcached.h
+++ b/memcached.h
@@ -697,8 +697,9 @@ typedef struct conn conn;
#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1
-typedef void (*io_queue_stack_cb)(void *ctx, void *stack);
-typedef void (*io_queue_cb)(io_pending_t *pending);
+typedef struct io_queue_s io_queue_t;
+typedef void (*io_queue_stack_cb)(io_queue_t *q);
+typedef int (*io_queue_cb)(io_pending_t *pending);
// this structure's ownership gets passed between threads:
// - owned normally by the worker thread.
// - multiple queues can be submitted at the same time.
@@ -714,15 +715,16 @@ typedef void (*io_queue_cb)(io_pending_t *pending);
//
// All of this is to avoid having to hit a mutex owned by the connection
// thread that gets pinged for each thread (or an equivalent atomic).
-typedef struct {
+struct io_queue_s {
void *ctx; // untouched ptr for specific context
void *stack_ctx; // module-specific context to be batch-submitted
io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
io_queue_stack_cb complete_cb;
+ io_queue_cb return_cb; // called on worker thread.
io_queue_cb finalize_cb; // called back on the worker thread.
int type;
int count; // ios to process before returning. only accessed by queue processor once submitted
-} io_queue_t;
+};
struct _io_pending_t {
io_queue_t *q;
@@ -859,8 +861,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb);
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
io_queue_t *conn_io_queue_get(conn *c, int type);
+void conn_io_queue_return(io_pending_t *io);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl);
@@ -888,6 +891,7 @@ extern int daemonize(int nochdir, int noclose);
void memcached_thread_init(int nthreads, void *arg);
void redispatch_conn(conn *c);
void timeout_conn(conn *c);
+void return_io_pending(conn *c, io_pending_t *io);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl);
void sidethread_conn_close(conn *c);
diff --git a/storage.c b/storage.c
index 70ac78c..3c81a02 100644
--- a/storage.c
+++ b/storage.c
@@ -366,9 +366,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
return 0;
}
-void storage_submit_cb(void *ctx, void *stack) {
+void storage_submit_cb(io_queue_t *q) {
// Don't need to do anything special for extstore.
- extstore_submit(ctx, stack);
+ extstore_submit(q->ctx, q->stack_ctx);
}
static void recache_or_free(io_pending_t *pending) {
@@ -446,12 +446,14 @@ static void recache_or_free(io_pending_t *pending) {
// TODO: stubbed with a reminder: should be able to move most of the extstore
// callback code into this code instead, executing on worker thread instead of
// IO thread.
-void storage_complete_cb(void *ctx, void *stack_ctx) {
+void storage_complete_cb(io_queue_t *q) {
+ // need to reset the stack for next use.
+ q->stack_ctx = NULL;
return;
}
// Called after responses have been transmitted. Need to free up related data.
-void storage_finalize_cb(io_pending_t *pending) {
+int storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;
@@ -461,6 +463,7 @@ void storage_finalize_cb(io_pending_t *pending) {
io->iov = NULL;
}
// don't need to free the main context, since it's embedded.
+ return 0; // return code ignored.
}
/*
diff --git a/storage.h b/storage.h
index ca59102..9257c3f 100644
--- a/storage.h
+++ b/storage.h
@@ -18,9 +18,9 @@ bool storage_validate_item(void *e, item *it);
int storage_get_item(conn *c, item *it, mc_resp *resp);
// callbacks for the IO queue subsystem.
-void storage_submit_cb(void *ctx, void *stack);
-void storage_complete_cb(void *ctx, void *stack);
-void storage_finalize_cb(io_pending_t *pending);
+void storage_submit_cb(io_queue_t *q);
+void storage_complete_cb(io_queue_t *q);
+int storage_finalize_cb(io_pending_t *pending);
// Thread functions.
int start_storage_write_thread(void *arg);
diff --git a/thread.c b/thread.c
index d969a09..c64de52 100644
--- a/thread.c
+++ b/thread.c
@@ -35,6 +35,7 @@ enum conn_queue_item_modes {
queue_timeout, /* socket sfd timed out */
queue_redispatch, /* return conn from side thread */
queue_stop, /* exit thread */
+ queue_return_io, /* returning a pending IO object immediately */
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
@@ -46,6 +47,7 @@ struct conn_queue_item {
enum conn_queue_item_modes mode;
conn *c;
void *ssl;
+ io_pending_t *io; // IO when used for deferred IO handling.
STAILQ_ENTRY(conn_queue_item) i_next;
};
@@ -321,6 +323,12 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) {
cache_free(cq->cache, item);
}
+// TODO: Skip notify if queue wasn't empty?
+// - Requires cq_push() returning a "was empty" flag
+// - Requires event handling loop to pop the entire queue and work from that
+// instead of the ev_count work there now.
+// In testing this does result in a large performance uptick, but unclear how
+// much that will transfer from a synthetic benchmark.
static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
cq_push(t->ev_queue, item);
#ifdef HAVE_EVENTFD
@@ -558,10 +566,10 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
#ifdef EXTSTORE
if (c->thread->storage) {
conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
- storage_submit_cb, storage_complete_cb, storage_finalize_cb);
+ storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
}
#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
+ conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
#ifdef TLS
if (settings.ssl_enabled && c->ssl != NULL) {
@@ -587,6 +595,10 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
/* asked to stop */
event_base_loopexit(me->base, NULL);
break;
+ case queue_return_io:
+ /* getting an individual IO object back */
+ conn_io_queue_return(item->io);
+ break;
}
cqi_free(me->ev_queue, item);
@@ -714,6 +726,22 @@ void timeout_conn(conn *c) {
notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
+void return_io_pending(conn *c, io_pending_t *io) {
+ CQ_ITEM *item = cqi_new(c->thread->ev_queue);
+ if (item == NULL) {
+ // TODO: how can we avoid this?
+ // In the main case I just loop, since a malloc failure here for a
+ // tiny object that's generally in a fixed size queue is going to
+ // implode shortly.
+ return;
+ }
+
+ item->mode = queue_return_io;
+ item->io = io;
+
+ notify_worker(c->thread, item);
+}
+
/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
if (settings.verbose > 1)