author     dormando <dormando@rydia.net>  2020-08-29 17:12:56 -0700
committer  dormando <dormando@rydia.net>  2020-10-30 15:50:12 -0700
commit     df49d38930a32d342e1a3ab980c1ec432d138c2c (patch)
tree       496e44dc652e37c0f012f93e4730318a22dabfa6 /storage.c
parent     4c0d45a95c9ba8fb8ea0630b96f4e006ef1698a1 (diff)
download   memcached-df49d38930a32d342e1a3ab980c1ec432d138c2c.tar.gz
core: generalize extstore's deferred IO queue
We want to reuse extstore's deferred IO system for something else; generalizing it should let it evolve into a more plugin-centric system. Step one of three(?): replace the queue in place, with tests passing under extstore. Step two should move more extstore code into storage.c. Step three should build the IO queue code without ifdef gating.
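Both callbacks this patch adds, storage_submit_cb() and storage_free_cb(), share one (void *ctx, io_pending_t *pending) shape, which is the hook for that plugin-centric direction. A minimal sketch of the idea, assuming a hypothetical io_queue_cb_table type (the names below are illustrative, not definitions from this commit):

/* One signature for every queue operation keeps the queue generic. */
typedef struct _io_pending_t io_pending_t;
typedef void (*io_queue_cb)(void *ctx, io_pending_t *pending);

/* Hypothetical registration record: extstore becomes just one backend. */
typedef struct {
    void *ctx;             /* backend handle, e.g. the extstore engine */
    io_queue_cb submit_cb; /* dispatch queued IO objects to the backend */
    io_queue_cb free_cb;   /* recycle or free a completed IO object */
} io_queue_cb_table;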
Diffstat (limited to 'storage.c')
-rw-r--r--  storage.c | 84
1 file changed, 84 insertions(+), 0 deletions(-)
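Building on that sketch, draining a connection's pending list through such a table might look like the following; conn_io_queue_submit() and the struct layout are assumptions for illustration, with only storage_submit_cb() and storage_free_cb() coming from the patch below:

/* Minimal assumed layout: the queue only needs the intrusive next link. */
struct _io_pending_t {
    struct _io_pending_t *next;
    /* ... backend/request state ... */
};

void conn_io_queue_submit(io_queue_cb_table *q, io_pending_t *head) {
    for (io_pending_t *p = head; p != NULL;) {
        io_pending_t *next = p->next; /* backend may complete p asynchronously */
        q->submit_cb(q->ctx, p);
        p = next;
    }
}

Registration for extstore would then be a table along the lines of { ext_storage, storage_submit_cb, storage_free_cb } (assuming ext_storage is the engine handle), leaving the queue code itself free of extstore #ifdefs, the step-three goal above.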
diff --git a/storage.c b/storage.c
index 6c34d12..dcdc12d 100644
--- a/storage.c
+++ b/storage.c
@@ -15,6 +15,90 @@
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL 3
+void storage_submit_cb(void *ctx, io_pending_t *pending) {
+ extstore_submit(ctx, pending->io_ctx);
+}
+
+static void recache_or_free(io_pending_t *p) {
+ conn *c = p->c;
+ obj_io *io = p->io_ctx;
+ item *it = (item *)io->buf;
+ assert(c != NULL);
+ assert(io != NULL);
+ bool do_free = true;
+ if (p->active) {
+ // If request never dispatched, free the read buffer but leave the
+ // item header alone.
+ do_free = false;
+ size_t ntotal = ITEM_ntotal(p->hdr_it);
+ slabs_free(it, ntotal, slabs_clsid(ntotal));
+ c->io_pending--;
+ assert(c->io_pending >= 0);
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_aborted_extstore++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+ } else if (p->miss) {
+ // If request was ultimately a miss, unlink the header.
+ do_free = false;
+ size_t ntotal = ITEM_ntotal(p->hdr_it);
+ item_unlink(p->hdr_it);
+ slabs_free(it, ntotal, slabs_clsid(ntotal));
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.miss_from_extstore++;
+ if (p->badcrc)
+ c->thread->stats.badcrc_from_extstore++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+ } else if (settings.ext_recache_rate) {
+ // hashvalue is cuddled during store
+ uint32_t hv = (uint32_t)it->time;
+ // opt to throw away rather than wait on a lock.
+ void *hold_lock = item_trylock(hv);
+ if (hold_lock != NULL) {
+ item *h_it = p->hdr_it;
+ uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
+ // Item must be recently hit at least twice to recache.
+ if (((h_it->it_flags & flags) == flags) &&
+ h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
+ c->recache_counter++ % settings.ext_recache_rate == 0) {
+ do_free = false;
+ // In case it's been updated.
+ it->exptime = h_it->exptime;
+ it->it_flags &= ~ITEM_LINKED;
+ it->refcount = 0;
+ it->h_next = NULL; // might not be necessary.
+ STORAGE_delete(c->thread->storage, h_it);
+ item_replace(h_it, it, hv);
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.recache_from_extstore++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+ }
+ }
+ if (hold_lock)
+ item_trylock_unlock(hold_lock);
+ }
+ if (do_free)
+ slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));
+
+ // dead code left from the old io_wrap struct (io is now p->io_ctx):
+ //io->buf = NULL;
+ //io->next = NULL;
+ p->next = NULL;
+ p->active = false;
+
+ // TODO: reuse lock and/or hv.
+ item_remove(p->hdr_it);
+}
+
+// TODO: io cache or embed obj_io in space within io_pending_t
+void storage_free_cb(void *ctx, io_pending_t *pending) {
+ recache_or_free(pending);
+ obj_io *io = pending->io_ctx;
+ // malloc'ed iovec list used for chunked extstore fetches.
+ if (io->iov) {
+ free(io->iov);
+ }
+ free(io);
+}
+
/*** WRITE FLUSH THREAD ***/
static int storage_write(void *storage, const int clsid, const int item_age) {