diff options
author | dormando <dormando@rydia.net> | 2021-08-02 16:25:43 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | 3fc8775bf081f0cf84fe16058f834b951953c269 (patch) | |
tree | 46e0eb2a0f2e9163bd75e49b0d7418e413a4ce17 /storage.c | |
parent | 57493bfca4d16f19aa6d591d29f19f3d2ad160f8 (diff) | |
download | memcached-3fc8775bf081f0cf84fe16058f834b951953c269.tar.gz |
core: io_queue flow second attempt
probably squash into previous commit.
io->c->thead can change for orpahned IO's, so we had to directly add the
original worker thread as a reference.
also tried again to split callbacks onto the thread and off of the
connection for similar reasons; sometimes we just need the callbacks,
sometimes we need both.
Diffstat (limited to 'storage.c')
-rw-r--r-- | storage.c | 19 |
1 files changed, 11 insertions, 8 deletions
@@ -23,7 +23,8 @@ // re-cast an io_pending_t into this more descriptive structure. // the first few items _must_ match the original struct. typedef struct _io_pending_storage_t { - io_queue_t *q; + int io_queue_type; + LIBEVENT_THREAD *thread; conn *c; mc_resp *resp; /* original struct ends here */ item *hdr_it; /* original header item. */ @@ -223,13 +224,14 @@ static void _storage_get_item_cb(void *e, obj_io *io, int ret) { p->miss = false; } - p->q->count--; p->active = false; //assert(c->io_wrapleft >= 0); // All IO's have returned, lets re-attach this connection to our original // thread. - if (p->q->count == 0) { + io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); + q->count--; + if (q->count == 0) { redispatch_conn(c); } } @@ -272,7 +274,7 @@ int storage_get_item(conn *c, item *it, mc_resp *resp) { // io_pending owns the reference for this object now. p->hdr_it = it; p->resp = resp; - p->q = q; // quicker access to the queue structure. + p->io_queue_type = IO_QUEUE_EXTSTORE; obj_io *eio = &p->io_ctx; // FIXME: error handling. @@ -387,8 +389,10 @@ static void recache_or_free(io_pending_t *pending) { do_free = false; size_t ntotal = ITEM_ntotal(p->hdr_it); slabs_free(it, ntotal, slabs_clsid(ntotal)); - p->q->count--; - assert(p->q->count >= 0); + + io_queue_t *q = conn_io_queue_get(c, p->io_queue_type); + q->count--; + assert(q->count >= 0); pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.get_aborted_extstore++; pthread_mutex_unlock(&c->thread->stats.mutex); @@ -453,7 +457,7 @@ void storage_complete_cb(io_queue_t *q) { } // Called after responses have been transmitted. Need to free up related data. -int storage_finalize_cb(io_pending_t *pending) { +void storage_finalize_cb(io_pending_t *pending) { recache_or_free(pending); io_pending_storage_t *p = (io_pending_storage_t *)pending; obj_io *io = &p->io_ctx; @@ -463,7 +467,6 @@ int storage_finalize_cb(io_pending_t *pending) { io->iov = NULL; } // don't need to free the main context, since it's embedded. - return 0; // return code ignored. } /* |