summaryrefslogtreecommitdiff
path: root/storage.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-11 13:46:16 -0800
committerdormando <dormando@rydia.net>2023-01-11 21:37:55 -0800
commite660658748b04f865852e77b0aad1fd8301cd5ec (patch)
tree21d4017a4858c44c240e10b4c9a4b136812d2e0e /storage.c
parentfccf7b9efdfb0deb11f111496ce53c5892647dab (diff)
downloadmemcached-e660658748b04f865852e77b0aad1fd8301cd5ec.tar.gz
core: simplify background IO API
- removes unused "completed" IO callback handler - moves primary post-IO callback handlers from the queue definition to the actual IO objects. - allows IO object callbacks to be handled generically instead of based on the queue they were submitted from.
Diffstat (limited to 'storage.c')
-rw-r--r--storage.c49
1 files changed, 26 insertions, 23 deletions
diff --git a/storage.c b/storage.c
index 257ff2f..d7ca9a1 100644
--- a/storage.c
+++ b/storage.c
@@ -19,6 +19,8 @@
/*
* API functions
*/
+static void storage_finalize_cb(io_pending_t *pending);
+static void storage_return_cb(io_pending_t *pending);
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
@@ -26,7 +28,10 @@ typedef struct _io_pending_storage_t {
int io_queue_type;
LIBEVENT_THREAD *thread;
conn *c;
- mc_resp *resp; /* original struct ends here */
+ mc_resp *resp;
+ io_queue_cb return_cb; // called on worker thread.
+ io_queue_cb finalize_cb; // called back on the worker thread.
+ /* original struct ends here */
item *hdr_it; /* original header item. */
obj_io io_ctx; /* embedded extstore IO header */
unsigned int iovec_data; /* specific index of data iovec */
@@ -119,12 +124,10 @@ void storage_stats(ADD_STAT add_stats, conn *c) {
}
-
-// FIXME: This runs in the IO thread. to get better IO performance this should
-// simply mark the io wrapper with the return value and decrement wrapleft, if
-// zero redispatching. Still a bit of work being done in the side thread but
-// minimized at least.
-// TODO: wrap -> p?
+// This callback runs in the IO thread.
+// TODO: Some or all of this should move to the
+// io_pending's callback back in the worker thread.
+// It might make sense to keep the crc32c check here though.
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
// FIXME: assumes success
io_pending_storage_t *p = (io_pending_storage_t *)io->data;
@@ -227,13 +230,7 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
p->active = false;
//assert(c->io_wrapleft >= 0);
- // All IO's have returned, lets re-attach this connection to our original
- // thread.
- io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
- q->count--;
- if (q->count == 0) {
- redispatch_conn(c);
- }
+ return_io_pending((io_pending_t *)p);
}
int storage_get_item(conn *c, item *it, mc_resp *resp) {
@@ -271,6 +268,9 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
p->miss = false;
p->badcrc = false;
p->noreply = c->noreply;
+ p->thread = c->thread;
+ p->return_cb = storage_return_cb;
+ p->finalize_cb = storage_finalize_cb;
// io_pending owns the reference for this object now.
p->hdr_it = it;
p->resp = resp;
@@ -371,8 +371,12 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
void storage_submit_cb(io_queue_t *q) {
// Don't need to do anything special for extstore.
extstore_submit(q->ctx, q->stack_ctx);
+
+ // need to reset the stack for next use.
+ q->stack_ctx = NULL;
}
+// Runs locally in worker thread.
static void recache_or_free(io_pending_t *pending) {
// re-cast to our specific struct.
io_pending_storage_t *p = (io_pending_storage_t *)pending;
@@ -446,18 +450,17 @@ static void recache_or_free(io_pending_t *pending) {
item_remove(p->hdr_it);
}
-// Called after the IO is processed but before the response is transmitted.
-// TODO: stubbed with a reminder: should be able to move most of the extstore
-// callback code into this code instead, executing on worker thread instead of
-// IO thread.
-void storage_complete_cb(io_queue_t *q) {
- // need to reset the stack for next use.
- q->stack_ctx = NULL;
- return;
+// Called after an IO has been returned to the worker thread.
+static void storage_return_cb(io_pending_t *pending) {
+ io_queue_t *q = conn_io_queue_get(pending->c, pending->io_queue_type);
+ q->count--;
+ if (q->count == 0) {
+ conn_worker_readd(pending->c);
+ }
}
// Called after responses have been transmitted. Need to free up related data.
-void storage_finalize_cb(io_pending_t *pending) {
+static void storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;