From 889938405df7e46f31ab0bbafee1fd210372f249 Mon Sep 17 00:00:00 2001 From: dormando Date: Sat, 14 Oct 2017 01:53:24 -0700 Subject: extstore: support chunked items. item size max must be <= wbuf_size. reads into iovecs, writes out of same iovecs. --- storage.c | 143 +++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 86 insertions(+), 57 deletions(-) (limited to 'storage.c') diff --git a/storage.c b/storage.c index 3e92f2a..8e8b33f 100644 --- a/storage.c +++ b/storage.c @@ -5,6 +5,10 @@ #include #include +#define PAGE_BUCKET_DEFAULT 0 +#define PAGE_BUCKET_COMPACT 1 +#define PAGE_BUCKET_CHUNKED 2 + int lru_maintainer_store(void *storage, const int clsid) { //int i; int did_moves = 0; @@ -24,69 +28,92 @@ int lru_maintainer_store(void *storage, const int clsid) { it_info.it = NULL; lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info); - /* Item isn't locked, but we have a reference to it. */ - if (it_info.it != NULL) { - obj_io io; - item *it = it_info.it; - /* First, storage for the header object */ - size_t orig_ntotal = ITEM_ntotal(it); - uint32_t flags; - // TODO: Doesn't presently work with chunked items. */ - if ((it->it_flags & (ITEM_CHUNKED|ITEM_HDR)) == 0 && - (item_age == 0 || current_time - it->time > item_age)) { - if (settings.inline_ascii_response) { - flags = (uint32_t) strtoul(ITEM_suffix(it)+1, (char **) NULL, 10); - } else if (it->nsuffix > 0) { - flags = *((uint32_t *)ITEM_suffix(it)); - } else { - flags = 0; - } - item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr)); - /* Run the storage write understanding the header is dirty. - * We will fill it from the header on read either way. - */ - if (hdr_it != NULL) { + /* Item is locked, and we have a reference to it. */ + if (it_info.it == NULL) { + return did_moves; + } + + obj_io io; + item *it = it_info.it; + /* First, storage for the header object */ + size_t orig_ntotal = ITEM_ntotal(it); + uint32_t flags; + if ((it->it_flags & ITEM_HDR) == 0 && + (item_age == 0 || current_time - it->time > item_age)) { + // FIXME: flag conversion again + if (settings.inline_ascii_response) { + flags = (uint32_t) strtoul(ITEM_suffix(it)+1, (char **) NULL, 10); + } else if (it->nsuffix > 0) { + flags = *((uint32_t *)ITEM_suffix(it)); + } else { + flags = 0; + } + item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr)); + /* Run the storage write understanding the start of the item is dirty. + * We will fill it (time/exptime/etc) from the header item on read. + */ + if (hdr_it != NULL) { + int bucket = (it->it_flags & ITEM_CHUNKED) ? + PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT; + hdr_it->it_flags |= ITEM_HDR; + io.len = orig_ntotal; + io.mode = OBJ_IO_WRITE; + // NOTE: when the item is read back in, the slab mover + // may see it. Important to have refcount>=2 or ~ITEM_LINKED + assert(it->refcount >= 2); + if (extstore_write_request(storage, bucket, &io) == 0) { // cuddle the hash value into the time field so we don't have // to recalculate it. - uint32_t crc = crc32c(0, (char*)it+32, orig_ntotal-32); - hdr_it->it_flags |= ITEM_HDR; - io.len = orig_ntotal; - io.mode = OBJ_IO_WRITE; - // NOTE: when the item is read back in, the slab mover - // may see it. Important to have refcount>=2 or ~ITEM_LINKED - assert(it->refcount >= 2); - if (extstore_write_request(storage, 0, &io) == 0) { - item *buf_it = (item *) io.buf; - buf_it->time = it_info.hv; - buf_it->exptime = crc; - // copy from past the headers + time headers. - memcpy((char *)io.buf+32, (char *)it+32, io.len-32); - extstore_write(storage, &io); - item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it); - hdr->page_version = io.page_version; - hdr->page_id = io.page_id; - hdr->offset = io.offset; - // overload nbytes for the header it - hdr_it->nbytes = it->nbytes; - /* success! Now we need to fill relevant data into the new - * header and replace. Most of this requires the item lock - */ - /* CAS gets set while linking. Copy post-replace */ - item_replace(it, hdr_it, it_info.hv); - ITEM_set_cas(hdr_it, ITEM_get_cas(it)); - //fprintf(stderr, "EXTSTORE: swapped an item: %s %lu %lu\n", ITEM_key(it), orig_ntotal, ntotal); - do_item_remove(hdr_it); - did_moves = 1; + item *buf_it = (item *) io.buf; + buf_it->time = it_info.hv; + // copy from past the headers + time headers. + // TODO: should be in items.c + if (it->it_flags & ITEM_CHUNKED) { + // Need to loop through the item and copy + item_chunk *sch = (item_chunk *) ITEM_data(it); + int remain = orig_ntotal; + int copied = 0; + // copy original header + int hdrtotal = ITEM_ntotal(it) - it->nbytes; + memcpy((char *)io.buf+32, (char *)it+32, hdrtotal - 32); + copied = hdrtotal; + // copy data in like it were one large object. + while (sch && remain) { + assert(remain >= sch->used); + memcpy((char *)io.buf+copied, sch->data, sch->used); + // FIXME: use one variable? + remain -= sch->used; + copied += sch->used; + sch = sch->next; + } } else { - //fprintf(stderr, "EXTSTORE: failed to write\n"); - /* Failed to write for some reason, can't continue. */ - slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it)); + memcpy((char *)io.buf+32, (char *)it+32, io.len-32); } + // crc what we copied so we can do it sequentially. + buf_it->exptime = crc32c(0, (char*)io.buf+32, orig_ntotal-32); + extstore_write(storage, &io); + item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it); + hdr->page_version = io.page_version; + hdr->page_id = io.page_id; + hdr->offset = io.offset; + // overload nbytes for the header it + hdr_it->nbytes = it->nbytes; + /* success! Now we need to fill relevant data into the new + * header and replace. Most of this requires the item lock + */ + /* CAS gets set while linking. Copy post-replace */ + item_replace(it, hdr_it, it_info.hv); + ITEM_set_cas(hdr_it, ITEM_get_cas(it)); + do_item_remove(hdr_it); + did_moves = 1; + } else { + /* Failed to write for some reason, can't continue. */ + slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it)); } } - do_item_remove(it); - item_unlock(it_info.hv); } + do_item_remove(it); + item_unlock(it_info.hv); return did_moves; } @@ -211,7 +238,7 @@ static void storage_compact_readback(void *storage, logger *l, io.len = ntotal; io.mode = OBJ_IO_WRITE; for (tries = 10; tries > 0; tries--) { - if (extstore_write_request(storage, 1, &io) == 0) { + if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, &io) == 0) { memcpy(io.buf, it, io.len); extstore_write(storage, &io); do_update = true; @@ -231,6 +258,8 @@ static void storage_compact_readback(void *storage, logger *l, lost++; // TODO: re-alloc and replace header. } + } else { + lost++; } } -- cgit v1.2.1