diff options
author | dormando <dormando@rydia.net> | 2017-09-26 14:43:17 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2017-11-28 14:18:05 -0800 |
commit | f593a59bce69f917514ef6213cf565c71bddcf8c (patch) | |
tree | 4a5dc07433e97b089f46a913b5367aa5d52c059a | |
parent | e6239a905d072e837baa8aa425ca0ccee2fc3e01 (diff) | |
download | memcached-f593a59bce69f917514ef6213cf565c71bddcf8c.tar.gz |
external storage base commit
been squashing, reorganizing, and pulling code off to go upstream ahead
of merging the whole branch.
-rw-r--r-- | Makefile.am | 5 | ||||
-rw-r--r-- | crawler.c | 21 | ||||
-rw-r--r-- | crc32c.c | 343 | ||||
-rw-r--r-- | crc32c.h | 9 | ||||
-rw-r--r-- | doc/storage.txt | 141 | ||||
-rw-r--r-- | extstore.c | 800 | ||||
-rw-r--r-- | extstore.h | 94 | ||||
-rw-r--r-- | items.c | 21 | ||||
-rw-r--r-- | items.h | 13 | ||||
-rw-r--r-- | logger.c | 22 | ||||
-rw-r--r-- | logger.h | 8 | ||||
-rw-r--r-- | memcached.c | 493 | ||||
-rw-r--r-- | memcached.h | 65 | ||||
-rw-r--r-- | sizes.c | 3 | ||||
-rw-r--r-- | slabs.c | 18 | ||||
-rw-r--r-- | storage.c | 379 | ||||
-rw-r--r-- | storage.h | 7 | ||||
-rw-r--r-- | thread.c | 17 |
18 files changed, 2441 insertions, 18 deletions
diff --git a/Makefile.am b/Makefile.am index 95c334c..d1e4d60 100644 --- a/Makefile.am +++ b/Makefile.am @@ -23,7 +23,10 @@ memcached_SOURCES = memcached.c memcached.h \ logger.c logger.h \ crawler.c crawler.h \ itoa_ljust.c itoa_ljust.h \ - slab_automove.c slab_automove.h + slab_automove.c slab_automove.h \ + extstore.c exstore.h \ + storage.c storage.h \ + crc32c.c crc32c.h if BUILD_CACHE memcached_SOURCES += cache.c @@ -96,6 +96,10 @@ static volatile int do_run_lru_crawler_thread = 0; static int lru_crawler_initialized = 0; static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER; +#ifdef EXTSTORE +/* TODO: pass this around */ +static void *storage; +#endif /* Will crawl all slab classes a minimum of once per hour */ #define MAX_MAINTCRAWL_WAIT 60 * 60 @@ -179,8 +183,20 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv pthread_mutex_lock(&d->lock); crawlerstats_t *s = &d->crawlerstats[i]; int is_flushed = item_is_flushed(search); +#ifdef EXTSTORE + bool is_valid = true; + if (search->it_flags & ITEM_HDR) { + item_hdr *hdr = (item_hdr *)ITEM_data(search); + if (extstore_check(storage, hdr->page_id, hdr->page_version) != 0) + is_valid = false; + } +#endif if ((search->exptime != 0 && search->exptime < current_time) - || is_flushed) { + || is_flushed +#ifdef EXTSTORE + || !is_valid +#endif + ) { crawlers[i].reclaimed++; s->reclaimed++; @@ -656,6 +672,9 @@ void lru_crawler_resume(void) { int init_lru_crawler(void *arg) { if (lru_crawler_initialized == 0) { +#ifdef EXTSTORE + storage = arg; +#endif if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) { fprintf(stderr, "Can't initialize lru crawler condition\n"); return -1; diff --git a/crc32c.c b/crc32c.c new file mode 100644 index 0000000..13deee2 --- /dev/null +++ b/crc32c.c @@ -0,0 +1,343 @@ +/* crc32c.c -- compute CRC-32C using the Intel crc32 instruction + * Copyright (C) 2013 Mark Adler + * Version 
1.1 1 Aug 2013 Mark Adler + */ + +/* + This software is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution. + + Mark Adler + madler@alumni.caltech.edu + */ + +/* Use hardware CRC instruction on Intel SSE 4.2 processors. This computes a + CRC-32C, *not* the CRC-32 used by Ethernet and zip, gzip, etc. A software + version is provided as a fall-back, as well as for speed comparisons. */ + +/* Version history: + 1.0 10 Feb 2013 First version + 1.1 1 Aug 2013 Correct comments on why three crc instructions in parallel + */ + +/* This version has been modified by dormando for inclusion in memcached */ + +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> +#include <pthread.h> +#include "crc32c.h" + +/* CRC-32C (iSCSI) polynomial in reversed bit order. */ +#define POLY 0x82f63b78 + +/* Table for a quadword-at-a-time software crc. */ +static pthread_once_t crc32c_once_sw = PTHREAD_ONCE_INIT; +static uint32_t crc32c_table[8][256]; + +/* Construct table for software CRC-32C calculation. */ +static void crc32c_init_sw(void) +{ + uint32_t n, crc, k; + + for (n = 0; n < 256; n++) { + crc = n; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? 
(crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1; + crc32c_table[0][n] = crc; + } + for (n = 0; n < 256; n++) { + crc = crc32c_table[0][n]; + for (k = 1; k < 8; k++) { + crc = crc32c_table[0][crc & 0xff] ^ (crc >> 8); + crc32c_table[k][n] = crc; + } + } +} + +/* Table-driven software version as a fall-back. This is about 15 times slower + than using the hardware instructions. This assumes little-endian integers, + as is the case on Intel processors that the assembler code here is for. */ +static uint32_t crc32c_sw(uint32_t crci, const void *buf, size_t len) +{ + const unsigned char *next = buf; + uint64_t crc; + + pthread_once(&crc32c_once_sw, crc32c_init_sw); + crc = crci ^ 0xffffffff; + while (len && ((uintptr_t)next & 7) != 0) { + crc = crc32c_table[0][(crc ^ *next++) & 0xff] ^ (crc >> 8); + len--; + } + while (len >= 8) { + crc ^= *(uint64_t *)next; + crc = crc32c_table[7][crc & 0xff] ^ + crc32c_table[6][(crc >> 8) & 0xff] ^ + crc32c_table[5][(crc >> 16) & 0xff] ^ + crc32c_table[4][(crc >> 24) & 0xff] ^ + crc32c_table[3][(crc >> 32) & 0xff] ^ + crc32c_table[2][(crc >> 40) & 0xff] ^ + crc32c_table[1][(crc >> 48) & 0xff] ^ + crc32c_table[0][crc >> 56]; + next += 8; + len -= 8; + } + while (len) { + crc = crc32c_table[0][(crc ^ *next++) & 0xff] ^ (crc >> 8); + len--; + } + return (uint32_t)crc ^ 0xffffffff; +} + +/* Multiply a matrix times a vector over the Galois field of two elements, + GF(2). Each element is a bit in an unsigned integer. mat must have at + least as many entries as the power of two for most significant one bit in + vec. 
*/ +static inline uint32_t gf2_matrix_times(uint32_t *mat, uint32_t vec) +{ + uint32_t sum; + + sum = 0; + while (vec) { + if (vec & 1) + sum ^= *mat; + vec >>= 1; + mat++; + } + return sum; +} + +/* Multiply a matrix by itself over GF(2). Both mat and square must have 32 + rows. */ +static inline void gf2_matrix_square(uint32_t *square, uint32_t *mat) +{ + int n; + + for (n = 0; n < 32; n++) + square[n] = gf2_matrix_times(mat, mat[n]); +} + +/* Construct an operator to apply len zeros to a crc. len must be a power of + two. If len is not a power of two, then the result is the same as for the + largest power of two less than len. The result for len == 0 is the same as + for len == 1. A version of this routine could be easily written for any + len, but that is not needed for this application. */ +static void crc32c_zeros_op(uint32_t *even, size_t len) +{ + int n; + uint32_t row; + uint32_t odd[32]; /* odd-power-of-two zeros operator */ + + /* put operator for one zero bit in odd */ + odd[0] = POLY; /* CRC-32C polynomial */ + row = 1; + for (n = 1; n < 32; n++) { + odd[n] = row; + row <<= 1; + } + + /* put operator for two zero bits in even */ + gf2_matrix_square(even, odd); + + /* put operator for four zero bits in odd */ + gf2_matrix_square(odd, even); + + /* first square will put the operator for one zero byte (eight zero bits), + in even -- next square puts operator for two zero bytes in odd, and so + on, until len has been rotated down to zero */ + do { + gf2_matrix_square(even, odd); + len >>= 1; + if (len == 0) + return; + gf2_matrix_square(odd, even); + len >>= 1; + } while (len); + + /* answer ended up in odd -- copy to even */ + for (n = 0; n < 32; n++) + even[n] = odd[n]; +} + +/* Take a length and build four lookup tables for applying the zeros operator + for that length, byte-by-byte on the operand. 
*/ +static void crc32c_zeros(uint32_t zeros[][256], size_t len) +{ + uint32_t n; + uint32_t op[32]; + + crc32c_zeros_op(op, len); + for (n = 0; n < 256; n++) { + zeros[0][n] = gf2_matrix_times(op, n); + zeros[1][n] = gf2_matrix_times(op, n << 8); + zeros[2][n] = gf2_matrix_times(op, n << 16); + zeros[3][n] = gf2_matrix_times(op, n << 24); + } +} + +/* Apply the zeros operator table to crc. */ +static inline uint32_t crc32c_shift(uint32_t zeros[][256], uint32_t crc) +{ + return zeros[0][crc & 0xff] ^ zeros[1][(crc >> 8) & 0xff] ^ + zeros[2][(crc >> 16) & 0xff] ^ zeros[3][crc >> 24]; +} + +/* Block sizes for three-way parallel crc computation. LONG and SHORT must + both be powers of two. The associated string constants must be set + accordingly, for use in constructing the assembler instructions. */ +#define LONG 8192 +#define LONGx1 "8192" +#define LONGx2 "16384" +#define SHORT 256 +#define SHORTx1 "256" +#define SHORTx2 "512" + +/* Tables for hardware crc that shift a crc by LONG and SHORT zeros. */ +static pthread_once_t crc32c_once_hw = PTHREAD_ONCE_INIT; +static uint32_t crc32c_long[4][256]; +static uint32_t crc32c_short[4][256]; + +/* Initialize tables for shifting crcs. */ +static void crc32c_init_hw(void) +{ + crc32c_zeros(crc32c_long, LONG); + crc32c_zeros(crc32c_short, SHORT); +} + +/* Compute CRC-32C using the Intel hardware instruction. 
*/ +static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len) +{ + const unsigned char *next = buf; + const unsigned char *end; + uint64_t crc0, crc1, crc2; /* need to be 64 bits for crc32q */ + + /* populate shift tables the first time through */ + pthread_once(&crc32c_once_hw, crc32c_init_hw); + + /* pre-process the crc */ + crc0 = crc ^ 0xffffffff; + + /* compute the crc for up to seven leading bytes to bring the data pointer + to an eight-byte boundary */ + while (len && ((uintptr_t)next & 7) != 0) { + __asm__("crc32b\t" "(%1), %0" + : "=r"(crc0) + : "r"(next), "0"(crc0)); + next++; + len--; + } + + /* compute the crc on sets of LONG*3 bytes, executing three independent crc + instructions, each on LONG bytes -- this is optimized for the Nehalem, + Westmere, Sandy Bridge, and Ivy Bridge architectures, which have a + throughput of one crc per cycle, but a latency of three cycles */ + while (len >= LONG*3) { + crc1 = 0; + crc2 = 0; + end = next + LONG; + do { + __asm__("crc32q\t" "(%3), %0\n\t" + "crc32q\t" LONGx1 "(%3), %1\n\t" + "crc32q\t" LONGx2 "(%3), %2" + : "=r"(crc0), "=r"(crc1), "=r"(crc2) + : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2)); + next += 8; + } while (next < end); + crc0 = crc32c_shift(crc32c_long, crc0) ^ crc1; + crc0 = crc32c_shift(crc32c_long, crc0) ^ crc2; + next += LONG*2; + len -= LONG*3; + } + + /* do the same thing, but now on SHORT*3 blocks for the remaining data less + than a LONG*3 block */ + while (len >= SHORT*3) { + crc1 = 0; + crc2 = 0; + end = next + SHORT; + do { + __asm__("crc32q\t" "(%3), %0\n\t" + "crc32q\t" SHORTx1 "(%3), %1\n\t" + "crc32q\t" SHORTx2 "(%3), %2" + : "=r"(crc0), "=r"(crc1), "=r"(crc2) + : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2)); + next += 8; + } while (next < end); + crc0 = crc32c_shift(crc32c_short, crc0) ^ crc1; + crc0 = crc32c_shift(crc32c_short, crc0) ^ crc2; + next += SHORT*2; + len -= SHORT*3; + } + + /* compute the crc on the remaining eight-byte units less than a SHORT*3 + block */ + end 
= next + (len - (len & 7)); + while (next < end) { + __asm__("crc32q\t" "(%1), %0" + : "=r"(crc0) + : "r"(next), "0"(crc0)); + next += 8; + } + len &= 7; + + /* compute the crc for up to seven trailing bytes */ + while (len) { + __asm__("crc32b\t" "(%1), %0" + : "=r"(crc0) + : "r"(next), "0"(crc0)); + next++; + len--; + } + + /* return a post-processed crc */ + return (uint32_t)crc0 ^ 0xffffffff; +} + +/* Check for SSE 4.2. SSE 4.2 was first supported in Nehalem processors + introduced in November, 2008. This does not check for the existence of the + cpuid instruction itself, which was introduced on the 486SL in 1992, so this + will fail on earlier x86 processors. cpuid works on all Pentium and later + processors. */ +#define SSE42(have) \ + do { \ + uint32_t eax, ecx; \ + eax = 1; \ + __asm__("cpuid" \ + : "=c"(ecx) \ + : "a"(eax) \ + : "%ebx", "%edx"); \ + (have) = (ecx >> 20) & 1; \ + } while (0) + +/* Compute a CRC-32C. If the crc32 instruction is available, use the hardware + version. Otherwise, use the software version. */ +void crc32c_init(void) { + int sse42; + + SSE42(sse42); + if (sse42) { + crc32c = crc32c_hw; + } else { + crc32c = crc32c_sw; + } +} diff --git a/crc32c.h b/crc32c.h new file mode 100644 index 0000000..8b030de --- /dev/null +++ b/crc32c.h @@ -0,0 +1,9 @@ +#ifndef CRC32C_H +#define CRC32C_H + +typedef uint32_t (*crc_func)(uint32_t crc, const void *buf, size_t len); +crc_func crc32c; + +void crc32c_init(void); + +#endif /* CRC32C_H */ diff --git a/doc/storage.txt b/doc/storage.txt new file mode 100644 index 0000000..41a3c7e --- /dev/null +++ b/doc/storage.txt @@ -0,0 +1,141 @@ +Storage system notes +-------------------- + +extstore.h defines the API. + +extstore_write() is a synchronous call which memcpy's the input buffer into a +write buffer for an active page. A failure is not usually a hard failure, but +indicates caller can try again another time. IE: it might be busy freeing +pages or assigning new ones. 
+ +As of this writing the write() implementation doesn't have an internal loop, +so it can give spurious failures (good for testing integration) + +extstore_read() is an asynchronous call which takes a stack of IO objects and +adds it to the end of a queue. It then signals the IO thread to run. Once an +IO stack is submitted the caller must not touch the submitted objects anymore +(they are relinked internally). + +extstore_delete() is a synchronous call which informs the storage engine an +item has been removed from that page. It's important to call this as items are +actively deleted or passively reaped due to TTL expiration. This allows the +engine to intelligently reclaim pages. + +The IO threads execute each object in turn (or in bulk of running in the +future libaio mode). + +Callbacks are issued from the IO threads. It's thus important to keep +processing to a minimum. Callbacks may be issued out of order, and it is the +caller's responsibility to know when its stack has been fully processed so it +may reclaim the memory. + +With DIRECT_IO support, buffers submitted for read/write will need to be +aligned with posix_memalign() or similar. + +Buckets +------- + +During extstore_init(), a number of active buckets is specified. Pages are +handled overall as a global pool, but writes can be redirected to specific +active pages. + +This allows a lot of flexibility, ie: + +1) an idea of "high TTL" and "low TTL" being two buckets. TTL < 86400 +goes into bucket 0, rest into bucket 1. Co-locating low TTL items means +those pages can reach zero objects and free up more easily. + +2) Extended: "low TTL" is one bucket, and then one bucket per slab class. +If TTL's are low, mixed sized objects can go together as they are likely to +expire before cycling out of flash (depending on workload, of course). +For higher TTL items, pages are stored on chunk barriers. This means less +space is wasted as items should fit nearly exactly into write buffers and +pages. 
It also means you can blindly read items back if the system wants to +free a page and we can indicate to the caller somehow which pages are up for +probation. ie; issue a read against page 3 version 1 for byte range 0->1MB, +then chunk and look up objects. Then read next 1MB chunk/etc. If there's +anything we want to keep, pull it back into RAM before the page is freed. + +Pages are assigned into buckets on demand, so if you make 30 but use 1 there +will only be a single active page with write buffers. + +Memcached integration +--------------------- + +With the POC: items.c's lru_maintainer_thread calls writes to storage if all +memory has been allocated out to slab classes, and there is less than an +amount of memory free. Original objects are swapped with items marked with +ITEM_HDR flag. An ITEM_HDR contains copies of the original key and most of the +header data. The ITEM_data() section of an ITEM_HDR object contains (item_hdr +*), which describes enough information to retrieve the original object from +storage. + +To get best performance it is important that reads can be deeply pipelined. +As much processing as possible is done ahead of time, IO's are submitted, and +once IO's are done processing a minimal amount of code is executed before +transmit() is possible. This should amortize the amount of latency incurred by +hopping threads and waiting on IO. + +Recaching +--------- + +If a header is hit twice overall, and the second time within ~60s of the first +time, it will have a chance of getting recached. "recache_rate" is a simple +"counter % rate == 0" check. Setting to 1000 means one out of every 1000 +instances of an item being hit twice within ~60s it will be recached into +memory. Very hot items will get pulled out of storage relatively quickly. + +Compaction +---------- + +A target fragmentation limit is set: "0.9", meaning "run compactions if pages +exist which have less than 90% of their bytes used". 
+ +This value is slewed based on the number of free pages in the system, and +activates when half of the pages are used. The percentage of free pages is +multiplied against the target fragmentation limit, ie: +limit of 0.9: 50% of pages free -> 0.9 * 0.5 -> 0.45. If a page is 64 +megabytes, pages with less than 28.8 megabytes used would be targeted for +compaction. If 0 pages are free, anything less than 90% used is targeted, which +means it has to rewrite 10 pages to free one page. + +In memcached's integration, a second bucket is used for objects rewritten via +the compactor. Potentially objects around long enough to get compacted might +continue to stick around, so co-locating them could reduce fragmentation work. + +If an exclusive lock is made on a valid object header, the flash locations are +rewritten directly in the object. As of this writing, if an object header is +busy for some reason, the write is dropped (COW needs to be implemented). This +is an unlikely scenario however. + +Objects are read back along the boundaries of a write buffer. If an 8 meg +write buffer is used, 8 megs are read back at once and iterated for objects. + +This needs a fair amount of tuning, possibly more throttling. It will still +evict pages if the compactor gets behind. + +TODO +---- + +Sharing my broad TODO items into here. While doing the work they get split up +more into local notes. Adding this so others can follow along: + +(a bunch of the TODO has been completed and removed) +- DIRECT_IO support +- libaio support (requires DIRECT_IO) +- code cleanup (function over form until I have bugs out) + - pull all of the inlined linked list code + - naming consistency + - clear FIXME/TODO's related to error handling +- JBOD support (not first pass) + - 1-2 active pages per device. potentially dedicated IO threads per device. + with a RAID setup you risk any individual disk doing a GC pause stalling + all writes. could also simply rotate devices on a per-bucket basis. 
+ +on memcached end: +- fix append/prepend/incr/decr/etc +- large item support +- --configure gating for extstore being compiled (for now, at least) +- binprot support +- DIRECT_IO support; mostly memalign pages, but also making chunks grow + aligned to sector sizes once they are >= a single sector. diff --git a/extstore.c b/extstore.c new file mode 100644 index 0000000..536211d --- /dev/null +++ b/extstore.c @@ -0,0 +1,800 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ + +// FIXME: config.h? +#include <stdint.h> +#include <stdbool.h> +// end FIXME +#include <stdlib.h> +#include <limits.h> +#include <pthread.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> +#include <stdio.h> +#include <string.h> +#include <assert.h> +#include "extstore.h" + +// TODO: better if an init option turns this on/off. +#ifdef EXTSTORE_DEBUG +#define E_DEBUG(...) \ + do { \ + fprintf(stderr, __VA_ARGS__); \ + } while (0) +#else +#define E_DEBUG(...) 
+#endif + +#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex); +#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex); +#define STAT_INCR(e, stat, amount) { \ + pthread_mutex_lock(&e->stats_mutex); \ + e->stats.stat += amount; \ + pthread_mutex_unlock(&e->stats_mutex); \ +} + +#define STAT_DECR(e, stat, amount) { \ + pthread_mutex_lock(&e->stats_mutex); \ + e->stats.stat -= amount; \ + pthread_mutex_unlock(&e->stats_mutex); \ +} + +typedef struct __store_wbuf { + struct __store_wbuf *next; + char *buf; + char *buf_pos; + unsigned int free; + unsigned int size; + unsigned int offset; /* offset into page this write starts at */ + bool full; /* done writing to this page */ + bool flushed; /* whether wbuf has been flushed to disk */ +} _store_wbuf; + +typedef struct _store_page { + pthread_mutex_t mutex; /* Need to be held for most operations */ + uint64_t obj_count; /* _delete can decrease post-closing */ + uint64_t bytes_used; /* _delete can decrease post-closing */ + uint64_t offset; /* starting address of page within fd */ + unsigned int version; + unsigned int refcount; + unsigned int allocated; + unsigned int written; /* item offsets can be past written if wbuf not flushed */ + unsigned int bucket; /* which bucket the page is linked into */ + int fd; + unsigned short id; + bool active; /* actively being written to */ + bool closed; /* closed and draining before free */ + bool free; /* on freelist */ + _store_wbuf *wbuf; /* currently active wbuf from the stack */ + struct _store_page *next; +} store_page; + +typedef struct store_engine store_engine; +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + obj_io *queue; + store_engine *e; +} store_io_thread; + +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + store_engine *e; +} store_maint_thread; + +/* TODO: Array of FDs for JBOD support */ +struct store_engine { + pthread_mutex_t mutex; /* covers internal stacks and variables */ + store_page *pages; /* directly addressable 
page list */ + _store_wbuf *wbuf_stack; /* wbuf freelist */ + obj_io *io_stack; /* IO's to use with submitting wbuf's */ + store_io_thread *io_threads; + store_maint_thread *maint_thread; + store_page *page_freelist; + store_page **page_buckets; /* stack of pages currently allocated to each bucket */ + size_t page_size; + unsigned int version; /* global version counter */ + unsigned int last_io_thread; /* round robin the IO threads */ + unsigned int io_threadcount; /* count of IO threads */ + unsigned int page_count; + unsigned int page_free; /* unallocated pages */ + unsigned int page_bucketcount; /* count of potential page buckets */ + unsigned int io_depth; /* FIXME: Might cache into thr struct */ + pthread_mutex_t stats_mutex; + struct extstore_stats stats; +}; + +static _store_wbuf *wbuf_new(size_t size) { + _store_wbuf *b = calloc(1, sizeof(_store_wbuf)); + if (b == NULL) + return NULL; + b->buf = malloc(size); + if (b->buf == NULL) { + free(b); + return NULL; + } + b->buf_pos = b->buf; + b->free = size; + b->size = size; + return b; +} + +static store_io_thread *_get_io_thread(store_engine *e) { + int tid; + pthread_mutex_lock(&e->mutex); + tid = (e->last_io_thread + 1) % e->io_threadcount; + e->last_io_thread = tid; + pthread_mutex_unlock(&e->mutex); + + return &e->io_threads[tid]; +} + +static uint64_t _next_version(store_engine *e) { + return e->version++; +} + +static void *extstore_io_thread(void *arg); +static void *extstore_maint_thread(void *arg); + +/* Copies stats internal to engine and computes any derived values */ +void extstore_get_stats(void *ptr, struct extstore_stats *st) { + store_engine *e = (store_engine *)ptr; + STAT_L(e); + memcpy(st, &e->stats, sizeof(struct extstore_stats)); + STAT_UL(e); + + // grab pages_free/pages_used + pthread_mutex_lock(&e->mutex); + st->pages_free = e->page_free; + st->pages_used = e->page_count - e->page_free; + pthread_mutex_unlock(&e->mutex); + // calculate bytes_fragmented. 
+ // note that open and yet-filled pages count against fragmentation. + st->bytes_fragmented = st->pages_used * e->page_size - + st->bytes_used; +} + +void extstore_get_page_data(void *ptr, struct extstore_stats *st) { + store_engine *e = (store_engine *)ptr; + STAT_L(e); + memcpy(st->page_data, e->stats.page_data, + sizeof(struct extstore_page_data) * e->page_count); + STAT_UL(e); +} + +/* TODO: debug mode with prints? error code? */ +// TODO: Somehow pass real error codes from config failures +void *extstore_init(char *fn, struct extstore_conf *cf) { + int i; + int fd; + uint64_t offset = 0; + pthread_t thread; + + if (cf->page_size % cf->wbuf_size != 0) { + E_DEBUG("EXTSTORE: page_size must be divisible by wbuf_size\n"); + return NULL; + } + // Should ensure at least one write buffer per potential page + if (cf->page_buckets > cf->wbuf_count) { + E_DEBUG("EXTSTORE: wbuf_count must be >= page_buckets\n"); + return NULL; + } + if (cf->page_buckets < 1) { + E_DEBUG("EXTSTORE: page_buckets must be > 0\n"); + return NULL; + } + + // TODO: More intelligence around alignment of flash erasure block sizes + if (cf->page_size % (1024 * 1024 * 2) != 0 || + cf->wbuf_size % (1024 * 1024 * 2) != 0) { + E_DEBUG("EXTSTORE: page_size and wbuf_size must be divisible by 1024*1024*2\n"); + return NULL; + } + + store_engine *e = calloc(1, sizeof(store_engine)); + if (e == NULL) { + E_DEBUG("EXTSTORE: failed calloc for engine\n"); + return NULL; + } + + e->page_size = cf->page_size; + fd = open(fn, O_RDWR | O_CREAT | O_TRUNC, 0644); + if (fd < 0) { + E_DEBUG("EXTSTORE: failed to open file: %s\n", fn); +#ifdef EXTSTORE_DEBUG + perror("open"); +#endif + free(e); + return NULL; + } + + e->pages = calloc(cf->page_count, sizeof(store_page)); + if (e->pages == NULL) { + E_DEBUG("EXTSTORE: failed to calloc storage pages\n"); + close(fd); + free(e); + return NULL; + } + + for (i = 0; i < cf->page_count; i++) { + pthread_mutex_init(&e->pages[i].mutex, NULL); + e->pages[i].id = i; + 
e->pages[i].fd = fd; + e->pages[i].offset = offset; + e->pages[i].free = true; + offset += e->page_size; + } + + for (i = cf->page_count-1; i > 0; i--) { + e->pages[i].next = e->page_freelist; + e->page_freelist = &e->pages[i]; + e->page_free++; + } + + // 0 is magic "page is freed" version + e->version = 1; + + e->page_count = cf->page_count; + // scratch data for stats. TODO: malloc failure handle + e->stats.page_data = + calloc(e->page_count, sizeof(struct extstore_page_data)); + e->stats.page_count = e->page_count; + e->stats.page_size = e->page_size; + + // page buckets lazily have pages assigned into them + e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *)); + e->page_bucketcount = cf->page_buckets; + + // allocate write buffers + // also IO's to use for shipping to IO thread + for (i = 0; i < cf->wbuf_count; i++) { + _store_wbuf *w = wbuf_new(cf->wbuf_size); + obj_io *io = calloc(1, sizeof(obj_io)); + /* TODO: on error, loop again and free stack. */ + w->next = e->wbuf_stack; + e->wbuf_stack = w; + io->next = e->io_stack; + e->io_stack = io; + } + + pthread_mutex_init(&e->mutex, NULL); + pthread_mutex_init(&e->stats_mutex, NULL); + + e->io_depth = cf->io_depth; + + // spawn threads + e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread)); + for (i = 0; i < cf->io_threadcount; i++) { + pthread_mutex_init(&e->io_threads[i].mutex, NULL); + pthread_cond_init(&e->io_threads[i].cond, NULL); + e->io_threads[i].e = e; + // FIXME: error handling + pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]); + } + e->io_threadcount = cf->io_threadcount; + + e->maint_thread = calloc(1, sizeof(store_maint_thread)); + e->maint_thread->e = e; + // FIXME: error handling + pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread); + + return (void *)e; +} + +void extstore_run_maint(void *ptr) { + store_engine *e = (store_engine *)ptr; + pthread_cond_signal(&e->maint_thread->cond); +} + +// call with *e locked +static 
store_page *_allocate_page(store_engine *e, unsigned int bucket) { + assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size); + store_page *tmp = e->page_freelist; + E_DEBUG("EXTSTORE: allocating new page\n"); + if (e->page_free > 0) { + assert(e->page_freelist != NULL); + e->page_freelist = tmp->next; + tmp->next = e->page_buckets[bucket]; + e->page_buckets[bucket] = tmp; + tmp->active = true; + tmp->free = false; + tmp->closed = false; + tmp->version = _next_version(e); + tmp->bucket = bucket; + e->page_free--; + STAT_INCR(e, page_allocs, 1); + } else { + extstore_run_maint(e); + } + if (tmp) + E_DEBUG("EXTSTORE: got page %u\n", tmp->id); + return tmp; +} + +// call with *p locked. locks *e +static void _allocate_wbuf(store_engine *e, store_page *p) { + _store_wbuf *wbuf = NULL; + assert(!p->wbuf); + pthread_mutex_lock(&e->mutex); + if (e->wbuf_stack) { + wbuf = e->wbuf_stack; + e->wbuf_stack = wbuf->next; + wbuf->next = 0; + } + pthread_mutex_unlock(&e->mutex); + if (wbuf) { + wbuf->offset = p->allocated; + p->allocated += wbuf->size; + wbuf->free = wbuf->size; + wbuf->buf_pos = wbuf->buf; + wbuf->full = false; + wbuf->flushed = false; + + p->wbuf = wbuf; + } +} + +/* callback after wbuf is flushed. can only remove wbuf's from the head onward + * if successfully flushed, which complicates this routine. each callback + * attempts to free the wbuf stack, which is finally done when the head wbuf's + * callback happens. + * It's rare flushes would happen out of order. + */ +static void _wbuf_cb(void *ep, obj_io *io, int ret) { + store_engine *e = (store_engine *)ep; + store_page *p = &e->pages[io->page_id]; + _store_wbuf *w = (_store_wbuf *) io->data; + + // TODO: Examine return code. Not entirely sure how to handle errors. + // Naive first-pass should probably cause the page to close/free. 
+ w->flushed = true; + pthread_mutex_lock(&p->mutex); + assert(p->wbuf != NULL && p->wbuf == w); + assert(p->written == w->offset); + p->written += w->size; + p->wbuf = NULL; + + if (p->written == e->page_size) + p->active = false; + + // return the wbuf + pthread_mutex_lock(&e->mutex); + w->next = e->wbuf_stack; + e->wbuf_stack = w; + // also return the IO we just used. + io->next = e->io_stack; + e->io_stack = io; + pthread_mutex_unlock(&e->mutex); + pthread_mutex_unlock(&p->mutex); +} + +/* Wraps pages current wbuf in an io and submits to IO thread. + * Called with p locked, locks e. + */ +static void _submit_wbuf(store_engine *e, store_page *p) { + _store_wbuf *w; + pthread_mutex_lock(&e->mutex); + obj_io *io = e->io_stack; + e->io_stack = io->next; + pthread_mutex_unlock(&e->mutex); + w = p->wbuf; + + // zero out the end of the wbuf to allow blind readback of data. + memset(w->buf + (w->size - w->free), 0, w->free); + + io->next = NULL; + io->mode = OBJ_IO_WRITE; + io->page_id = p->id; + io->data = w; + io->offset = w->offset; + io->len = w->size; + io->buf = w->buf; + io->cb = _wbuf_cb; + + extstore_submit(e, io); +} + +/* engine write function; takes engine, item_io. + * fast fail if no available write buffer (flushing) + * lock engine context, find active page, unlock + * if page full, submit page/buffer to io thread. + * + * write is designed to be flaky; if page full, caller must try again to get + * new page. best if used from a background thread that can harmlessly retry. + */ + +int extstore_write(void *ptr, unsigned int bucket, obj_io *io) { + store_engine *e = (store_engine *)ptr; + store_page *p; + int ret = -1; + if (bucket >= e->page_bucketcount) + return ret; + + pthread_mutex_lock(&e->mutex); + p = e->page_buckets[bucket]; + if (!p) { + p = _allocate_page(e, bucket); + } + pthread_mutex_unlock(&e->mutex); + if (!p) + return ret; + + pthread_mutex_lock(&p->mutex); + + // FIXME: can't null out page_buckets!!! 
+ // page is full, clear bucket and retry later. + if (!p->active || + ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) { + pthread_mutex_unlock(&p->mutex); + pthread_mutex_lock(&e->mutex); + _allocate_page(e, bucket); + pthread_mutex_unlock(&e->mutex); + return ret; + } + + // if io won't fit, submit IO for wbuf and find new one. + if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) { + _submit_wbuf(e, p); + p->wbuf->full = true; + } + + if (!p->wbuf && p->allocated < e->page_size) { + _allocate_wbuf(e, p); + } + + // memcpy into wbuf + if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) { + memcpy(p->wbuf->buf_pos, io->buf, io->len); + io->page_id = p->id; + io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free); + io->page_version = p->version; + p->wbuf->buf_pos += io->len; + p->wbuf->free -= io->len; + p->bytes_used += io->len; + p->obj_count++; + STAT_L(e); + e->stats.bytes_written += io->len; + e->stats.bytes_used += io->len; + e->stats.objects_written++; + e->stats.objects_used++; + STAT_UL(e); + ret = 0; + } + + pthread_mutex_unlock(&p->mutex); + // p->written is incremented post-wbuf flush + return ret; +} + +/* engine submit function; takes engine, item_io stack. + * lock io_thread context and add stack? + * signal io thread to wake. + * return sucess. + */ +int extstore_submit(void *ptr, obj_io *io) { + store_engine *e = (store_engine *)ptr; + store_io_thread *t = _get_io_thread(e); + + pthread_mutex_lock(&t->mutex); + if (t->queue == NULL) { + t->queue = io; + } else { + /* Have to put the *io stack at the end of current queue. + * FIXME: Optimize by tracking tail. + */ + obj_io *tmp = t->queue; + while (tmp->next != NULL) { + tmp = tmp->next; + assert(tmp != t->queue); + } + tmp->next = io; + } + pthread_mutex_unlock(&t->mutex); + + //pthread_mutex_lock(&t->mutex); + pthread_cond_signal(&t->cond); + //pthread_mutex_unlock(&t->mutex); + return 0; +} + +/* engine note delete function: takes engine, page id, size? 
+ * note that an item in this page is no longer valid + */ +int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version, + unsigned int count, unsigned int bytes) { + store_engine *e = (store_engine *)ptr; + // FIXME: validate page_id in bounds + store_page *p = &e->pages[page_id]; + int ret = 0; + + pthread_mutex_lock(&p->mutex); + if (!p->closed && p->version == page_version) { + if (p->bytes_used >= bytes) { + p->bytes_used -= bytes; + } else { + p->bytes_used = 0; + } + + if (p->obj_count >= count) { + p->obj_count -= count; + } else { + p->obj_count = 0; // caller has bad accounting? + } + STAT_L(e); + e->stats.bytes_used -= bytes; + e->stats.objects_used -= count; + STAT_UL(e); + + if (p->obj_count == 0) { + extstore_run_maint(e); + } + } else { + ret = -1; + } + pthread_mutex_unlock(&p->mutex); + return ret; +} + +int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) { + store_engine *e = (store_engine *)ptr; + store_page *p = &e->pages[page_id]; + int ret = 0; + + pthread_mutex_lock(&p->mutex); + if (p->version != page_version) + ret = -1; + pthread_mutex_unlock(&p->mutex); + return ret; +} + +/* allows a compactor to say "we're done with this page, kill it. */ +void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) { + store_engine *e = (store_engine *)ptr; + store_page *p = &e->pages[page_id]; + + pthread_mutex_lock(&p->mutex); + if (!p->closed && p->version == page_version) { + p->closed = true; + extstore_run_maint(e); + } + pthread_mutex_unlock(&p->mutex); +} + +/* Finds an attached wbuf that can satisfy the read. + * Since wbufs can potentially be flushed to disk out of order, they are only + * removed as the head of the list successfuly flushes to disk. 
+ */ +// call with *p locked +// FIXME: protect from reading past wbuf +static inline int _read_from_wbuf(store_page *p, obj_io *io) { + _store_wbuf *wbuf = p->wbuf; + assert(wbuf != NULL); + assert(io->offset < p->written + wbuf->size); + memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len); + return io->len; +} + +/* engine IO thread; takes engine context + * manage writes/reads + * runs IO callbacks inline after each IO + */ +// FIXME: protect from reading past page +static void *extstore_io_thread(void *arg) { + store_io_thread *me = (store_io_thread *)arg; + store_engine *e = me->e; + while (1) { + obj_io *io_stack = NULL; + pthread_mutex_lock(&me->mutex); + if (me->queue == NULL) { + pthread_cond_wait(&me->cond, &me->mutex); + } + + // Pull and disconnect a batch from the queue + if (me->queue != NULL) { + int i; + obj_io *end = NULL; + io_stack = me->queue; + end = io_stack; + for (i = 1; i < e->io_depth; i++) { + if (end->next) { + end = end->next; + } else { + break; + } + } + me->queue = end->next; + end->next = NULL; + } + pthread_mutex_unlock(&me->mutex); + + obj_io *cur_io = io_stack; + while (cur_io) { + // We need to note next before the callback in case the obj_io + // gets reused. + obj_io *next = cur_io->next; + int ret = 0; + int do_op = 1; + store_page *p = &e->pages[cur_io->page_id]; + // TODO: loop if not enough bytes were read/written. + switch (cur_io->mode) { + case OBJ_IO_READ: + // Page is currently open. deal if read is past the end. + pthread_mutex_lock(&p->mutex); + if (!p->free && !p->closed && p->version == cur_io->page_version) { + if (p->active && cur_io->offset >= p->written) { + ret = _read_from_wbuf(p, cur_io); + do_op = 0; + } else { + p->refcount++; + } + STAT_L(e); + e->stats.bytes_read += cur_io->len; + e->stats.objects_read++; + STAT_UL(e); + } else { + do_op = 0; + ret = -2; // TODO: enum in IO for status? 
+ } + pthread_mutex_unlock(&p->mutex); + if (do_op) + ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset); + break; + case OBJ_IO_WRITE: + do_op = 0; + // FIXME: Should hold refcount during write. doesn't + // currently matter since page can't free while active. + ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset); + break; + } + if (ret == 0) { + E_DEBUG("read returned nothing\n"); + } + +#ifdef EXTSTORE_DEBUG + if (ret == -1) { + perror("read/write op failed"); + } +#endif + cur_io->cb(e, cur_io, ret); + if (do_op) { + pthread_mutex_lock(&p->mutex); + p->refcount--; + pthread_mutex_unlock(&p->mutex); + } + cur_io = next; + } + } + + return NULL; +} + +// call with *p locked. +static void _free_page(store_engine *e, store_page *p) { + store_page *tmp = NULL; + store_page *prev = NULL; + E_DEBUG("EXTSTORE: freeing page %u\n", p->id); + STAT_L(e); + e->stats.objects_used -= p->obj_count; + e->stats.bytes_used -= p->bytes_used; + e->stats.page_reclaims++; + STAT_UL(e); + pthread_mutex_lock(&e->mutex); + // unlink page from bucket list + tmp = e->page_buckets[p->bucket]; + while (tmp) { + if (tmp == p) { + if (prev) { + prev->next = tmp->next; + } else { + e->page_buckets[p->bucket] = tmp->next; + } + tmp->next = NULL; + break; + } + prev = tmp; + tmp = tmp->next; + } + // reset most values + p->version = 0; + p->obj_count = 0; + p->bytes_used = 0; + p->allocated = 0; + p->written = 0; + p->bucket = 0; + p->active = false; + p->closed = false; + p->free = true; + // add to page stack + p->next = e->page_freelist; + e->page_freelist = p; + e->page_free++; + pthread_mutex_unlock(&e->mutex); +} + +/* engine maint thread; takes engine context. + * Uses version to ensure oldest possible objects are being evicted. + * Needs interface to inform owner of pages with fewer objects or most space + * free, which can then be actively compacted to avoid eviction. + * + * This gets called asynchronously after every page allocation. 
Could run less + * often if more pages are free. + * + * Another allocation call is required if an attempted free didn't happen + * due to the page having a refcount. + */ + +// TODO: Don't over-evict pages if waiting on refcounts to drop +static void *extstore_maint_thread(void *arg) { + store_maint_thread *me = (store_maint_thread *)arg; + store_engine *e = me->e; + struct extstore_page_data *pd = + calloc(e->page_count, sizeof(struct extstore_page_data)); + pthread_mutex_lock(&me->mutex); + while (1) { + int i; + bool do_evict = false; + unsigned int low_page = 0; + uint64_t low_version = ULLONG_MAX; + + pthread_cond_wait(&me->cond, &me->mutex); + pthread_mutex_lock(&e->mutex); + if (e->page_free == 0) { + do_evict = true; + } + pthread_mutex_unlock(&e->mutex); + memset(pd, 0, sizeof(struct extstore_page_data) * e->page_count); + + for (i = 0; i < e->page_count; i++) { + store_page *p = &e->pages[i]; + pthread_mutex_lock(&p->mutex); + if (p->active || p->free) { + pthread_mutex_unlock(&p->mutex); + continue; + } + if (p->obj_count > 0 && !p->closed) { + pd[p->id].version = p->version; + pd[p->id].bytes_used = p->bytes_used; + if (p->version < low_version) { + low_version = p->version; + low_page = i; + } + } + if ((p->obj_count == 0 || p->closed) && p->refcount == 0) { + _free_page(e, p); + // Found a page to free, no longer need to evict. 
+ do_evict = false; + } + pthread_mutex_unlock(&p->mutex); + } + + if (do_evict && low_version != ULLONG_MAX) { + store_page *p = &e->pages[low_page]; + E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n", + p->id, (unsigned long long) p->version); + pthread_mutex_lock(&p->mutex); + if (!p->closed) { + p->closed = true; + STAT_L(e); + e->stats.page_evictions++; + e->stats.objects_evicted += p->obj_count; + e->stats.bytes_evicted += p->bytes_used; + STAT_UL(e); + if (p->refcount == 0) { + _free_page(e, p); + } + } + pthread_mutex_unlock(&p->mutex); + } + + // copy the page data into engine context so callers can use it from + // the stats lock. + STAT_L(e); + memcpy(e->stats.page_data, pd, + sizeof(struct extstore_page_data) * e->page_count); + STAT_UL(e); + } + + return NULL; +} diff --git a/extstore.h b/extstore.h new file mode 100644 index 0000000..1088101 --- /dev/null +++ b/extstore.h @@ -0,0 +1,94 @@ +#ifndef EXTSTORE_H +#define EXTSTORE_H + +/* A safe-to-read dataset for determining compaction. + * id is the array index. + */ +struct extstore_page_data { + uint64_t version; + uint64_t bytes_used; +}; + +/* Pages can have objects deleted from them at any time. This creates holes + * that can't be reused until the page is either evicted or all objects are + * deleted. + * bytes_fragmented is the total bytes for all of these holes. + * It is the size of all used pages minus each page's bytes_used value. 
+ */ +struct extstore_stats { + uint64_t page_allocs; + uint64_t page_count; /* total page count */ + uint64_t page_evictions; + uint64_t page_reclaims; + uint64_t page_size; /* size in bytes per page (supplied by caller) */ + uint64_t pages_free; /* currently unallocated/unused pages */ + uint64_t pages_used; + uint64_t objects_evicted; + uint64_t objects_read; + uint64_t objects_written; + uint64_t objects_used; /* total number of objects stored */ + uint64_t bytes_evicted; + uint64_t bytes_written; + uint64_t bytes_read; /* wbuf - read -> bytes read from storage */ + uint64_t bytes_used; /* total number of bytes stored */ + uint64_t bytes_fragmented; /* see above comment */ + struct extstore_page_data *page_data; +}; + +// TODO: Temporary configuration structure. A "real" library should have an +// extstore_set(enum, void *ptr) which hides the implementation. +// this is plenty for quick development. +struct extstore_conf { + unsigned int page_size; // ideally 64-256M in size + unsigned int page_count; + unsigned int page_buckets; // number of different writeable pages + unsigned int wbuf_size; // must divide cleanly into page_size + unsigned int wbuf_count; // this might get locked to "2 per active page" + unsigned int io_threadcount; + unsigned int io_depth; // with normal I/O, hits locks less. req'd for AIO +}; + +enum obj_io_mode { + OBJ_IO_READ = 0, + OBJ_IO_WRITE, +}; + +typedef struct _obj_io obj_io; +typedef void (*obj_io_cb)(void *e, obj_io *io, int ret); + +/* An object for both reads and writes to the storage engine. + * Once an IO is submitted, ->next may be changed by the IO thread. It is not + * safe to further modify the IO stack until the entire request is completed. 
+ */ +struct _obj_io { + void *data; /* user supplied data pointer */ + struct _obj_io *next; + char *buf; /* buffer of data to read or write to */ + unsigned int page_version; /* page version for read mode */ + unsigned int len; /* for both modes */ + unsigned int offset; /* for read mode */ + unsigned short page_id; /* for read mode */ + enum obj_io_mode mode; + /* callback pointers? */ + obj_io_cb cb; +}; + + +void *extstore_init(char *fn, struct extstore_conf *cf); +int extstore_write(void *ptr, unsigned int bucket, obj_io *io); +int extstore_submit(void *ptr, obj_io *io); +/* count are the number of objects being removed, bytes are the original + * length of those objects. Bytes is optional but you can't track + * fragmentation without it. + */ +int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version); +int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version, unsigned int count, unsigned int bytes); +void extstore_get_stats(void *ptr, struct extstore_stats *st); +/* add page data array to a stats structure. + * caller must allocate its stats.page_data memory first. 
+ */ +void extstore_get_page_data(void *ptr, struct extstore_stats *st); +void extstore_run_maint(void *ptr); +void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version); + +#endif @@ -2,6 +2,9 @@ #include "memcached.h" #include "bipbuffer.h" #include "slab_automove.h" +#ifdef EXTSTORE +#include "storage.h" +#endif #include <sys/stat.h> #include <sys/socket.h> #include <sys/resource.h> @@ -960,6 +963,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c was_found = 1; if (item_is_flushed(it)) { do_item_unlink(it, hv); + STORAGE_delete(c->thread->storage, it); do_item_remove(it); it = NULL; pthread_mutex_lock(&c->thread->stats.mutex); @@ -971,6 +975,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c was_found = 2; } else if (it->exptime != 0 && it->exptime <= current_time) { do_item_unlink(it, hv); + STORAGE_delete(c->thread->storage, it); do_item_remove(it); it = NULL; pthread_mutex_lock(&c->thread->stats.mutex); @@ -1489,6 +1494,10 @@ static pthread_t lru_maintainer_tid; #define MIN_LRU_MAINTAINER_SLEEP 1000 static void *lru_maintainer_thread(void *arg) { +#ifdef EXTSTORE + void *storage = arg; + int x; +#endif int i; useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP; useconds_t last_sleep = MIN_LRU_MAINTAINER_SLEEP; @@ -1542,6 +1551,18 @@ static void *lru_maintainer_thread(void *arg) { } int did_moves = lru_maintainer_juggle(i); +#ifdef EXTSTORE + // Deeper loop to speed up pushing to storage. 
+ for (x = 0; x < 500; x++) { + int found; + found = lru_maintainer_store(storage, i); + if (found) { + did_moves += found; + } else { + break; + } + } +#endif if (did_moves == 0) { if (backoff_juggles[i] != 0) { backoff_juggles[i] += backoff_juggles[i] / 8; @@ -80,3 +80,16 @@ void lru_maintainer_pause(void); void lru_maintainer_resume(void); void *lru_bump_buf_create(void); + +#ifdef EXTSTORE +#define STORAGE_delete(e, it) \ + do { \ + if (it->it_flags & ITEM_HDR) { \ + item_hdr *hdr = (item_hdr *)ITEM_data(it); \ + extstore_delete(e, hdr->page_id, hdr->page_version, \ + 1, ITEM_ntotal(it)); \ + } \ + } while (0) +#else +#define STORAGE_delete(...) +#endif @@ -54,7 +54,27 @@ static const entry_details default_entries[] = { }, [LOGGER_SLAB_MOVE] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, "type=slab_move src=%d dst=%d" - } + }, +#ifdef EXTSTORE + [LOGGER_COMPACT_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, + "type=compact_start id=%lu version=%llu" + }, + [LOGGER_COMPACT_ABORT] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, + "type=compact_abort id=%lu" + }, + [LOGGER_COMPACT_READ_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, + "type=compact_read_start id=%lu offset=%llu" + }, + [LOGGER_COMPACT_READ_END] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, + "type=compact_read_end id=%lu offset=%llu rescues=%lu lost=%lu" + }, + [LOGGER_COMPACT_END] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, + "type=compact_end id=%lu" + }, + [LOGGER_COMPACT_FRAGINFO] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, + "type=compact_fraginfo ratio=%.2f bytes=%lu" + }, +#endif }; #define WATCHER_ALL -1 @@ -20,6 +20,14 @@ enum log_entry_type { LOGGER_ITEM_STORE, LOGGER_CRAWLER_STATUS, LOGGER_SLAB_MOVE, +#ifdef EXTSTORE + LOGGER_COMPACT_START, + LOGGER_COMPACT_ABORT, + LOGGER_COMPACT_READ_START, + LOGGER_COMPACT_READ_END, + LOGGER_COMPACT_END, + LOGGER_COMPACT_FRAGINFO, +#endif }; enum log_entry_subtype { diff --git a/memcached.c b/memcached.c index 2616ddb..80edcfa 100644 --- a/memcached.c +++ 
b/memcached.c @@ -14,6 +14,9 @@ * Brad Fitzpatrick <brad@danga.com> */ #include "memcached.h" +#ifdef EXTSTORE +#include "storage.h" +#endif #include <sys/stat.h> #include <sys/socket.h> #include <sys/un.h> @@ -472,9 +475,18 @@ void conn_worker_readd(conn *c) { event_base_set(c->thread->base, &c->event); c->state = conn_new_cmd; + // TODO: call conn_cleanup/fail/etc if (event_add(&c->event, 0) == -1) { perror("event_add"); } +#ifdef EXTSTORE + // If we had IO objects, process + if (c->io_wraplist) { + //assert(c->io_wrapleft == 0); // assert no more to process + conn_set_state(c, conn_mwrite); + drive_machine(c); // FIXME: Again, is it really this easy? + } +#endif } conn *conn_new(const int sfd, enum conn_states init_state, @@ -592,6 +604,10 @@ conn *conn_new(const int sfd, enum conn_states init_state, c->msgused = 0; c->authenticated = false; c->last_cmd_time = current_time; /* initialize for idle kicker */ +#ifdef EXTSTORE + c->io_wraplist = NULL; + c->io_wrapleft = 0; +#endif c->write_and_go = init_state; c->write_and_free = 0; @@ -617,7 +633,60 @@ conn *conn_new(const int sfd, enum conn_states init_state, return c; } +#ifdef EXTSTORE +static void recache_or_free(conn *c, io_wrap *wrap) { + item *it = (item *)wrap->io.buf; + // If request was ultimately a miss, unlink the header. + if (wrap->miss) { + size_t ntotal = ITEM_ntotal(wrap->hdr_it); + item_unlink(wrap->hdr_it); + slabs_free(it, ntotal, slabs_clsid(ntotal)); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.miss_from_extstore++; + if (wrap->badcrc) + c->thread->stats.badcrc_from_extstore++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } else { + // hashvalue is cuddled during store + uint32_t hv = (uint32_t)it->time; + bool do_free = true; + // opt to throw away rather than wait on a lock. 
+ void *hold_lock = item_trylock(hv); + if (hold_lock != NULL) { + item *h_it = wrap->hdr_it; + uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE; + // Item must be recently hit at least twice to recache. + if (((h_it->it_flags & flags) == flags) && + h_it->time > current_time - ITEM_UPDATE_INTERVAL && + c->recache_counter++ % settings.ext_recache_rate == 0) { + do_free = false; + // In case it's been updated. + it->exptime = h_it->exptime; + it->it_flags &= ~ITEM_LINKED; + it->refcount = 0; + it->h_next = NULL; // might not be necessary. + STORAGE_delete(c->thread->storage, h_it); + item_replace(h_it, it, hv); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.recache_from_extstore++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } + } + + if (do_free) + slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it)); + if (hold_lock) + item_trylock_unlock(hold_lock); + } + wrap->io.buf = NULL; // sanity. + wrap->io.next = NULL; + wrap->next = NULL; + wrap->active = false; + // TODO: reuse lock and/or hv. 
+ item_remove(wrap->hdr_it); +} +#endif static void conn_release_items(conn *c) { assert(c != NULL); @@ -639,7 +708,18 @@ static void conn_release_items(conn *c) { do_cache_free(c->thread->suffix_cache, *(c->suffixcurr)); } } - +#ifdef EXTSTORE + if (c->io_wraplist) { + io_wrap *tmp = c->io_wraplist; + while (tmp) { + io_wrap *next = tmp->next; + recache_or_free(c, tmp); + do_cache_free(c->thread->io_cache, tmp); // lockless + tmp = next; + } + c->io_wraplist = NULL; + } +#endif c->icurr = c->ilist; c->suffixcurr = c->suffixlist; } @@ -2298,6 +2378,7 @@ static void process_bin_update(conn *c) { it = item_get(key, nkey, c, DONT_UPDATE); if (it) { item_unlink(it); + STORAGE_delete(c->thread->storage, it); item_remove(it); } } @@ -2456,6 +2537,7 @@ static void process_bin_delete(conn *c) { c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++; pthread_mutex_unlock(&c->thread->stats.mutex); item_unlink(it); + STORAGE_delete(c->thread->storage, it); write_bin_response(c, NULL, 0, 0, 0); } else { write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); @@ -2676,6 +2758,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++; pthread_mutex_unlock(&c->thread->stats.mutex); + STORAGE_delete(c->thread->storage, old_it); item_replace(old_it, it, hv); stored = STORED; } else { @@ -2735,10 +2818,12 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h } if (stored == NOT_STORED && failed_alloc == 0) { - if (old_it != NULL) + if (old_it != NULL) { + STORAGE_delete(c->thread->storage, old_it); item_replace(old_it, it, hv); - else + } else { do_item_link(it, hv); + } c->cas = ITEM_get_cas(it); @@ -2911,7 +2996,9 @@ static void server_stats(ADD_STAT add_stats, conn *c) { threadlocal_stats_aggregate(&thread_stats); struct slab_stats slab_stats; slab_stats_aggregate(&thread_stats, &slab_stats); - +#ifdef EXTSTORE + struct extstore_stats st; +#endif 
#ifndef WIN32 struct rusage usage; getrusage(RUSAGE_SELF, &usage); @@ -2951,6 +3038,12 @@ static void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses); APPEND_STAT("get_expired", "%llu", (unsigned long long)thread_stats.get_expired); APPEND_STAT("get_flushed", "%llu", (unsigned long long)thread_stats.get_flushed); +#ifdef EXTSTORE + APPEND_STAT("get_extstore", "%llu", (unsigned long long)thread_stats.get_extstore); + APPEND_STAT("recache_from_extstore", "%llu", (unsigned long long)thread_stats.recache_from_extstore); + APPEND_STAT("miss_from_extstore", "%llu", (unsigned long long)thread_stats.miss_from_extstore); + APPEND_STAT("badcrc_from_extstore", "%llu", (unsigned long long)thread_stats.badcrc_from_extstore); +#endif APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses); APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits); APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses); @@ -3002,6 +3095,23 @@ static void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_watcher_skipped); APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watcher_sent); STATS_UNLOCK(); +#ifdef EXTSTORE + extstore_get_stats(c->thread->storage, &st); + APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs); + APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions); + APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims); + APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free); + APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used); + APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted); + APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read); + 
APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written); + APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used); + APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted); + APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written); + APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read); + APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used); + APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented); +#endif } static void process_stat_settings(ADD_STAT add_stats, void *c) { @@ -3336,6 +3446,128 @@ static inline char *_ascii_get_suffix_buf(conn *c, int i) { *(c->suffixlist + i) = suffix; return suffix; } +#ifdef EXTSTORE +// FIXME: This runs in the IO thread. to get better IO performance this should +// simply mark the io wrapper with the return value and decrement wrapleft, if +// zero redispatching. Still a bit of work being done in the side thread but +// minimized at least. +static void _ascii_get_extstore_cb(void *e, obj_io *io, int ret) { + // FIXME: assumes success + io_wrap *wrap = (io_wrap *)io->data; + conn *c = wrap->c; + assert(wrap->active == true); + item *read_it = (item *)io->buf; + bool miss = false; + + // TODO: How to do counters for hit/misses? 
+ if (ret < 1) { + miss = true; + } else { + uint32_t crc = (uint32_t) read_it->exptime; + read_it->exptime = 0; // to match what was crc'ed + uint32_t crc2 = crc32c(0, (char *)read_it+24, io->len-24); + if (crc != crc2) { + miss = true; + wrap->badcrc = true; + } + } + + if (miss) { + int i; + struct iovec *v; + for (i = 0; i < wrap->iovec_count; i++) { + v = &c->iov[wrap->iovec_start + i]; + v->iov_len = 0; + v->iov_base = NULL; + } + wrap->miss = true; + } else { + assert(read_it->slabs_clsid != 0); + c->iov[wrap->iovec_data].iov_base = ITEM_data(read_it); + // iov_len is already set + // TODO: Should do that here instead and cuddle in the wrap object + } + c->io_wrapleft--; + wrap->active = false; + //assert(c->io_wrapleft >= 0); + + // All IO's have returned, lets re-attach this connection to our original + // thread. + if (c->io_wrapleft == 0) { + assert(c->io_queued == true); + c->io_queued = false; + // FIXME: Is it really this easy? + // I have worries this won't be performant enough under load though. + // Can cause the IO threads to block if the pipes are full, too. + redispatch_conn(c); + } +} + +// FIXME: This completely breaks UDP support. +static inline int _ascii_get_extstore(conn *c, item *it) { + item_hdr *hdr = (item_hdr *)ITEM_data(it); + size_t ntotal = ITEM_ntotal(it); + unsigned int clsid = slabs_clsid(ntotal); + item *new_it = do_item_alloc_pull(ntotal, clsid); + if (new_it == NULL) + return -1; + assert(!c->io_queued); // FIXME: debugging. + // so we can free the chunk on a miss + new_it->slabs_clsid = clsid; + + io_wrap *io = do_cache_alloc(c->thread->io_cache); + io->active = true; + io->miss = false; + io->badcrc = false; + // io_wrap owns the reference for this object now. + io->hdr_it = it; + + // FIXME: error handling. + // The offsets we'll wipe on a miss. + io->iovec_start = c->iovused - 3; + io->iovec_count = 4; + // FIXME: error handling. + // This is probably super dangerous. keep it at 0 and fill into wrap + // object? 
+ io->iovec_data = c->iovused; + add_iov(c, "", it->nbytes); + // The offset we'll fill in on a hit. + io->c = c; + // We need to stack the sub-struct IO's together as well. + if (c->io_wraplist) { + io->io.next = &c->io_wraplist->io; + } else { + io->io.next = NULL; + } + // IO queue for this connection. + io->next = c->io_wraplist; + c->io_wraplist = io; + assert(c->io_wrapleft >= 0); + c->io_wrapleft++; + // reference ourselves for the callback. + io->io.data = (void *)io; + io->io.buf = (void *)new_it; + + // Now, fill in io->io based on what was in our header. + io->io.page_version = hdr->page_version; + io->io.page_id = hdr->page_id; + io->io.offset = hdr->offset; + io->io.len = ITEM_ntotal(it); + io->io.mode = OBJ_IO_READ; + + // TODO: set the callback function + io->io.cb = _ascii_get_extstore_cb; + //fprintf(stderr, "EXTSTORE: IO stacked %u\n", io->iovec_data); + // FIXME: This stat needs to move to reflect # of flash hits vs misses + // for now it's a good gauge on how often we request out to flash at + // least. + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_extstore++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + return 0; +} +#endif // FIXME: the 'breaks' around memory malloc's should break all the way down, // fill ileft/suffixleft, then run conn_releaseitems() /* ntokens is overwritten here... shrug.. 
*/ @@ -3417,7 +3649,16 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, item_remove(it); break; } +#ifdef EXTSTORE + if (it->it_flags & ITEM_HDR) { + if (_ascii_get_extstore(c, it) != 0) { + item_remove(it); + break; + } + } else if ((it->it_flags & ITEM_CHUNKED) == 0) { +#else if ((it->it_flags & ITEM_CHUNKED) == 0) { +#endif add_iov(c, ITEM_data(it), it->nbytes); } else if (add_chunked_item_iovs(c, it, it->nbytes) != 0) { item_remove(it); @@ -3468,9 +3709,16 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, c->thread->stats.get_cmds++; } pthread_mutex_unlock(&c->thread->stats.mutex); +#ifdef EXTSTORE + /* If ITEM_HDR, an io_wrap owns the reference. */ + if ((it->it_flags & ITEM_HDR) == 0) { + *(c->ilist + i) = it; + i++; + } +#else *(c->ilist + i) = it; i++; - +#endif } else { pthread_mutex_lock(&c->thread->stats.mutex); if (should_touch) { @@ -3602,6 +3850,7 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken it = item_get(key, nkey, c, DONT_UPDATE); if (it) { item_unlink(it); + STORAGE_delete(c->thread->storage, it); item_remove(it); } } @@ -3869,6 +4118,7 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken pthread_mutex_unlock(&c->thread->stats.mutex); item_unlink(it); + STORAGE_delete(c->thread->storage, it); item_remove(it); /* release our reference */ out_string(c, "DELETED"); } else { @@ -4072,7 +4322,38 @@ static void process_lru_command(conn *c, token_t *tokens, const size_t ntokens) out_string(c, "ERROR"); } } - +#ifdef EXTSTORE +static void process_extstore_command(conn *c, token_t *tokens, const size_t ntokens) { + set_noreply_maybe(c, tokens, ntokens); + if (strcmp(tokens[1].value, "item_size") == 0 && ntokens >= 3) { + if (!safe_strtoul(tokens[2].value, &settings.ext_item_size)) { + out_string(c, "ERROR"); + } else { + out_string(c, "OK"); + } + } else if (strcmp(tokens[1].value, "item_age") == 0 && ntokens >= 3) { + 
if (!safe_strtoul(tokens[2].value, &settings.ext_item_age)) { + out_string(c, "ERROR"); + } else { + out_string(c, "OK"); + } + } else if (strcmp(tokens[1].value, "recache_rate") == 0 && ntokens >= 3) { + if (!safe_strtoul(tokens[2].value, &settings.ext_recache_rate)) { + out_string(c, "ERROR"); + } else { + out_string(c, "OK"); + } + } else if (strcmp(tokens[1].value, "max_frag") == 0 && ntokens >= 3) { + if (!safe_strtod(tokens[2].value, &settings.ext_max_frag)) { + out_string(c, "ERROR"); + } else { + out_string(c, "OK"); + } + } else { + out_string(c, "ERROR"); + } +} +#endif static void process_command(conn *c, char *command) { token_t tokens[MAX_TOKENS]; @@ -4371,6 +4652,10 @@ static void process_command(conn *c, char *command) { } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "misbehave") == 0)) { process_misbehave_command(c); #endif +#ifdef EXTSTORE + } else if (ntokens >= 3 && strcmp(tokens[COMMAND_TOKEN].value, "extstore") == 0) { + process_extstore_command(c, tokens, ntokens); +#endif } else { if (ntokens >= 2 && strncmp(tokens[ntokens - 2].value, "HTTP/", 5) == 0) { conn_set_state(c, conn_closing); @@ -5126,6 +5411,23 @@ static void drive_machine(conn *c) { /* fall through... */ case conn_mwrite: +#ifdef EXTSTORE + /* have side IO's that must process before transmit() can run. 
+ * remove the connection from the worker thread and dispatch the + * IO queue + */ + if (c->io_wrapleft) { + assert(c->io_queued == false); + assert(c->io_wraplist != NULL); + // TODO: create proper state for this condition + conn_set_state(c, conn_watch); + event_del(&c->event); + c->io_queued = true; + extstore_submit(c->thread->storage, &c->io_wraplist->io); + stop = true; + break; + } +#endif if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) { if (settings.verbose > 0) fprintf(stderr, "Failed to build UDP headers\n"); @@ -5983,7 +6285,11 @@ int main (int argc, char **argv) { bool use_slab_sizes = false; char *slab_sizes_unparsed = NULL; bool slab_chunk_size_changed = false; - +#ifdef EXTSTORE + void *storage = NULL; + char *storage_file = "/dev/shm/extstore"; + struct extstore_conf ext_cf; +#endif char *subopts, *subopts_orig; char *subopts_value; enum { @@ -6025,6 +6331,19 @@ int main (int argc, char **argv) { #ifdef MEMCACHED_DEBUG RELAXED_PRIVILEGES, #endif +#ifdef EXTSTORE + EXT_PAGE_SIZE, + EXT_PAGE_COUNT, + EXT_WBUF_SIZE, + EXT_WBUF_COUNT, + EXT_THREADS, + EXT_IO_DEPTH, + EXT_PATH, + EXT_ITEM_SIZE, + EXT_ITEM_AGE, + EXT_RECACHE_RATE, + EXT_MAX_FRAG, +#endif }; char *const subopts_tokens[] = { [MAXCONNS_FAST] = "maxconns_fast", @@ -6065,6 +6384,19 @@ int main (int argc, char **argv) { #ifdef MEMCACHED_DEBUG [RELAXED_PRIVILEGES] = "relaxed_privileges", #endif +#ifdef EXTSTORE + [EXT_PAGE_SIZE] = "ext_page_size", + [EXT_PAGE_COUNT] = "ext_page_count", + [EXT_WBUF_SIZE] = "ext_wbuf_size", + [EXT_WBUF_COUNT] = "ext_wbuf_count", + [EXT_THREADS] = "ext_threads", + [EXT_IO_DEPTH] = "ext_io_depth", + [EXT_PATH] = "ext_path", + [EXT_ITEM_SIZE] = "ext_item_size", + [EXT_ITEM_AGE] = "ext_item_age", + [EXT_RECACHE_RATE] = "ext_recache_rate", + [EXT_MAX_FRAG] = "ext_max_frag", +#endif NULL }; @@ -6078,9 +6410,22 @@ int main (int argc, char **argv) { /* init settings */ settings_init(); +#ifdef EXTSTORE + settings.ext_item_size = 512; + 
settings.ext_item_age = 0; + settings.ext_recache_rate = 2; + settings.ext_max_frag = 0.9; + settings.ext_wbuf_size = 1024 * 1024 * 8; + ext_cf.page_size = 1024 * 1024 * 64; + ext_cf.page_count = 64; + ext_cf.wbuf_size = settings.ext_wbuf_size; + ext_cf.wbuf_count = 2; + ext_cf.io_threadcount = 1; + ext_cf.io_depth = 1; + ext_cf.page_buckets = 2; +#endif /* Run regardless of initializing it later */ - init_lru_crawler(NULL); init_lru_maintainer(); /* set stderr non-buffering (for running under, say, daemontools) */ @@ -6604,6 +6949,114 @@ int main (int argc, char **argv) { start_lru_maintainer = false; settings.lru_segmented = false; break; +#ifdef EXTSTORE + case EXT_PAGE_SIZE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_page_size argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf.page_size)) { + fprintf(stderr, "could not parse argument to ext_page_size\n"); + return 1; + } + ext_cf.page_size *= 1024 * 1024; /* megabytes */ + break; + case EXT_PAGE_COUNT: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_page_count argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf.page_count)) { + fprintf(stderr, "could not parse argument to ext_page_count\n"); + return 1; + } + break; + case EXT_WBUF_SIZE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_wbuf_size argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf.wbuf_size)) { + fprintf(stderr, "could not parse argument to ext_wbuf_size\n"); + return 1; + } + ext_cf.wbuf_size *= 1024 * 1024; /* megabytes */ + settings.ext_wbuf_size = ext_cf.wbuf_size; + break; + case EXT_WBUF_COUNT: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_wbuf_count argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf.wbuf_count)) { + fprintf(stderr, "could not parse argument to ext_wbuf_count\n"); + return 1; + } + break; + case EXT_THREADS: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_threads 
argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf.io_threadcount)) { + fprintf(stderr, "could not parse argument to ext_threads\n"); + return 1; + } + break; + case EXT_IO_DEPTH: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_io_depth argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &ext_cf.io_depth)) { + fprintf(stderr, "could not parse argument to ext_io_depth\n"); + return 1; + } + break; + case EXT_ITEM_SIZE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_item_size argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_item_size)) { + fprintf(stderr, "could not parse argument to ext_item_size\n"); + return 1; + } + break; + case EXT_ITEM_AGE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_item_age argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_item_age)) { + fprintf(stderr, "could not parse argument to ext_item_age\n"); + return 1; + } + break; + case EXT_RECACHE_RATE: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_recache_rate argument\n"); + return 1; + } + if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) { + fprintf(stderr, "could not parse argument to ext_recache_rate\n"); + return 1; + } + break; + case EXT_MAX_FRAG: + if (subopts_value == NULL) { + fprintf(stderr, "Missing ext_max_frag argument\n"); + return 1; + } + if (!safe_strtod(subopts_value, &settings.ext_max_frag)) { + fprintf(stderr, "could not parse argument to ext_max_frag\n"); + return 1; + } + break; + case EXT_PATH: + storage_file = strdup(subopts_value); + break; +#endif case MODERN: /* currently no new defaults */ break; @@ -6815,7 +7268,14 @@ int main (int argc, char **argv) { conn_init(); slabs_init(settings.maxbytes, settings.factor, preallocate, use_slab_sizes ? 
slab_sizes : NULL); - +#ifdef EXTSTORE + crc32c_init(); + storage = extstore_init(storage_file, &ext_cf); + if (storage == NULL) { + fprintf(stderr, "Failed to initialize external storage\n"); + exit(EXIT_FAILURE); + } +#endif /* * ignore SIGPIPE signals; we can use errno == EPIPE if we * need that information @@ -6825,18 +7285,31 @@ int main (int argc, char **argv) { exit(EX_OSERR); } /* start up worker threads if MT mode */ +#ifdef EXTSTORE + memcached_thread_init(settings.num_threads, storage); + init_lru_crawler(storage); +#else memcached_thread_init(settings.num_threads, NULL); + init_lru_crawler(NULL); +#endif if (start_assoc_maint && start_assoc_maintenance_thread() == -1) { exit(EXIT_FAILURE); } - if (start_lru_crawler && start_item_crawler_thread() != 0) { fprintf(stderr, "Failed to enable LRU crawler thread\n"); exit(EXIT_FAILURE); } +#ifdef EXTSTORE + if (start_storage_compact_thread(storage) != 0) { + fprintf(stderr, "Failed to start storage compaction thread\n"); + exit(EXIT_FAILURE); + } + if (start_lru_maintainer && start_lru_maintainer_thread(storage) != 0) { +#else if (start_lru_maintainer && start_lru_maintainer_thread(NULL) != 0) { +#endif fprintf(stderr, "Failed to enable LRU maintainer thread\n"); return 1; } diff --git a/memcached.h b/memcached.h index 69c3b50..97c78f6 100644 --- a/memcached.h +++ b/memcached.h @@ -5,6 +5,8 @@ * structures and function prototypes. */ +#define EXTSTORE 1 + #ifdef HAVE_CONFIG_H #include "config.h" #endif @@ -24,6 +26,11 @@ #include "cache.h" #include "logger.h" +#ifdef EXTSTORE +#include "extstore.h" +#include "crc32c.h" +#endif + #include "sasl_defs.h" /** Maximum length of a key. */ @@ -266,6 +273,14 @@ struct slab_stats { X(auth_errors) \ X(idle_kicks) /* idle connections killed */ +#ifdef EXTSTORE +#define EXTSTORE_THREAD_STATS_FIELDS \ + X(get_extstore) \ + X(recache_from_extstore) \ + X(miss_from_extstore) \ + X(badcrc_from_extstore) +#endif + /** * Stats stored per-thread. 
*/ @@ -273,6 +288,9 @@ struct thread_stats { pthread_mutex_t mutex; #define X(name) uint64_t name; THREAD_STATS_FIELDS +#ifdef EXTSTORE + EXTSTORE_THREAD_STATS_FIELDS +#endif #undef X struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES]; uint64_t lru_hits[POWER_LARGEST]; @@ -384,6 +402,13 @@ struct settings { unsigned int logger_buf_size; /* size of per-thread logger buffer */ bool drop_privileges; /* Whether or not to drop unnecessary process privileges */ bool relaxed_privileges; /* Relax process restrictions when running testapp */ +#ifdef EXTSTORE + unsigned int ext_item_size; /* minimum size of items to store externally */ + unsigned int ext_item_age; /* max age of tail item before storing ext. */ + unsigned int ext_recache_rate; /* counter++ % recache_rate == 0 > recache */ + unsigned int ext_wbuf_size; /* read only note for the engine */ + double ext_max_frag; /* ideal maximum page fragmentation */ +#endif }; extern struct stats stats; @@ -404,6 +429,10 @@ extern struct settings settings; /* If an item's storage are chained chunks. */ #define ITEM_CHUNKED 32 #define ITEM_CHUNK 64 +#ifdef EXTSTORE +/* ITEM_data bulk is external to item */ +#define ITEM_HDR 128 +#endif /** * Structure for storing items within memcached. @@ -471,7 +500,13 @@ typedef struct _strchunk { uint8_t slabs_clsid; /* Same as above. 
*/ char data[]; } item_chunk; - +#ifdef EXTSTORE +typedef struct { + unsigned int page_version; /* from IO header */ + unsigned int offset; /* from IO header */ + unsigned short page_id; /* from IO header */ +} item_hdr; +#endif typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ @@ -481,14 +516,31 @@ typedef struct { struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ +#ifdef EXTSTORE + cache_t *io_cache; /* IO objects */ + void *storage; /* data object for storage system */ +#endif logger *l; /* logger buffer */ void *lru_bump_buf; /* async LRU bump buffer */ } LIBEVENT_THREAD; - +typedef struct conn conn; +#ifdef EXTSTORE +typedef struct _io_wrap { + obj_io io; + struct _io_wrap *next; + conn *c; + item *hdr_it; /* original header item. */ + unsigned int iovec_start; /* start of the iovecs for this IO */ + unsigned int iovec_count; /* total number of iovecs */ + unsigned int iovec_data; /* specific index of data iovec */ + bool miss; /* signal a miss to unlink hdr_it */ + bool badcrc; /* signal a crc failure */ + bool active; // FIXME: canary for test. remove +} io_wrap; +#endif /** * The structure representing a connection into memcached. 
*/ -typedef struct conn conn; struct conn { int sfd; sasl_conn_t *sasl_conn; @@ -549,7 +601,12 @@ struct conn { int suffixsize; char **suffixcurr; int suffixleft; - +#ifdef EXTSTORE + int io_wrapleft; + unsigned int recache_counter; + io_wrap *io_wraplist; /* linked list of io_wraps */ + bool io_queued; /* FIXME: debugging flag */ +#endif enum protocol protocol; /* which protocol this connection speaks */ enum network_transport transport; /* what transport is used by this connection */ @@ -16,6 +16,9 @@ int main(int argc, char **argv) { display("Settings", sizeof(struct settings)); display("Item (no cas)", sizeof(item)); display("Item (cas)", sizeof(item) + sizeof(uint64_t)); +#ifdef EXTSTORE + display("extstore header", sizeof(item_hdr)); +#endif display("Libevent thread", sizeof(LIBEVENT_THREAD) - sizeof(struct thread_stats)); display("Connection", sizeof(conn)); @@ -360,6 +360,9 @@ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) { it = (item *)ptr; if ((it->it_flags & ITEM_CHUNKED) == 0) { +#ifdef EXTSTORE + bool is_hdr = it->it_flags & ITEM_HDR; +#endif it->it_flags = ITEM_SLABBED; it->slabs_clsid = 0; it->prev = 0; @@ -368,7 +371,15 @@ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) { p->slots = it; p->sl_curr++; +#ifdef EXTSTORE + if (!is_hdr) { + p->requested -= size; + } else { + p->requested -= (size - it->nbytes) + sizeof(item_hdr); + } +#else p->requested -= size; +#endif } else { do_slabs_free_chunked(it, size); } @@ -855,6 +866,11 @@ static int slab_rebalance_move(void) { */ /* Check if expired or flushed */ ntotal = ITEM_ntotal(it); +#ifdef EXTSTORE + if (it->it_flags & ITEM_HDR) { + ntotal = (ntotal - it->nbytes) + sizeof(item_hdr); + } +#endif /* REQUIRES slabs_lock: CHECK FOR cls->sl_curr > 0 */ if (ch == NULL && (it->it_flags & ITEM_CHUNKED)) { /* Chunked should be identical to non-chunked, except we need @@ -928,6 +944,7 @@ static int slab_rebalance_move(void) { } else { /* restore ntotal in case 
we tried saving a head chunk. */ ntotal = ITEM_ntotal(it); + // FIXME: need storage instance to call extstore_delete do_item_unlink(it, hv); slabs_free(it, ntotal, slab_rebal.s_clsid); /* Swing around again later to remove it from the freelist. */ @@ -1027,6 +1044,7 @@ static void slab_rebalance_finish(void) { slab_rebal.d_clsid); } else if (slab_rebal.d_clsid == SLAB_GLOBAL_PAGE_POOL) { /* mem_malloc'ed might be higher than mem_limit. */ + mem_limit_reached = false; memory_release(); } diff --git a/storage.c b/storage.c new file mode 100644 index 0000000..21fd1f1 --- /dev/null +++ b/storage.c @@ -0,0 +1,379 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#include "memcached.h" +#include "storage.h" +#include <stdlib.h> +#include <string.h> +#include <limits.h> + +int lru_maintainer_store(void *storage, const int clsid) { + //int i; + int did_moves = 0; + int item_age = settings.ext_item_age; + bool mem_limit_reached = false; + unsigned int chunks_free; + unsigned int chunks_perslab; + struct lru_pull_tail_return it_info; + // FIXME: need to directly ask the slabber how big a class is + if (slabs_clsid(settings.ext_item_size) > clsid) + return 0; + chunks_free = slabs_available_chunks(clsid, &mem_limit_reached, + NULL, &chunks_perslab); + // if we are low on chunks and no spare, push out early. + if (chunks_free < (chunks_perslab / 2) && mem_limit_reached) + item_age = 0; + + it_info.it = NULL; + lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info); + /* Item isn't locked, but we have a reference to it. */ + if (it_info.it != NULL) { + obj_io io; + item *it = it_info.it; + /* First, storage for the header object */ + size_t orig_ntotal = ITEM_ntotal(it); + uint32_t flags; + // TODO: Doesn't presently work with chunked items. 
*/ + if ((it->it_flags & (ITEM_CHUNKED|ITEM_HDR)) == 0 && + (item_age == 0 || current_time - it->time > item_age)) { + if (settings.inline_ascii_response) { + flags = (uint32_t) strtoul(ITEM_suffix(it)+1, (char **) NULL, 10); + } else if (it->nsuffix > 0) { + flags = *((uint32_t *)ITEM_suffix(it)); + } else { + flags = 0; + } + item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr)); + /* Run the storage write understanding the header is dirty. + * We will fill it from the header on read either way. + */ + if (hdr_it != NULL) { + // N.B.: rel_time_t happens to be an unsigned int. + // to do this fully safely requires more refactoring. + // cuddle the hash value into the time field so we don't have + // to recalcultae it. + uint32_t orig_time = (uint32_t) it->time; + it->time = it_info.hv; + // Also cuddle a crc32c into the exptime field. + // first clear it so we get a clean crc + uint32_t orig_exptime = (uint32_t) it->exptime; + it->exptime = 0; + it->exptime = crc32c(0, (char*)it+24, orig_ntotal-24); + hdr_it->it_flags |= ITEM_HDR; + io.buf = (void *) it; + io.len = orig_ntotal; + io.mode = OBJ_IO_WRITE; + // NOTE: when the item is read back in, the slab mover + // may see it. Important to have refcount>=2 or ~ITEM_LINKED + assert(it->refcount >= 2); + if (extstore_write(storage, 0, &io) == 0) { + item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it); + hdr->page_version = io.page_version; + hdr->page_id = io.page_id; + hdr->offset = io.offset; + // overload nbytes for the header it + hdr_it->nbytes = it->nbytes; + /* success! Now we need to fill relevant data into the new + * header and replace. Most of this requires the item lock + */ + /* CAS gets set while linking. 
Copy post-replace */ + item_replace(it, hdr_it, it_info.hv); + ITEM_set_cas(hdr_it, ITEM_get_cas(it)); + //fprintf(stderr, "EXTSTORE: swapped an item: %s %lu %lu\n", ITEM_key(it), orig_ntotal, ntotal); + do_item_remove(hdr_it); + did_moves = 1; + } else { + //fprintf(stderr, "EXTSTORE: failed to write\n"); + /* Failed to write for some reason, can't continue. */ + slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it)); + } + it->time = orig_time; + it->exptime = orig_exptime; + } + } + do_item_remove(it); + item_unlock(it_info.hv); + } + return did_moves; +} + +/* Fetch stats from the external storage system and decide to compact. + * If we're more than half full, start skewing how aggressively to run + * compaction, up to a desired target when all pages are full. + */ +static int storage_compact_check(void *storage, logger *l, + uint32_t *page_id, + uint64_t *page_version, uint64_t *page_size) { + struct extstore_stats st; + int x; + double rate; + uint64_t frag_limit; + uint64_t low_version = ULLONG_MAX; + unsigned int low_page = 0; + extstore_get_stats(storage, &st); + if (st.pages_used == 0) + return 0; + + // lets pick a target "wasted" value and slew. + if (st.pages_free > st.page_count / 4) + return 0; + + // the number of free pages reduces the configured frag limit + // this allows us to defrag early if pages are very empty. 
+ rate = 1.0 - ((double)st.pages_free / st.page_count); + rate *= settings.ext_max_frag; + frag_limit = st.page_size * rate; + LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO, + NULL, rate, frag_limit); + st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data)); + extstore_get_page_data(storage, &st); + + // find oldest page by version that violates the constraint + for (x = 0; x < st.page_count; x++) { + if (st.page_data[x].version == 0) + continue; + if (st.page_data[x].bytes_used < frag_limit) { + if (st.page_data[x].version < low_version) { + low_page = x; + low_version = st.page_data[x].version; + } + } + } + *page_size = st.page_size; + free(st.page_data); + + // we have a page + version to attempt to reclaim. + if (low_version != ULLONG_MAX) { + *page_id = low_page; + *page_version = low_version; + return 1; + } + + return 0; +} + +static pthread_t storage_compact_tid; +#define MIN_STORAGE_COMPACT_SLEEP 10000 +#define MAX_STORAGE_COMPACT_SLEEP 2000000 + +struct storage_compact_wrap { + obj_io io; + pthread_mutex_t lock; // gates the bools. + bool done; + bool submitted; + bool miss; // version flipped out from under us +}; + +static void storage_compact_readback(void *storage, logger *l, + char *readback_buf, + uint32_t page_id, uint64_t page_version, uint64_t read_size) { + uint64_t offset = 0; + unsigned int rescues = 0; + unsigned int lost = 0; + + while (offset < read_size) { + item *hdr_it = NULL; + item_hdr *hdr = NULL; + item *it = (item *)(readback_buf+offset); + unsigned int ntotal; + // probably zeroed out junk at the end of the wbuf + if (it->nkey == 0) { + break; + } + + ntotal = ITEM_ntotal(it); + uint32_t hv = (uint32_t)it->time; + item_lock(hv); + // We don't have a conn and don't need to do most of do_item_get + hdr_it = assoc_find(ITEM_key(it), it->nkey, hv); + if (hdr_it != NULL) { + bool do_write = false; + refcount_incr(hdr_it); + + // Check validity but don't bother removing it. 
+ if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) && + (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) { + hdr = (item_hdr *)ITEM_data(hdr_it); + if (hdr->page_id == page_id && hdr->page_version == page_version) { + // Item header is still completely valid. + extstore_delete(storage, page_id, page_version, 1, ntotal); + do_write = true; + } + } + + if (do_write) { + bool do_update = false; + int tries; + obj_io io; + io.buf = (void *)it; + io.len = ntotal; + io.mode = OBJ_IO_WRITE; + for (tries = 10; tries > 0; tries--) { + if (extstore_write(storage, 1, &io) == 0) { + do_update = true; + break; + } else { + usleep(1000); + } + } + + if (do_update) { + if (it->refcount == 2) { + hdr->page_version = io.page_version; + hdr->page_id = io.page_id; + hdr->offset = io.offset; + rescues++; + } else { + lost++; + // TODO: re-alloc and replace header. + } + } + } + + do_item_remove(hdr_it); + } + + item_unlock(hv); + offset += ntotal; + if (read_size - offset < sizeof(struct _stritem)) + break; + } + + LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END, + NULL, page_id, offset, rescues, lost); +} + +static void _storage_compact_cb(void *e, obj_io *io, int ret) { + struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data; + assert(wrap->submitted == true); + + pthread_mutex_lock(&wrap->lock); + + if (ret < 1) { + wrap->miss = true; + } + wrap->done = true; + + pthread_mutex_unlock(&wrap->lock); +} + +// TODO: hoist the storage bits from lru_maintainer_thread in here. +// would be nice if they could avoid hammering the same locks though? +// I guess it's only COLD. that's probably fine. 
+static void *storage_compact_thread(void *arg) { + void *storage = arg; + useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP; + bool compacting = false; + uint64_t page_version = 0; + uint64_t page_size = 0; + uint64_t page_offset = 0; + uint32_t page_id = 0; + char *readback_buf = NULL; + struct storage_compact_wrap wrap; + + logger *l = logger_create(); + if (l == NULL) { + fprintf(stderr, "Failed to allocate logger for LRU maintainer thread\n"); + abort(); + } + + // TODO: check error. + readback_buf = malloc(settings.ext_wbuf_size); + + pthread_mutex_init(&wrap.lock, NULL); + wrap.done = false; + wrap.submitted = false; + wrap.io.data = &wrap; + wrap.io.buf = (void *)readback_buf; + + wrap.io.len = settings.ext_wbuf_size; + wrap.io.mode = OBJ_IO_READ; + wrap.io.cb = _storage_compact_cb; + + while (1) { + if (to_sleep) { + extstore_run_maint(storage); + usleep(to_sleep); + } + + if (!compacting && storage_compact_check(storage, l, + &page_id, &page_version, &page_size)) { + page_offset = 0; + compacting = true; + LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START, + NULL, page_id, page_version); + } + + if (compacting) { + pthread_mutex_lock(&wrap.lock); + if (page_offset < page_size && !wrap.done && !wrap.submitted) { + wrap.io.page_version = page_version; + wrap.io.page_id = page_id; + wrap.io.offset = page_offset; + // FIXME: should be smarter about io->next (unlink at use?) 
+ wrap.io.next = NULL; + wrap.submitted = true; + wrap.miss = false; + + extstore_submit(storage, &wrap.io); + } else if (wrap.miss) { + LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT, + NULL, page_id); + wrap.done = false; + wrap.submitted = false; + compacting = false; + } else if (wrap.submitted && wrap.done) { + LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START, + NULL, page_id, page_offset); + storage_compact_readback(storage, l, + readback_buf, page_id, page_version, settings.ext_wbuf_size); + page_offset += settings.ext_wbuf_size; + wrap.done = false; + wrap.submitted = false; + } else if (page_offset >= page_size) { + compacting = false; + wrap.done = false; + wrap.submitted = false; + extstore_close_page(storage, page_id, page_version); + LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END, + NULL, page_id); + } + pthread_mutex_unlock(&wrap.lock); + + if (to_sleep > MIN_STORAGE_COMPACT_SLEEP) + to_sleep /= 2; + } else { + if (to_sleep < MAX_STORAGE_COMPACT_SLEEP) + to_sleep += MIN_STORAGE_COMPACT_SLEEP; + } + } + free(readback_buf); + + return NULL; +} + +// TODO +/*int stop_storage_compact_thread(void) { + int ret; + pthread_mutex_lock(&lru_maintainer_lock); + do_run_lru_maintainer_thread = 0; + pthread_mutex_unlock(&lru_maintainer_lock); + if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) { + fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret)); + return -1; + } + settings.lru_maintainer_thread = false; + return 0; +}*/ + +int start_storage_compact_thread(void *arg) { + int ret; + + if ((ret = pthread_create(&storage_compact_tid, NULL, + storage_compact_thread, arg)) != 0) { + fprintf(stderr, "Can't create storage_compact thread: %s\n", + strerror(ret)); + return -1; + } + + return 0; +} + diff --git a/storage.h b/storage.h new file mode 100644 index 0000000..3bfeacc --- /dev/null +++ b/storage.h @@ -0,0 +1,7 @@ +#ifndef STORAGE_H +#define STORAGE_H + +int lru_maintainer_store(void *storage, const int clsid); +int 
start_storage_compact_thread(void *arg); + +#endif @@ -334,6 +334,13 @@ static void setup_thread(LIBEVENT_THREAD *me) { fprintf(stderr, "Failed to create suffix cache\n"); exit(EXIT_FAILURE); } +#ifdef EXTSTORE + me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL); + if (me->io_cache == NULL) { + fprintf(stderr, "Failed to create IO object cache\n"); + exit(EXIT_FAILURE); + } +#endif } /* @@ -641,6 +648,9 @@ void threadlocal_stats_reset(void) { pthread_mutex_lock(&threads[ii].stats.mutex); #define X(name) threads[ii].stats.name = 0; THREAD_STATS_FIELDS +#ifdef EXTSTORE + EXTSTORE_THREAD_STATS_FIELDS +#endif #undef X memset(&threads[ii].stats.slab_stats, 0, @@ -663,6 +673,9 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) { pthread_mutex_lock(&threads[ii].stats.mutex); #define X(name) stats->name += threads[ii].stats.name; THREAD_STATS_FIELDS +#ifdef EXTSTORE + EXTSTORE_THREAD_STATS_FIELDS +#endif #undef X for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { @@ -765,7 +778,9 @@ void memcached_thread_init(int nthreads, void *arg) { threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; - +#ifdef EXTSTORE + threads[i].storage = arg; +#endif setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats_state.reserved_fds += 5; |