diff options
author | dormando <dormando@rydia.net> | 2020-08-29 23:15:23 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2020-10-30 15:50:12 -0700 |
commit | 416a7a10014299fd08bf4b35e4ffa0870cd330b0 (patch) | |
tree | 6c767d9538b00bae8a393e609a73790e015d6610 /storage.c | |
parent | df49d38930a32d342e1a3ab980c1ec432d138c2c (diff) | |
download | memcached-416a7a10014299fd08bf4b35e4ffa0870cd330b0.tar.gz |
core: move more storage functions to storage.c
extstore.h is now only used from storage.c. starting a path towards
getting the storage interface to be more generalized.
should be no functional changes.
Diffstat (limited to 'storage.c')
-rw-r--r-- | storage.c | 619 |
1 file changed, 618 insertions, 1 deletion
@@ -3,6 +3,7 @@ #ifdef EXTSTORE #include "storage.h" +#include "extstore.h" #include <stdlib.h> #include <stdio.h> #include <stddef.h> @@ -15,6 +16,342 @@ #define PAGE_BUCKET_CHUNKED 2 #define PAGE_BUCKET_LOWTTL 3 +/* + * API functions + */ + +// Only call this if item has ITEM_HDR +bool storage_validate_item(void *e, item *it) { + item_hdr *hdr = (item_hdr *)ITEM_data(it); + if (extstore_check(e, hdr->page_id, hdr->page_version) != 0) { + return false; + } else { + return true; + } + +} + +void storage_delete(void *e, item *it) { + if (it->it_flags & ITEM_HDR) { + item_hdr *hdr = (item_hdr *)ITEM_data(it); + extstore_delete(e, hdr->page_id, hdr->page_version, + 1, ITEM_ntotal(it)); + } +} + +// Function for the extra stats called from a protocol. +// NOTE: This either needs a name change or a wrapper, perhaps? +// it's defined here to reduce exposure of extstore.h to the rest of memcached +// but feels a little off being defined here. +// At very least maybe "process_storage_stats" in line with making this more +// of a generic wrapper module. +void process_extstore_stats(ADD_STAT add_stats, conn *c) { + int i; + char key_str[STAT_KEY_LEN]; + char val_str[STAT_VAL_LEN]; + int klen = 0, vlen = 0; + struct extstore_stats st; + + assert(add_stats); + + void *storage = c->thread->storage; + extstore_get_stats(storage, &st); + st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data)); + extstore_get_page_data(storage, &st); + + for (i = 0; i < st.page_count; i++) { + APPEND_NUM_STAT(i, "version", "%llu", + (unsigned long long) st.page_data[i].version); + APPEND_NUM_STAT(i, "bytes", "%llu", + (unsigned long long) st.page_data[i].bytes_used); + APPEND_NUM_STAT(i, "bucket", "%u", + st.page_data[i].bucket); + APPEND_NUM_STAT(i, "free_bucket", "%u", + st.page_data[i].free_bucket); + } +} + +// Additional storage stats for the main stats output. 
+void storage_stats(ADD_STAT add_stats, conn *c) { + struct extstore_stats st; + if (c->thread->storage) { + STATS_LOCK(); + APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost); + APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues); + APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped); + STATS_UNLOCK(); + extstore_get_stats(c->thread->storage, &st); + APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs); + APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions); + APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims); + APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free); + APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used); + APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted); + APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read); + APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written); + APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used); + APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted); + APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written); + APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read); + APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used); + APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented); + APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size)); + APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue)); + } + +} + + +// FIXME: This runs in the IO thread. 
to get better IO performance this should +// simply mark the io wrapper with the return value and decrement wrapleft, if +// zero redispatching. Still a bit of work being done in the side thread but +// minimized at least. +// TODO: wrap -> p? +static void _storage_get_item_cb(void *e, obj_io *io, int ret) { + // FIXME: assumes success + io_pending_t *wrap = (io_pending_t *)io->data; + mc_resp *resp = wrap->resp; + conn *c = wrap->c; + assert(wrap->active == true); + item *read_it = (item *)io->buf; + bool miss = false; + + // TODO: How to do counters for hit/misses? + if (ret < 1) { + miss = true; + } else { + uint32_t crc2; + uint32_t crc = (uint32_t) read_it->exptime; + int x; + // item is chunked, crc the iov's + if (io->iov != NULL) { + // first iov is the header, which we don't use beyond crc + crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0].iov_len-STORE_OFFSET); + // make sure it's not sent. hack :( + io->iov[0].iov_len = 0; + for (x = 1; x < io->iovcnt; x++) { + crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_len); + } + } else { + crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET); + } + + if (crc != crc2) { + miss = true; + wrap->badcrc = true; + } + } + + if (miss) { + if (wrap->noreply) { + // In all GET cases, noreply means we send nothing back. + resp->skip = true; + } else { + // TODO: This should be movable to the worker thread. + // Convert the binprot response into a miss response. + // The header requires knowing a bunch of stateful crap, so rather + // than simply writing out a "new" miss response we mangle what's + // already there. 
+ if (c->protocol == binary_prot) { + protocol_binary_response_header *header = + (protocol_binary_response_header *)resp->wbuf; + + // cut the extra nbytes off of the body_len + uint32_t body_len = ntohl(header->response.bodylen); + uint8_t hdr_len = header->response.extlen; + body_len -= resp->iov[wrap->iovec_data].iov_len + hdr_len; + resp->tosend -= resp->iov[wrap->iovec_data].iov_len + hdr_len; + header->response.extlen = 0; + header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT); + header->response.bodylen = htonl(body_len); + + // truncate the data response. + resp->iov[wrap->iovec_data].iov_len = 0; + // wipe the extlen iov... wish it was just a flat buffer. + resp->iov[wrap->iovec_data-1].iov_len = 0; + resp->chunked_data_iov = 0; + } else { + int i; + // Meta commands have EN status lines for miss, rather than + // END as a trailer as per normal ascii. + if (resp->iov[0].iov_len >= 3 + && memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) { + // TODO: These miss translators should use specific callback + // functions attached to the io wrap. This is weird :( + resp->iovcnt = 1; + resp->iov[0].iov_len = 4; + resp->iov[0].iov_base = "EN\r\n"; + resp->tosend = 4; + } else { + // Wipe the iovecs up through our data injection. + // Allows trailers to be returned (END) + for (i = 0; i <= wrap->iovec_data; i++) { + resp->tosend -= resp->iov[i].iov_len; + resp->iov[i].iov_len = 0; + resp->iov[i].iov_base = NULL; + } + } + resp->chunked_total = 0; + resp->chunked_data_iov = 0; + } + } + wrap->miss = true; + } else { + assert(read_it->slabs_clsid != 0); + // TODO: should always use it instead of ITEM_data to kill more + // chunked special casing. 
+ if ((read_it->it_flags & ITEM_CHUNKED) == 0) { + resp->iov[wrap->iovec_data].iov_base = ITEM_data(read_it); + } + wrap->miss = false; + } + + c->io_pending--; + wrap->active = false; + //assert(c->io_wrapleft >= 0); + + // All IO's have returned, lets re-attach this connection to our original + // thread. + if (c->io_pending == 0) { + assert(c->io_queued == true); + redispatch_conn(c); + } +} + +int storage_get_item(conn *c, item *it, mc_resp *resp) { +#ifdef NEED_ALIGN + item_hdr hdr; + memcpy(&hdr, ITEM_data(it), sizeof(hdr)); +#else + item_hdr *hdr = (item_hdr *)ITEM_data(it); +#endif + io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE); + size_t ntotal = ITEM_ntotal(it); + unsigned int clsid = slabs_clsid(ntotal); + item *new_it; + bool chunked = false; + if (ntotal > settings.slab_chunk_size_max) { + // Pull a chunked item header. + uint32_t flags; + FLAGS_CONV(it, flags); + new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbytes); + assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED)); + chunked = true; + } else { + new_it = do_item_alloc_pull(ntotal, clsid); + } + if (new_it == NULL) + return -1; + assert(!c->io_queued); // FIXME: debugging. + // so we can free the chunk on a miss + new_it->slabs_clsid = clsid; + + io_pending_t *p = do_cache_alloc(c->thread->io_cache); + p->active = true; + p->miss = false; + p->badcrc = false; + p->noreply = c->noreply; + // io_pending owns the reference for this object now. + p->hdr_it = it; + p->resp = resp; + obj_io *eio = calloc(1, sizeof(obj_io)); + p->io_ctx = eio; + + // FIXME: error handling. + if (chunked) { + unsigned int ciovcnt = 0; + size_t remain = new_it->nbytes; + item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it); + // TODO: This might make sense as a _global_ cache vs a per-thread. + // but we still can't load objects requiring > IOV_MAX iovs. 
+ // In the meantime, these objects are rare/slow enough that + // malloc/freeing a statically sized object won't cause us much pain. + eio->iov = malloc(sizeof(struct iovec) * IOV_MAX); + if (eio->iov == NULL) { + item_remove(new_it); + free(eio); + do_cache_free(c->thread->io_cache, p); + return -1; + } + + // fill the header so we can get the full data + crc back. + eio->iov[0].iov_base = new_it; + eio->iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes; + ciovcnt++; + + while (remain > 0) { + chunk = do_item_alloc_chunk(chunk, remain); + // FIXME: _pure evil_, silently erroring if item is too large. + if (chunk == NULL || ciovcnt > IOV_MAX-1) { + item_remove(new_it); + free(eio->iov); + // TODO: wrapper function for freeing up an io wrap? + eio->iov = NULL; + free(eio); + do_cache_free(c->thread->io_cache, p); + return -1; + } + eio->iov[ciovcnt].iov_base = chunk->data; + eio->iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chunk->size; + chunk->used = (remain < chunk->size) ? remain : chunk->size; + remain -= chunk->size; + ciovcnt++; + } + + eio->iovcnt = ciovcnt; + } + + // Chunked or non chunked we reserve a response iov here. + p->iovec_data = resp->iovcnt; + int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes; + if (chunked) { + resp_add_chunked_iov(resp, new_it, iovtotal); + } else { + resp_add_iov(resp, "", iovtotal); + } + + eio->buf = (void *)new_it; + p->c = c; + + // We need to stack the sub-struct IO's together as well. + if (q->head_pending) { + eio->next = q->head_pending->io_ctx; + } else { + eio->next = NULL; + } + + // IO queue for this connection. + p->next = q->head_pending; + q->head_pending = p; + assert(c->io_pending >= 0); + c->io_pending++; + // reference ourselves for the callback. + eio->data = (void *)p; + + // Now, fill in io->io based on what was in our header. 
+#ifdef NEED_ALIGN + eio->page_version = hdr.page_version; + eio->page_id = hdr.page_id; + eio->offset = hdr.offset; +#else + eio->page_version = hdr->page_version; + eio->page_id = hdr->page_id; + eio->offset = hdr->offset; +#endif + eio->len = ntotal; + eio->mode = OBJ_IO_READ; + eio->cb = _storage_get_item_cb; + + // FIXME: This stat needs to move to reflect # of flash hits vs misses + // for now it's a good gauge on how often we request out to flash at + // least. + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_extstore++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + return 0; +} + void storage_submit_cb(void *ctx, io_pending_t *pending) { extstore_submit(ctx, pending->io_ctx); } @@ -99,7 +436,9 @@ void storage_free_cb(void *ctx, io_pending_t *pending) { free(io); } -/*** WRITE FLUSH THREAD ***/ +/* + * WRITE FLUSH THREAD + */ static int storage_write(void *storage, const int clsid, const int item_age) { int did_moves = 0; @@ -735,4 +1074,282 @@ error: return NULL; } +struct storage_settings { + struct extstore_conf_file *storage_file; + struct extstore_conf ext_cf; +}; + +void *storage_init_config(struct settings *s) { + struct storage_settings *cf = calloc(1, sizeof(struct storage_settings)); + + s->ext_item_size = 512; + s->ext_item_age = UINT_MAX; + s->ext_low_ttl = 0; + s->ext_recache_rate = 2000; + s->ext_max_frag = 0.8; + s->ext_drop_unread = false; + s->ext_wbuf_size = 1024 * 1024 * 4; + s->ext_compact_under = 0; + s->ext_drop_under = 0; + s->slab_automove_freeratio = 0.01; + s->ext_page_size = 1024 * 1024 * 64; + s->ext_io_threadcount = 1; + cf->ext_cf.page_size = settings.ext_page_size; + cf->ext_cf.wbuf_size = settings.ext_wbuf_size; + cf->ext_cf.io_threadcount = settings.ext_io_threadcount; + cf->ext_cf.io_depth = 1; + cf->ext_cf.page_buckets = 4; + cf->ext_cf.wbuf_count = cf->ext_cf.page_buckets; + + return cf; +} + +// TODO: pass settings struct? 
+int storage_read_config(void *conf, char **subopt) { + struct storage_settings *cf = conf; + struct extstore_conf *ext_cf = &cf->ext_cf; + char *subopts_value; + + enum { + EXT_PAGE_SIZE, + EXT_WBUF_SIZE, + EXT_THREADS, + EXT_IO_DEPTH, + EXT_PATH, + EXT_ITEM_SIZE, + EXT_ITEM_AGE, + EXT_LOW_TTL, + EXT_RECACHE_RATE, + EXT_COMPACT_UNDER, + EXT_DROP_UNDER, + EXT_MAX_FRAG, + EXT_DROP_UNREAD, + SLAB_AUTOMOVE_FREERATIO, // FIXME: move this back? + }; + + char *const subopts_tokens[] = { + [EXT_PAGE_SIZE] = "ext_page_size", + [EXT_WBUF_SIZE] = "ext_wbuf_size", + [EXT_THREADS] = "ext_threads", + [EXT_IO_DEPTH] = "ext_io_depth", + [EXT_PATH] = "ext_path", + [EXT_ITEM_SIZE] = "ext_item_size", + [EXT_ITEM_AGE] = "ext_item_age", + [EXT_LOW_TTL] = "ext_low_ttl", + [EXT_RECACHE_RATE] = "ext_recache_rate", + [EXT_COMPACT_UNDER] = "ext_compact_under", + [EXT_DROP_UNDER] = "ext_drop_under", + [EXT_MAX_FRAG] = "ext_max_frag", + [EXT_DROP_UNREAD] = "ext_drop_unread", + [SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio", + NULL + }; + + switch (getsubopt(subopt, subopts_tokens, &subopts_value)) { + case EXT_PAGE_SIZE: + if (cf->storage_file) { + fprintf(stderr, "Must specify ext_page_size before any ext_path arguments\n"); + return 1; + } + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_page_size argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf->page_size)) { + fprintf(stderr, "could not parse argument to ext_page_size\n"); + return 1; + } + ext_cf->page_size *= 1024 * 1024; /* megabytes */ + break; + case EXT_WBUF_SIZE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_wbuf_size argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf->wbuf_size)) { + fprintf(stderr, "could not parse argument to ext_wbuf_size\n"); + return 1; + } + ext_cf->wbuf_size *= 1024 * 1024; /* megabytes */ + settings.ext_wbuf_size = ext_cf->wbuf_size; + break; + case EXT_THREADS: + if (subopts_value == NULL) { + fprintf(stderr, "Missing 
ext_threads argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf->io_threadcount)) { + fprintf(stderr, "could not parse argument to ext_threads\n"); + return 1; + } + break; + case EXT_IO_DEPTH: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_io_depth argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf->io_depth)) { + fprintf(stderr, "could not parse argument to ext_io_depth\n"); + return 1; + } + break; + case EXT_ITEM_SIZE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_item_size argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_item_size)) { + fprintf(stderr, "could not parse argument to ext_item_size\n"); + return 1; + } + break; + case EXT_ITEM_AGE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_item_age argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_item_age)) { + fprintf(stderr, "could not parse argument to ext_item_age\n"); + return 1; + } + break; + case EXT_LOW_TTL: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_low_ttl argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) { + fprintf(stderr, "could not parse argument to ext_low_ttl\n"); + return 1; + } + break; + case EXT_RECACHE_RATE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_recache_rate argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) { + fprintf(stderr, "could not parse argument to ext_recache_rate\n"); + return 1; + } + break; + case EXT_COMPACT_UNDER: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_compact_under argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) { + fprintf(stderr, "could not parse argument to ext_compact_under\n"); + return 1; + } + break; + case EXT_DROP_UNDER: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_drop_under argument\n"); + return 1; + } + if 
(!safe_strtoul(subopts_value, &settings.ext_drop_under)) { + fprintf(stderr, "could not parse argument to ext_drop_under\n"); + return 1; + } + break; + case EXT_MAX_FRAG: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_max_frag argument\n"); + return 1; + } + if (!safe_strtod(subopts_value, &settings.ext_max_frag)) { + fprintf(stderr, "could not parse argument to ext_max_frag\n"); + return 1; + } + break; + case SLAB_AUTOMOVE_FREERATIO: + if (subopts_value == NULL) { + fprintf(stderr, "Missing slab_automove_freeratio argument\n"); + return 1; + } + if (!safe_strtod(subopts_value, &settings.slab_automove_freeratio)) { + fprintf(stderr, "could not parse argument to slab_automove_freeratio\n"); + return 1; + } + break; + case EXT_DROP_UNREAD: + settings.ext_drop_unread = true; + break; + case EXT_PATH: + if (subopts_value) { + struct extstore_conf_file *tmp = storage_conf_parse(subopts_value, ext_cf->page_size); + if (tmp == NULL) { + fprintf(stderr, "failed to parse ext_path argument\n"); + return 1; + } + if (cf->storage_file != NULL) { + tmp->next = cf->storage_file; + } + cf->storage_file = tmp; + } else { + fprintf(stderr, "missing argument to ext_path, ie: ext_path=/d/file:5G\n"); + return 1; + } + break; + default: + fprintf(stderr, "Illegal suboption \"%s\"\n", subopts_value); + return 1; + } + + return 0; +} + +int storage_check_config(void *conf) { + struct storage_settings *cf = conf; + struct extstore_conf *ext_cf = &cf->ext_cf; + + if (cf->storage_file) { + if (settings.item_size_max > ext_cf->wbuf_size) { + fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wbuf_size: %d\n", + settings.item_size_max, ext_cf->wbuf_size); + return 1; + } + + if (settings.udpport) { + fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disable)\n"); + return 1; + } + + return 0; + } + + return 2; +} + +void *storage_init(void *conf) { + struct storage_settings *cf = conf; + struct extstore_conf *ext_cf = &cf->ext_cf; + + enum 
extstore_res eres; + void *storage = NULL; + if (settings.ext_compact_under == 0) { + // If changing the default fraction (4), change the help text as well. + settings.ext_compact_under = cf->storage_file->page_count / 4; + /* Only rescues non-COLD items if below this threshold */ + settings.ext_drop_under = cf->storage_file->page_count / 4; + } + crc32c_init(); + /* Init free chunks to zero. */ + for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) { + settings.ext_free_memchunks[x] = 0; + } + storage = extstore_init(cf->storage_file, ext_cf, &eres); + if (storage == NULL) { + fprintf(stderr, "Failed to initialize external storage: %s\n", + extstore_err(eres)); + if (eres == EXTSTORE_INIT_OPEN_FAIL) { + perror("extstore open"); + } + return NULL; + } + + return storage; +} + #endif |