summaryrefslogtreecommitdiff
path: root/storage.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-08-02 16:25:43 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commit3fc8775bf081f0cf84fe16058f834b951953c269 (patch)
tree46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17 /storage.c
parent57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff)
downloadmemcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz
core: io_queue flow second attempt
probably squash into previous commit. io->c->thead can change for orpahned IO's, so we had to directly add the original worker thread as a reference. also tried again to split callbacks onto the thread and off of the connection for similar reasons; sometimes we just need the callbacks, sometimes we need both.
Diffstat (limited to 'storage.c')
-rw-r--r--storage.c19
1 files changed, 11 insertions, 8 deletions
diff --git a/storage.c b/storage.c
index 3c81a02..10567ca 100644
--- a/storage.c
+++ b/storage.c
@@ -23,7 +23,8 @@
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
- io_queue_t *q;
+ int io_queue_type;
+ LIBEVENT_THREAD *thread;
conn *c;
mc_resp *resp; /* original struct ends here */
item *hdr_it; /* original header item. */
@@ -223,13 +224,14 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
p->miss = false;
}
- p->q->count--;
p->active = false;
//assert(c->io_wrapleft >= 0);
// All IO's have returned, lets re-attach this connection to our original
// thread.
- if (p->q->count == 0) {
+ io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
+ q->count--;
+ if (q->count == 0) {
redispatch_conn(c);
}
}
@@ -272,7 +274,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) {
// io_pending owns the reference for this object now.
p->hdr_it = it;
p->resp = resp;
- p->q = q; // quicker access to the queue structure.
+ p->io_queue_type = IO_QUEUE_EXTSTORE;
obj_io *eio = &p->io_ctx;
// FIXME: error handling.
@@ -387,8 +389,10 @@ static void recache_or_free(io_pending_t *pending) {
do_free = false;
size_t ntotal = ITEM_ntotal(p->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
- p->q->count--;
- assert(p->q->count >= 0);
+
+ io_queue_t *q = conn_io_queue_get(c, p->io_queue_type);
+ q->count--;
+ assert(q->count >= 0);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_aborted_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
@@ -453,7 +457,7 @@ void storage_complete_cb(io_queue_t *q) {
}
// Called after responses have been transmitted. Need to free up related data.
-int storage_finalize_cb(io_pending_t *pending) {
+void storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;
@@ -463,7 +467,6 @@ int storage_finalize_cb(io_pending_t *pending) {
io->iov = NULL;
}
// don't need to free the main context, since it's embedded.
- return 0; // return code ignored.
}
/*