summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--memcached.c76
-rw-r--r--memcached.h31
-rw-r--r--storage.c40
-rw-r--r--storage.h12
-rw-r--r--thread.c5
5 files changed, 103 insertions, 61 deletions
diff --git a/memcached.c b/memcached.c
index 0c0445e..94597ea 100644
--- a/memcached.c
+++ b/memcached.c
@@ -531,25 +531,29 @@ void conn_worker_readd(conn *c) {
drive_machine(c);
return;
}
- c->state = conn_new_cmd;
// If we had IO objects, process
if (c->io_queued) {
c->io_queued = false;
- conn_set_state(c, conn_mwrite);
+ // state should be conn_io_queue already, so it will know how to
+ // dequeue and finalize the async work.
drive_machine(c);
+ } else {
+ conn_set_state(c, conn_new_cmd);
}
}
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_add_cb cb, io_queue_free_cb free_cb) {
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
q++;
}
q->type = type;
q->ctx = ctx;
- q->cb = cb;
- q->free_cb = free_cb;
+ q->stack_ctx = NULL;
+ q->submit_cb = cb;
+ q->complete_cb = com_cb;
+ q->finalize_cb = fin_cb;
return;
}
@@ -564,6 +568,23 @@ io_queue_t *conn_io_queue_get(conn *c, int type) {
return NULL;
}
+// called after returning to the main worker thread.
+// users of the queue need to distinguish if the IO was actually consumed or
+// not and handle appropriately.
+static void conn_io_queue_complete(conn *c) {
+ io_queue_t *q = c->io_queues;
+ while (q->type != IO_QUEUE_NONE) {
+ // Reuse the same submit stack. We zero it out first so callbacks can
+ // queue new IO's if necessary.
+ if (q->stack_ctx) {
+ void *tmp = q->stack_ctx;
+ q->stack_ctx = NULL;
+ q->complete_cb(q->ctx, tmp);
+ }
+ q++;
+ }
+}
+
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
@@ -747,21 +768,6 @@ void conn_release_items(conn *c) {
c->item = 0;
}
- io_queue_t *q = c->io_queues;
- while (q->type != IO_QUEUE_NONE) {
- if (q->head_pending) {
- io_pending_t *tmp = q->head_pending;
- while (tmp) {
- io_pending_t *next = tmp->next;
- q->free_cb(q->ctx, tmp);
- do_cache_free(c->thread->io_cache, tmp); // lockless
- tmp = next;
- }
- q->head_pending = NULL;
- }
- q++;
- }
-
// Cull any unsent responses.
if (c->resp_head) {
mc_resp *resp = c->resp_head;
@@ -885,7 +891,8 @@ static const char *state_text(enum conn_states state) {
"conn_closing",
"conn_mwrite",
"conn_closed",
- "conn_watch" };
+ "conn_watch",
+ "conn_io_queue" };
return statenames[state];
}
@@ -1110,6 +1117,13 @@ mc_resp* resp_finish(conn *c, mc_resp *resp) {
if (resp->write_and_free) {
free(resp->write_and_free);
}
+ if (resp->io_pending) {
+ // If we had a pending IO, tell it to internally clean up then return
+ // the main object back to our thread cache.
+ resp->io_pending->q->finalize_cb(resp->io_pending);
+ do_cache_free(c->thread->io_cache, resp->io_pending);
+ resp->io_pending = NULL;
+ }
if (c->resp_head == resp) {
c->resp_head = next;
}
@@ -3159,16 +3173,21 @@ static void drive_machine(conn *c) {
*/
if (c->io_pending) {
assert(c->io_queued == false);
- // TODO: create proper state for this condition
- conn_set_state(c, conn_watch);
+ conn_set_state(c, conn_io_queue);
event_del(&c->event);
c->io_queued = true;
- io_queue_t *q = c->io_queues;
+ // TODO: write as for loop?
+ /*io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
- if (q->head_pending != NULL) {
- q->cb(q->ctx, q->head_pending);
+ if (q->stack_ctx != NULL) {
+ q->submit_cb(q->ctx, q->stack_ctx);
}
q++;
+ }*/
+ for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
+ if (q->stack_ctx != NULL) {
+ q->submit_cb(q->ctx, q->stack_ctx);
+ }
}
stop = true;
break;
@@ -3217,6 +3236,11 @@ static void drive_machine(conn *c) {
/* We handed off our connection to the logger thread. */
stop = true;
break;
+ case conn_io_queue:
+ /* Complete our queued IO's from within the worker thread. */
+ conn_io_queue_complete(c);
+ conn_set_state(c, conn_mwrite);
+ break;
case conn_max_state:
assert(false);
break;
diff --git a/memcached.h b/memcached.h
index 73253c6..bce4f41 100644
--- a/memcached.h
+++ b/memcached.h
@@ -195,6 +195,7 @@ enum conn_states {
conn_mwrite, /**< writing out many items sequentially */
conn_closed, /**< connection is closed */
conn_watch, /**< held by the logger thread as a watcher */
+ conn_io_queue, /**< wait on async. process to get response object */
conn_max_state /**< Max state value (used for assertion) */
};
@@ -626,6 +627,7 @@ typedef struct {
/**
* Response objects
*/
+typedef struct _io_pending_t io_pending_t;
#define MC_RESP_IOVCOUNT 4
typedef struct _mc_resp {
mc_resp_bundle *bundle; // ptr back to bundle
@@ -633,6 +635,7 @@ typedef struct _mc_resp {
int wbytes; // bytes to write out of wbuf: might be able to nuke this.
int tosend; // total bytes to send for this response
void *write_and_free; /** free this memory after finishing writing */
+ io_pending_t *io_pending; /* pending IO descriptor for this response */
item *item; /* item associated with this response object, with reference held */
struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */
@@ -668,23 +671,25 @@ typedef struct conn conn;
#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1
-typedef struct _io_pending_t {
- struct _io_pending_t *next;
- conn *c;
- mc_resp *resp; /* associated response object */
- char data[120];
-} io_pending_t;
-typedef void (*io_queue_add_cb)(void *ctx, io_pending_t *pending);
-typedef void (*io_queue_free_cb)(void *ctx, io_pending_t *pending);
+typedef void (*io_queue_stack_cb)(void *ctx, void *stack);
+typedef void (*io_queue_cb)(io_pending_t *pending);
typedef struct {
- io_pending_t *head_pending;
- void *ctx;
- io_queue_add_cb cb;
- io_queue_free_cb free_cb;
+ void *ctx; // untouched ptr for specific context
+ void *stack_ctx; // module-specific context to be batch-submitted
+ io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
+ io_queue_stack_cb complete_cb;
+ io_queue_cb finalize_cb; // called back on the worker thread.
int type;
} io_queue_t;
+struct _io_pending_t {
+ io_queue_t *q;
+ conn *c;
+ mc_resp *resp; /* associated response object */
+ char data[120];
+};
+
/**
* The structure representing a connection into memcached.
*/
@@ -807,7 +812,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
-void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_add_cb cb, io_queue_free_cb free_cb);
+void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb fin_cb);
io_queue_t *conn_io_queue_get(conn *c, int type);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl);
diff --git a/storage.c b/storage.c
index a4420be..cfa0206 100644
--- a/storage.c
+++ b/storage.c
@@ -23,7 +23,7 @@
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
- struct _io_pending_t *next;
+ io_queue_t *q;
conn *c;
mc_resp *resp; /* original struct ends here */
item *hdr_it; /* original header item. */
@@ -271,6 +271,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
// io_pending owns the reference for this object now.
p->hdr_it = it;
p->resp = resp;
+ p->q = q; // quicker access to the queue structure.
obj_io *eio = &p->io_ctx;
// FIXME: error handling.
@@ -324,20 +325,17 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
resp_add_iov(resp, "", iovtotal);
}
+ // We can't bail out anymore, so mc_resp owns the IO from here.
+ resp->io_pending = (io_pending_t *)p;
+
eio->buf = (void *)new_it;
p->c = c;
- // We need to stack the sub-struct IO's together as well.
- if (q->head_pending) {
- io_pending_storage_t *qh = (io_pending_storage_t *)q->head_pending;
- eio->next = &qh->io_ctx;
- } else {
- eio->next = NULL;
- }
+ // We need to stack the sub-struct IO's together for submission.
+ eio->next = q->stack_ctx;
+ q->stack_ctx = eio;
- // IO queue for this connection.
- p->next = q->head_pending;
- q->head_pending = (io_pending_t *)p;
+ // No need to stack the io_pending's together as they live on mc_resp's.
assert(c->io_pending >= 0);
c->io_pending++;
// reference ourselves for the callback.
@@ -367,10 +365,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
return 0;
}
-void storage_submit_cb(void *ctx, io_pending_t *pending) {
- // re-cast to our specific struct.
- io_pending_storage_t *p = (io_pending_storage_t *)pending;
- extstore_submit(ctx, &p->io_ctx);
+void storage_submit_cb(void *ctx, void *stack) {
+ // Don't need to do anything special for extstore.
+ extstore_submit(ctx, stack);
}
static void recache_or_free(io_pending_t *pending) {
@@ -438,15 +435,22 @@ static void recache_or_free(io_pending_t *pending) {
p->io_ctx.buf = NULL;
p->io_ctx.next = NULL;
- p->next = NULL;
p->active = false;
// TODO: reuse lock and/or hv.
item_remove(p->hdr_it);
}
-// TODO: io cache or embed obj_io in space within io_pending_t
-void storage_free_cb(void *ctx, io_pending_t *pending) {
+// Called after the IO is processed but before the response is transmitted.
+// TODO: stubbed with a reminder: should be able to move most of the extstore
+// callback code into this code instead, executing on worker thread instead of
+// IO thread.
+void storage_complete_cb(void *ctx, void *stack_ctx) {
+ return;
+}
+
+// Called after responses have been transmitted. Need to free up related data.
+void storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;
diff --git a/storage.h b/storage.h
index 05e420d..ca59102 100644
--- a/storage.h
+++ b/storage.h
@@ -11,18 +11,26 @@ void storage_delete(void *e, item *it);
#define STORAGE_delete(...)
#endif
+// API.
void storage_stats(ADD_STAT add_stats, conn *c);
void process_extstore_stats(ADD_STAT add_stats, conn *c);
bool storage_validate_item(void *e, item *it);
int storage_get_item(conn *c, item *it, mc_resp *resp);
-void storage_submit_cb(void *ctx, io_pending_t *pending);
-void storage_free_cb(void *ctx, io_pending_t *pending);
+
+// callbacks for the IO queue subsystem.
+void storage_submit_cb(void *ctx, void *stack);
+void storage_complete_cb(void *ctx, void *stack);
+void storage_finalize_cb(io_pending_t *pending);
+
+// Thread functions.
int start_storage_write_thread(void *arg);
void storage_write_pause(void);
void storage_write_resume(void);
int start_storage_compact_thread(void *arg);
void storage_compact_pause(void);
void storage_compact_resume(void);
+
+// Init functions.
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size);
void *storage_init_config(struct settings *s);
int storage_read_config(void *conf, char **subopt);
diff --git a/thread.c b/thread.c
index d3a5403..7de7d4e 100644
--- a/thread.c
+++ b/thread.c
@@ -538,10 +538,11 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
c->thread = me;
#ifdef EXTSTORE
if (c->thread->storage) {
- conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, storage_submit_cb, storage_free_cb);
+ conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
+ storage_submit_cb, storage_complete_cb, storage_finalize_cb);
}
#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL);
+ conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
#ifdef TLS
if (settings.ssl_enabled && c->ssl != NULL) {