author     dormando <dormando@rydia.net>  2017-09-26 14:43:17 -0700
committer  dormando <dormando@rydia.net>  2017-11-28 14:18:05 -0800
commit     f593a59bce69f917514ef6213cf565c71bddcf8c (patch)
tree       4a5dc07433e97b089f46a913b5367aa5d52c059a
parent     e6239a905d072e837baa8aa425ca0ccee2fc3e01 (diff)
download   memcached-f593a59bce69f917514ef6213cf565c71bddcf8c.tar.gz
external storage base commit
Been squashing, reorganizing, and pulling code off to go upstream ahead of merging the whole branch.
-rw-r--r--   Makefile.am      |    5
-rw-r--r--   crawler.c        |   21
-rw-r--r--   crc32c.c         |  343
-rw-r--r--   crc32c.h         |    9
-rw-r--r--   doc/storage.txt  |  141
-rw-r--r--   extstore.c       |  800
-rw-r--r--   extstore.h       |   94
-rw-r--r--   items.c          |   21
-rw-r--r--   items.h          |   13
-rw-r--r--   logger.c         |   22
-rw-r--r--   logger.h         |    8
-rw-r--r--   memcached.c      |  493
-rw-r--r--   memcached.h      |   65
-rw-r--r--   sizes.c          |    3
-rw-r--r--   slabs.c          |   18
-rw-r--r--   storage.c        |  379
-rw-r--r--   storage.h       |    7
-rw-r--r--   thread.c         |   17
18 files changed, 2441 insertions(+), 18 deletions(-)
diff --git a/Makefile.am b/Makefile.am
index 95c334c..d1e4d60 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -23,7 +23,10 @@ memcached_SOURCES = memcached.c memcached.h \
logger.c logger.h \
crawler.c crawler.h \
itoa_ljust.c itoa_ljust.h \
- slab_automove.c slab_automove.h
+ slab_automove.c slab_automove.h \
+ extstore.c extstore.h \
+ storage.c storage.h \
+ crc32c.c crc32c.h
if BUILD_CACHE
memcached_SOURCES += cache.c
diff --git a/crawler.c b/crawler.c
index b38ad7e..d43fa92 100644
--- a/crawler.c
+++ b/crawler.c
@@ -96,6 +96,10 @@ static volatile int do_run_lru_crawler_thread = 0;
static int lru_crawler_initialized = 0;
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;
+#ifdef EXTSTORE
+/* TODO: pass this around */
+static void *storage;
+#endif
/* Will crawl all slab classes a minimum of once per hour */
#define MAX_MAINTCRAWL_WAIT 60 * 60
@@ -179,8 +183,20 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv
pthread_mutex_lock(&d->lock);
crawlerstats_t *s = &d->crawlerstats[i];
int is_flushed = item_is_flushed(search);
+#ifdef EXTSTORE
+ bool is_valid = true;
+ if (search->it_flags & ITEM_HDR) {
+ item_hdr *hdr = (item_hdr *)ITEM_data(search);
+ if (extstore_check(storage, hdr->page_id, hdr->page_version) != 0)
+ is_valid = false;
+ }
+#endif
if ((search->exptime != 0 && search->exptime < current_time)
- || is_flushed) {
+ || is_flushed
+#ifdef EXTSTORE
+ || !is_valid
+#endif
+ ) {
crawlers[i].reclaimed++;
s->reclaimed++;
@@ -656,6 +672,9 @@ void lru_crawler_resume(void) {
int init_lru_crawler(void *arg) {
if (lru_crawler_initialized == 0) {
+#ifdef EXTSTORE
+ storage = arg;
+#endif
if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
fprintf(stderr, "Can't initialize lru crawler condition\n");
return -1;
diff --git a/crc32c.c b/crc32c.c
new file mode 100644
index 0000000..13deee2
--- /dev/null
+++ b/crc32c.c
@@ -0,0 +1,343 @@
+/* crc32c.c -- compute CRC-32C using the Intel crc32 instruction
+ * Copyright (C) 2013 Mark Adler
+ * Version 1.1 1 Aug 2013 Mark Adler
+ */
+
+/*
+ This software is provided 'as-is', without any express or implied
+ warranty. In no event will the author be held liable for any damages
+ arising from the use of this software.
+
+ Permission is granted to anyone to use this software for any purpose,
+ including commercial applications, and to alter it and redistribute it
+ freely, subject to the following restrictions:
+
+ 1. The origin of this software must not be misrepresented; you must not
+ claim that you wrote the original software. If you use this software
+ in a product, an acknowledgment in the product documentation would be
+ appreciated but is not required.
+ 2. Altered source versions must be plainly marked as such, and must not be
+ misrepresented as being the original software.
+ 3. This notice may not be removed or altered from any source distribution.
+
+ Mark Adler
+ madler@alumni.caltech.edu
+ */
+
+/* Use hardware CRC instruction on Intel SSE 4.2 processors. This computes a
+ CRC-32C, *not* the CRC-32 used by Ethernet and zip, gzip, etc. A software
+ version is provided as a fall-back, as well as for speed comparisons. */
+
+/* Version history:
+ 1.0 10 Feb 2013 First version
+ 1.1 1 Aug 2013 Correct comments on why three crc instructions in parallel
+ */
+
+/* This version has been modified by dormando for inclusion in memcached */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <pthread.h>
+#include "crc32c.h"
+
+/* CRC-32C (iSCSI) polynomial in reversed bit order. */
+#define POLY 0x82f63b78
+
+/* Table for a quadword-at-a-time software crc. */
+static pthread_once_t crc32c_once_sw = PTHREAD_ONCE_INIT;
+static uint32_t crc32c_table[8][256];
+
+/* Construct table for software CRC-32C calculation. */
+static void crc32c_init_sw(void)
+{
+ uint32_t n, crc, k;
+
+ for (n = 0; n < 256; n++) {
+ crc = n;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
+ crc32c_table[0][n] = crc;
+ }
+ for (n = 0; n < 256; n++) {
+ crc = crc32c_table[0][n];
+ for (k = 1; k < 8; k++) {
+ crc = crc32c_table[0][crc & 0xff] ^ (crc >> 8);
+ crc32c_table[k][n] = crc;
+ }
+ }
+}
+
+/* Table-driven software version as a fall-back. This is about 15 times slower
+ than using the hardware instructions. This assumes little-endian integers,
+ as is the case on Intel processors that the assembler code here is for. */
+static uint32_t crc32c_sw(uint32_t crci, const void *buf, size_t len)
+{
+ const unsigned char *next = buf;
+ uint64_t crc;
+
+ pthread_once(&crc32c_once_sw, crc32c_init_sw);
+ crc = crci ^ 0xffffffff;
+ while (len && ((uintptr_t)next & 7) != 0) {
+ crc = crc32c_table[0][(crc ^ *next++) & 0xff] ^ (crc >> 8);
+ len--;
+ }
+ while (len >= 8) {
+ crc ^= *(uint64_t *)next;
+ crc = crc32c_table[7][crc & 0xff] ^
+ crc32c_table[6][(crc >> 8) & 0xff] ^
+ crc32c_table[5][(crc >> 16) & 0xff] ^
+ crc32c_table[4][(crc >> 24) & 0xff] ^
+ crc32c_table[3][(crc >> 32) & 0xff] ^
+ crc32c_table[2][(crc >> 40) & 0xff] ^
+ crc32c_table[1][(crc >> 48) & 0xff] ^
+ crc32c_table[0][crc >> 56];
+ next += 8;
+ len -= 8;
+ }
+ while (len) {
+ crc = crc32c_table[0][(crc ^ *next++) & 0xff] ^ (crc >> 8);
+ len--;
+ }
+ return (uint32_t)crc ^ 0xffffffff;
+}
+
+/* Multiply a matrix times a vector over the Galois field of two elements,
+ GF(2). Each element is a bit in an unsigned integer. mat must have at
+ least as many entries as the power of two for most significant one bit in
+ vec. */
+static inline uint32_t gf2_matrix_times(uint32_t *mat, uint32_t vec)
+{
+ uint32_t sum;
+
+ sum = 0;
+ while (vec) {
+ if (vec & 1)
+ sum ^= *mat;
+ vec >>= 1;
+ mat++;
+ }
+ return sum;
+}
+
+/* Multiply a matrix by itself over GF(2). Both mat and square must have 32
+ rows. */
+static inline void gf2_matrix_square(uint32_t *square, uint32_t *mat)
+{
+ int n;
+
+ for (n = 0; n < 32; n++)
+ square[n] = gf2_matrix_times(mat, mat[n]);
+}
+
+/* Construct an operator to apply len zeros to a crc. len must be a power of
+ two. If len is not a power of two, then the result is the same as for the
+ largest power of two less than len. The result for len == 0 is the same as
+ for len == 1. A version of this routine could be easily written for any
+ len, but that is not needed for this application. */
+static void crc32c_zeros_op(uint32_t *even, size_t len)
+{
+ int n;
+ uint32_t row;
+ uint32_t odd[32]; /* odd-power-of-two zeros operator */
+
+ /* put operator for one zero bit in odd */
+ odd[0] = POLY; /* CRC-32C polynomial */
+ row = 1;
+ for (n = 1; n < 32; n++) {
+ odd[n] = row;
+ row <<= 1;
+ }
+
+ /* put operator for two zero bits in even */
+ gf2_matrix_square(even, odd);
+
+ /* put operator for four zero bits in odd */
+ gf2_matrix_square(odd, even);
+
+ /* first square will put the operator for one zero byte (eight zero bits),
+ in even -- next square puts operator for two zero bytes in odd, and so
+ on, until len has been rotated down to zero */
+ do {
+ gf2_matrix_square(even, odd);
+ len >>= 1;
+ if (len == 0)
+ return;
+ gf2_matrix_square(odd, even);
+ len >>= 1;
+ } while (len);
+
+ /* answer ended up in odd -- copy to even */
+ for (n = 0; n < 32; n++)
+ even[n] = odd[n];
+}
+
+/* Take a length and build four lookup tables for applying the zeros operator
+ for that length, byte-by-byte on the operand. */
+static void crc32c_zeros(uint32_t zeros[][256], size_t len)
+{
+ uint32_t n;
+ uint32_t op[32];
+
+ crc32c_zeros_op(op, len);
+ for (n = 0; n < 256; n++) {
+ zeros[0][n] = gf2_matrix_times(op, n);
+ zeros[1][n] = gf2_matrix_times(op, n << 8);
+ zeros[2][n] = gf2_matrix_times(op, n << 16);
+ zeros[3][n] = gf2_matrix_times(op, n << 24);
+ }
+}
+
+/* Apply the zeros operator table to crc. */
+static inline uint32_t crc32c_shift(uint32_t zeros[][256], uint32_t crc)
+{
+ return zeros[0][crc & 0xff] ^ zeros[1][(crc >> 8) & 0xff] ^
+ zeros[2][(crc >> 16) & 0xff] ^ zeros[3][crc >> 24];
+}
+
+/* Block sizes for three-way parallel crc computation. LONG and SHORT must
+ both be powers of two. The associated string constants must be set
+ accordingly, for use in constructing the assembler instructions. */
+#define LONG 8192
+#define LONGx1 "8192"
+#define LONGx2 "16384"
+#define SHORT 256
+#define SHORTx1 "256"
+#define SHORTx2 "512"
+
+/* Tables for hardware crc that shift a crc by LONG and SHORT zeros. */
+static pthread_once_t crc32c_once_hw = PTHREAD_ONCE_INIT;
+static uint32_t crc32c_long[4][256];
+static uint32_t crc32c_short[4][256];
+
+/* Initialize tables for shifting crcs. */
+static void crc32c_init_hw(void)
+{
+ crc32c_zeros(crc32c_long, LONG);
+ crc32c_zeros(crc32c_short, SHORT);
+}
+
+/* Compute CRC-32C using the Intel hardware instruction. */
+static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len)
+{
+ const unsigned char *next = buf;
+ const unsigned char *end;
+ uint64_t crc0, crc1, crc2; /* need to be 64 bits for crc32q */
+
+ /* populate shift tables the first time through */
+ pthread_once(&crc32c_once_hw, crc32c_init_hw);
+
+ /* pre-process the crc */
+ crc0 = crc ^ 0xffffffff;
+
+ /* compute the crc for up to seven leading bytes to bring the data pointer
+ to an eight-byte boundary */
+ while (len && ((uintptr_t)next & 7) != 0) {
+ __asm__("crc32b\t" "(%1), %0"
+ : "=r"(crc0)
+ : "r"(next), "0"(crc0));
+ next++;
+ len--;
+ }
+
+ /* compute the crc on sets of LONG*3 bytes, executing three independent crc
+ instructions, each on LONG bytes -- this is optimized for the Nehalem,
+ Westmere, Sandy Bridge, and Ivy Bridge architectures, which have a
+ throughput of one crc per cycle, but a latency of three cycles */
+ while (len >= LONG*3) {
+ crc1 = 0;
+ crc2 = 0;
+ end = next + LONG;
+ do {
+ __asm__("crc32q\t" "(%3), %0\n\t"
+ "crc32q\t" LONGx1 "(%3), %1\n\t"
+ "crc32q\t" LONGx2 "(%3), %2"
+ : "=r"(crc0), "=r"(crc1), "=r"(crc2)
+ : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2));
+ next += 8;
+ } while (next < end);
+ crc0 = crc32c_shift(crc32c_long, crc0) ^ crc1;
+ crc0 = crc32c_shift(crc32c_long, crc0) ^ crc2;
+ next += LONG*2;
+ len -= LONG*3;
+ }
+
+ /* do the same thing, but now on SHORT*3 blocks for the remaining data less
+ than a LONG*3 block */
+ while (len >= SHORT*3) {
+ crc1 = 0;
+ crc2 = 0;
+ end = next + SHORT;
+ do {
+ __asm__("crc32q\t" "(%3), %0\n\t"
+ "crc32q\t" SHORTx1 "(%3), %1\n\t"
+ "crc32q\t" SHORTx2 "(%3), %2"
+ : "=r"(crc0), "=r"(crc1), "=r"(crc2)
+ : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2));
+ next += 8;
+ } while (next < end);
+ crc0 = crc32c_shift(crc32c_short, crc0) ^ crc1;
+ crc0 = crc32c_shift(crc32c_short, crc0) ^ crc2;
+ next += SHORT*2;
+ len -= SHORT*3;
+ }
+
+ /* compute the crc on the remaining eight-byte units less than a SHORT*3
+ block */
+ end = next + (len - (len & 7));
+ while (next < end) {
+ __asm__("crc32q\t" "(%1), %0"
+ : "=r"(crc0)
+ : "r"(next), "0"(crc0));
+ next += 8;
+ }
+ len &= 7;
+
+ /* compute the crc for up to seven trailing bytes */
+ while (len) {
+ __asm__("crc32b\t" "(%1), %0"
+ : "=r"(crc0)
+ : "r"(next), "0"(crc0));
+ next++;
+ len--;
+ }
+
+ /* return a post-processed crc */
+ return (uint32_t)crc0 ^ 0xffffffff;
+}
+
+/* Check for SSE 4.2. SSE 4.2 was first supported in Nehalem processors
+ introduced in November, 2008. This does not check for the existence of the
+ cpuid instruction itself, which was introduced on the 486SL in 1992, so this
+ will fail on earlier x86 processors. cpuid works on all Pentium and later
+ processors. */
+#define SSE42(have) \
+ do { \
+ uint32_t eax, ecx; \
+ eax = 1; \
+ __asm__("cpuid" \
+ : "=c"(ecx) \
+ : "a"(eax) \
+ : "%ebx", "%edx"); \
+ (have) = (ecx >> 20) & 1; \
+ } while (0)
+
+/* Compute a CRC-32C. If the crc32 instruction is available, use the hardware
+ version. Otherwise, use the software version. */
+void crc32c_init(void) {
+ int sse42;
+
+ SSE42(sse42);
+ if (sse42) {
+ crc32c = crc32c_hw;
+ } else {
+ crc32c = crc32c_sw;
+ }
+}
diff --git a/crc32c.h b/crc32c.h
new file mode 100644
index 0000000..8b030de
--- /dev/null
+++ b/crc32c.h
@@ -0,0 +1,9 @@
+#ifndef CRC32C_H
+#define CRC32C_H
+
+typedef uint32_t (*crc_func)(uint32_t crc, const void *buf, size_t len);
+crc_func crc32c;
+
+void crc32c_init(void);
+
+#endif /* CRC32C_H */
diff --git a/doc/storage.txt b/doc/storage.txt
new file mode 100644
index 0000000..41a3c7e
--- /dev/null
+++ b/doc/storage.txt
@@ -0,0 +1,141 @@
+Storage system notes
+--------------------
+
+extstore.h defines the API.
+
+extstore_write() is a synchronous call which memcpy's the input buffer into a
+write buffer for an active page. A failure is not usually a hard failure, but
+indicates the caller can try again later; e.g., the engine might be busy
+freeing pages or assigning new ones.
+
+As of this writing the write() implementation doesn't have an internal loop,
+so it can give spurious failures (good for testing integration).
+
+extstore_submit() is an asynchronous call which takes a stack of IO objects
+and adds it to the end of a queue. It then signals the IO thread to run. Once
+an IO stack is submitted the caller must not touch the submitted objects
+anymore (they are relinked internally).
+
+extstore_delete() is a synchronous call which informs the storage engine that
+an item has been removed from a given page. It's important to call this as
+items are actively deleted or passively reaped due to TTL expiration. This
+allows the engine to intelligently reclaim pages.
+
+The IO threads execute each object in turn (or in bulk if running in the
+future libaio mode).
+
+Callbacks are issued from the IO threads. It's thus important to keep
+processing to a minimum. Callbacks may be issued out of order, and it is the
+caller's responsibility to know when its stack has been fully processed so it
+may reclaim the memory.
+
+With DIRECT_IO support, buffers submitted for read/write will need to be
+aligned with posix_memalign() or similar.
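+
+A minimal end-to-end sketch against the API above (the file path, buffers,
+and callback names here are illustrative, not part of the engine):
+
+  static void read_done(void *e, obj_io *io, int ret) {
+      /* runs on an IO thread; keep processing minimal */
+      if (ret < 1) { /* treat as a miss */ }
+  }
+
+  struct extstore_conf cf = {
+      .page_size = 1024 * 1024 * 64,  /* 64M pages */
+      .page_count = 64,
+      .page_buckets = 2,
+      .wbuf_size = 1024 * 1024 * 8,   /* must divide page_size evenly */
+      .wbuf_count = 4,                /* must be >= page_buckets */
+      .io_threadcount = 1,
+      .io_depth = 1,
+  };
+  void *e = extstore_init("/data/extstore.bin", &cf);
+
+  obj_io wio = { .buf = data, .len = datalen };
+  if (extstore_write(e, 0, &wio) == 0) {
+      /* the engine filled in wio.page_id, wio.page_version, and
+       * wio.offset; these locate the object for later reads */
+  }
+
+  obj_io rio = { .buf = readbuf, .len = datalen, .mode = OBJ_IO_READ,
+                 .page_id = wio.page_id, .page_version = wio.page_version,
+                 .offset = wio.offset, .cb = read_done };
+  extstore_submit(e, &rio);  /* asynchronous; read_done fires on completion */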
+
+Buckets
+-------
+
+During extstore_init(), a number of active buckets is specified. Pages are
+handled overall as a global pool, but writes can be redirected to specific
+active pages.
+
+This allows a lot of flexibility, e.g.:
+
+1) an idea of "high TTL" and "low TTL" being two buckets. TTL < 86400 goes
+into bucket 0, the rest into bucket 1 (see the sketch at the end of this
+section). Co-locating low-TTL items means those pages can reach zero objects
+and free up more easily.
+
+2) Extended: "low TTL" is one bucket, and then one bucket per slab class.
+If TTLs are low, mixed-size objects can go together as they are likely to
+expire before cycling out of flash (depending on workload, of course).
+For higher TTL items, pages are stored on chunk barriers. This means less
+space is wasted, as items should fit nearly exactly into write buffers and
+pages. It also means you can blindly read items back if the system wants to
+free a page, and we can indicate to the caller somehow which pages are up for
+probation: i.e., issue a read against page 3 version 1 for byte range 0->1MB,
+then chunk it and look up objects. Then read the next 1MB chunk, etc. If
+there's anything we want to keep, pull it back into RAM before the page is
+freed.
+
+Pages are assigned into buckets on demand, so if you make 30 but use 1 there
+will only be a single active page with write buffers.
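+
+As a sketch, the selection in (1) is just a TTL threshold at write time
+(the helper name here is assumed):
+
+  /* bucket 0: TTL < 1 day; bucket 1: everything else */
+  static unsigned int pick_bucket(rel_time_t exptime, rel_time_t now) {
+      if (exptime != 0 && exptime - now < 60 * 60 * 24)
+          return 0;
+      return 1;
+  }
+
+  extstore_write(e, pick_bucket(it->exptime, current_time), &io);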
+
+Memcached integration
+---------------------
+
+With the POC: items.c's lru_maintainer_thread writes items to storage if all
+memory has been allocated out to slab classes, and less than a configured
+amount of memory is free. Original objects are swapped with items marked with
+the ITEM_HDR flag. An ITEM_HDR item contains copies of the original key and
+most of the header data. The ITEM_data() section of an ITEM_HDR object
+contains an (item_hdr *), which describes enough information to retrieve the
+original object from storage.
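+
+The header payload looks something like the following (the exact field
+layout lives in memcached.h; this is a sketch):
+
+  typedef struct _item_hdr {
+      uint64_t page_version;  /* validated via extstore_check() on read */
+      unsigned int page_id;   /* which flash page holds the object */
+      unsigned int offset;    /* byte offset of the object within the page */
+  } item_hdr;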
+
+To get the best performance it is important that reads can be deeply
+pipelined. As much processing as possible is done ahead of time, IOs are
+submitted, and once IOs are done processing, a minimal amount of code is
+executed before transmit() is possible. This should amortize the latency
+incurred by hopping threads and waiting on IO.
+
+Recaching
+---------
+
+If a header is hit twice overall, and the second time is within ~60s of the
+first, it has a chance of getting recached. "recache_rate" is a simple
+"counter % rate == 0" check. Setting it to 1000 means that in one out of
+every 1000 instances of an item being hit twice within ~60s, the item will
+be recached into memory. Very hot items will get pulled out of storage
+relatively quickly.
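+
+Concretely, the check in memcached.c's recache_or_free() is roughly:
+
+  uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
+  /* linked, fetched, active, and hit within the update window */
+  if (((h_it->it_flags & flags) == flags) &&
+      h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
+      c->recache_counter++ % settings.ext_recache_rate == 0) {
+      /* swap the full object back in, replacing the header */
+  }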
+
+Compaction
+----------
+
+A target fragmentation limit is set: "0.9", meaning "run compactions if pages
+exist which have less than 90% of their bytes used".
+
+This value is slewed based on the number of free pages in the system, and
+activates once half of the pages are used. The percentage of free pages is
+multiplied against the target fragmentation limit, i.e. with a limit of 0.9
+and 50% of pages free: 0.9 * 0.5 -> 0.45. If a page is 64 megabytes, pages
+with less than 28.8 megabytes used would be targeted for compaction. If 0
+pages are free, anything less than 90% used is targeted, which means it has
+to rewrite 10 pages to free one page.
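+
+Worked as code (variable names here are illustrative):
+
+  double frag_limit = 0.9;  /* target fragmentation limit */
+  double free_ratio = pages_free / (double)page_count;        /* e.g. 0.5 */
+  uint64_t threshold = page_size * (frag_limit * free_ratio); /* 0.45 * 64M */
+  if (page_bytes_used < threshold) {
+      /* less than 28.8M of a 64M page used: candidate for compaction */
+  }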
+
+In memcached's integration, a second bucket is used for objects rewritten via
+the compactor. Objects that have been around long enough to get compacted
+might continue to stick around, so co-locating them could reduce future
+fragmentation work.
+
+If an exclusive lock can be taken on a valid object header, the flash
+locations are rewritten directly in the object. As of this writing, if an
+object header is busy for some reason, the write is dropped (COW needs to be
+implemented). This is an unlikely scenario, however.
+
+Objects are read back along the boundaries of a write buffer. If an 8 meg
+write buffer is used, 8 megs are read back at once and iterated for objects.
+
+This needs a fair amount of tuning, possibly more throttling. It will still
+evict pages if the compactor gets behind.
+
+TODO
+----
+
+Sharing my broad TODO items here. While doing the work they get split up
+further into local notes. Adding this so others can follow along:
+
+(a bunch of the TODO has been completed and removed)
+- DIRECT_IO support
+- libaio support (requires DIRECT_IO)
+- code cleanup (function over form until the bugs are out)
+ - pull all of the inlined linked list code
+ - naming consistency
+ - clear FIXME/TODO's related to error handling
+- JBOD support (not first pass)
+ - 1-2 active pages per device. potentially dedicated IO threads per device.
+ with a RAID setup you risk any individual disk doing a GC pause stalling
+ all writes. could also simply rotate devices on a per-bucket basis.
+
+on memcached end:
+- fix append/prepend/incr/decr/etc
+- large item support
+- configure gating for extstore being compiled (for now, at least)
+- binprot support
+- DIRECT_IO support; mostly memalign pages, but also making chunks grow
+ aligned to sector sizes once they are >= a single sector.
diff --git a/extstore.c b/extstore.c
new file mode 100644
index 0000000..536211d
--- /dev/null
+++ b/extstore.c
@@ -0,0 +1,800 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+// FIXME: config.h?
+#include <stdint.h>
+#include <stdbool.h>
+// end FIXME
+#include <stdlib.h>
+#include <limits.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include "extstore.h"
+
+// TODO: better if an init option turns this on/off.
+#ifdef EXTSTORE_DEBUG
+#define E_DEBUG(...) \
+ do { \
+ fprintf(stderr, __VA_ARGS__); \
+ } while (0)
+#else
+#define E_DEBUG(...)
+#endif
+
+#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
+#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
+#define STAT_INCR(e, stat, amount) { \
+ pthread_mutex_lock(&e->stats_mutex); \
+ e->stats.stat += amount; \
+ pthread_mutex_unlock(&e->stats_mutex); \
+}
+
+#define STAT_DECR(e, stat, amount) { \
+ pthread_mutex_lock(&e->stats_mutex); \
+ e->stats.stat -= amount; \
+ pthread_mutex_unlock(&e->stats_mutex); \
+}
+
+typedef struct __store_wbuf {
+ struct __store_wbuf *next;
+ char *buf;
+ char *buf_pos;
+ unsigned int free;
+ unsigned int size;
+ unsigned int offset; /* offset into page this write starts at */
+ bool full; /* done writing to this page */
+ bool flushed; /* whether wbuf has been flushed to disk */
+} _store_wbuf;
+
+typedef struct _store_page {
+ pthread_mutex_t mutex; /* Need to be held for most operations */
+ uint64_t obj_count; /* _delete can decrease post-closing */
+ uint64_t bytes_used; /* _delete can decrease post-closing */
+ uint64_t offset; /* starting address of page within fd */
+ unsigned int version;
+ unsigned int refcount;
+ unsigned int allocated;
+ unsigned int written; /* item offsets can be past written if wbuf not flushed */
+ unsigned int bucket; /* which bucket the page is linked into */
+ int fd;
+ unsigned short id;
+ bool active; /* actively being written to */
+ bool closed; /* closed and draining before free */
+ bool free; /* on freelist */
+ _store_wbuf *wbuf; /* currently active wbuf from the stack */
+ struct _store_page *next;
+} store_page;
+
+typedef struct store_engine store_engine;
+typedef struct {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ obj_io *queue;
+ store_engine *e;
+} store_io_thread;
+
+typedef struct {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ store_engine *e;
+} store_maint_thread;
+
+/* TODO: Array of FDs for JBOD support */
+struct store_engine {
+ pthread_mutex_t mutex; /* covers internal stacks and variables */
+ store_page *pages; /* directly addressable page list */
+ _store_wbuf *wbuf_stack; /* wbuf freelist */
+ obj_io *io_stack; /* IO's to use with submitting wbuf's */
+ store_io_thread *io_threads;
+ store_maint_thread *maint_thread;
+ store_page *page_freelist;
+ store_page **page_buckets; /* stack of pages currently allocated to each bucket */
+ size_t page_size;
+ unsigned int version; /* global version counter */
+ unsigned int last_io_thread; /* round robin the IO threads */
+ unsigned int io_threadcount; /* count of IO threads */
+ unsigned int page_count;
+ unsigned int page_free; /* unallocated pages */
+ unsigned int page_bucketcount; /* count of potential page buckets */
+ unsigned int io_depth; /* FIXME: Might cache into thr struct */
+ pthread_mutex_t stats_mutex;
+ struct extstore_stats stats;
+};
+
+static _store_wbuf *wbuf_new(size_t size) {
+ _store_wbuf *b = calloc(1, sizeof(_store_wbuf));
+ if (b == NULL)
+ return NULL;
+ b->buf = malloc(size);
+ if (b->buf == NULL) {
+ free(b);
+ return NULL;
+ }
+ b->buf_pos = b->buf;
+ b->free = size;
+ b->size = size;
+ return b;
+}
+
+static store_io_thread *_get_io_thread(store_engine *e) {
+ int tid;
+ pthread_mutex_lock(&e->mutex);
+ tid = (e->last_io_thread + 1) % e->io_threadcount;
+ e->last_io_thread = tid;
+ pthread_mutex_unlock(&e->mutex);
+
+ return &e->io_threads[tid];
+}
+
+static uint64_t _next_version(store_engine *e) {
+ return e->version++;
+}
+
+static void *extstore_io_thread(void *arg);
+static void *extstore_maint_thread(void *arg);
+
+/* Copies stats internal to engine and computes any derived values */
+void extstore_get_stats(void *ptr, struct extstore_stats *st) {
+ store_engine *e = (store_engine *)ptr;
+ STAT_L(e);
+ memcpy(st, &e->stats, sizeof(struct extstore_stats));
+ STAT_UL(e);
+
+ // grab pages_free/pages_used
+ pthread_mutex_lock(&e->mutex);
+ st->pages_free = e->page_free;
+ st->pages_used = e->page_count - e->page_free;
+ pthread_mutex_unlock(&e->mutex);
+ // calculate bytes_fragmented.
+ // note that open and yet-filled pages count against fragmentation.
+ st->bytes_fragmented = st->pages_used * e->page_size -
+ st->bytes_used;
+}
+
+void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
+ store_engine *e = (store_engine *)ptr;
+ STAT_L(e);
+ memcpy(st->page_data, e->stats.page_data,
+ sizeof(struct extstore_page_data) * e->page_count);
+ STAT_UL(e);
+}
+
+/* TODO: debug mode with prints? error code? */
+// TODO: Somehow pass real error codes from config failures
+void *extstore_init(char *fn, struct extstore_conf *cf) {
+ int i;
+ int fd;
+ uint64_t offset = 0;
+ pthread_t thread;
+
+ if (cf->page_size % cf->wbuf_size != 0) {
+ E_DEBUG("EXTSTORE: page_size must be divisible by wbuf_size\n");
+ return NULL;
+ }
+ // Should ensure at least one write buffer per potential page
+ if (cf->page_buckets > cf->wbuf_count) {
+ E_DEBUG("EXTSTORE: wbuf_count must be >= page_buckets\n");
+ return NULL;
+ }
+ if (cf->page_buckets < 1) {
+ E_DEBUG("EXTSTORE: page_buckets must be > 0\n");
+ return NULL;
+ }
+
+ // TODO: More intelligence around alignment of flash erasure block sizes
+ if (cf->page_size % (1024 * 1024 * 2) != 0 ||
+ cf->wbuf_size % (1024 * 1024 * 2) != 0) {
+ E_DEBUG("EXTSTORE: page_size and wbuf_size must be divisible by 1024*1024*2\n");
+ return NULL;
+ }
+
+ store_engine *e = calloc(1, sizeof(store_engine));
+ if (e == NULL) {
+ E_DEBUG("EXTSTORE: failed calloc for engine\n");
+ return NULL;
+ }
+
+ e->page_size = cf->page_size;
+ fd = open(fn, O_RDWR | O_CREAT | O_TRUNC, 0644);
+ if (fd < 0) {
+ E_DEBUG("EXTSTORE: failed to open file: %s\n", fn);
+#ifdef EXTSTORE_DEBUG
+ perror("open");
+#endif
+ free(e);
+ return NULL;
+ }
+
+ e->pages = calloc(cf->page_count, sizeof(store_page));
+ if (e->pages == NULL) {
+ E_DEBUG("EXTSTORE: failed to calloc storage pages\n");
+ close(fd);
+ free(e);
+ return NULL;
+ }
+
+ for (i = 0; i < cf->page_count; i++) {
+ pthread_mutex_init(&e->pages[i].mutex, NULL);
+ e->pages[i].id = i;
+ e->pages[i].fd = fd;
+ e->pages[i].offset = offset;
+ e->pages[i].free = true;
+ offset += e->page_size;
+ }
+
+ for (i = cf->page_count-1; i > 0; i--) {
+ e->pages[i].next = e->page_freelist;
+ e->page_freelist = &e->pages[i];
+ e->page_free++;
+ }
+
+ // 0 is magic "page is freed" version
+ e->version = 1;
+
+ e->page_count = cf->page_count;
+ // scratch data for stats. TODO: malloc failure handle
+ e->stats.page_data =
+ calloc(e->page_count, sizeof(struct extstore_page_data));
+ e->stats.page_count = e->page_count;
+ e->stats.page_size = e->page_size;
+
+ // page buckets lazily have pages assigned into them
+ e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
+ e->page_bucketcount = cf->page_buckets;
+
+ // allocate write buffers
+ // also IO's to use for shipping to IO thread
+ for (i = 0; i < cf->wbuf_count; i++) {
+ _store_wbuf *w = wbuf_new(cf->wbuf_size);
+ obj_io *io = calloc(1, sizeof(obj_io));
+ /* TODO: on error, loop again and free stack. */
+ w->next = e->wbuf_stack;
+ e->wbuf_stack = w;
+ io->next = e->io_stack;
+ e->io_stack = io;
+ }
+
+ pthread_mutex_init(&e->mutex, NULL);
+ pthread_mutex_init(&e->stats_mutex, NULL);
+
+ e->io_depth = cf->io_depth;
+
+ // spawn threads
+ e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
+ for (i = 0; i < cf->io_threadcount; i++) {
+ pthread_mutex_init(&e->io_threads[i].mutex, NULL);
+ pthread_cond_init(&e->io_threads[i].cond, NULL);
+ e->io_threads[i].e = e;
+ // FIXME: error handling
+ pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
+ }
+ e->io_threadcount = cf->io_threadcount;
+
+ e->maint_thread = calloc(1, sizeof(store_maint_thread));
+ e->maint_thread->e = e;
+ // FIXME: error handling
+ pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread);
+
+ return (void *)e;
+}
+
+void extstore_run_maint(void *ptr) {
+ store_engine *e = (store_engine *)ptr;
+ pthread_cond_signal(&e->maint_thread->cond);
+}
+
+// call with *e locked
+static store_page *_allocate_page(store_engine *e, unsigned int bucket) {
+ assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
+ store_page *tmp = e->page_freelist;
+ E_DEBUG("EXTSTORE: allocating new page\n");
+ if (e->page_free > 0) {
+ assert(e->page_freelist != NULL);
+ e->page_freelist = tmp->next;
+ tmp->next = e->page_buckets[bucket];
+ e->page_buckets[bucket] = tmp;
+ tmp->active = true;
+ tmp->free = false;
+ tmp->closed = false;
+ tmp->version = _next_version(e);
+ tmp->bucket = bucket;
+ e->page_free--;
+ STAT_INCR(e, page_allocs, 1);
+ } else {
+ extstore_run_maint(e);
+ }
+ if (tmp)
+ E_DEBUG("EXTSTORE: got page %u\n", tmp->id);
+ return tmp;
+}
+
+// call with *p locked. locks *e
+static void _allocate_wbuf(store_engine *e, store_page *p) {
+ _store_wbuf *wbuf = NULL;
+ assert(!p->wbuf);
+ pthread_mutex_lock(&e->mutex);
+ if (e->wbuf_stack) {
+ wbuf = e->wbuf_stack;
+ e->wbuf_stack = wbuf->next;
+ wbuf->next = 0;
+ }
+ pthread_mutex_unlock(&e->mutex);
+ if (wbuf) {
+ wbuf->offset = p->allocated;
+ p->allocated += wbuf->size;
+ wbuf->free = wbuf->size;
+ wbuf->buf_pos = wbuf->buf;
+ wbuf->full = false;
+ wbuf->flushed = false;
+
+ p->wbuf = wbuf;
+ }
+}
+
+/* callback after wbuf is flushed. can only remove wbuf's from the head onward
+ * if successfully flushed, which complicates this routine. each callback
+ * attempts to free the wbuf stack, which is finally done when the head wbuf's
+ * callback happens.
+ * It's rare for flushes to happen out of order.
+ */
+static void _wbuf_cb(void *ep, obj_io *io, int ret) {
+ store_engine *e = (store_engine *)ep;
+ store_page *p = &e->pages[io->page_id];
+ _store_wbuf *w = (_store_wbuf *) io->data;
+
+ // TODO: Examine return code. Not entirely sure how to handle errors.
+ // Naive first-pass should probably cause the page to close/free.
+ w->flushed = true;
+ pthread_mutex_lock(&p->mutex);
+ assert(p->wbuf != NULL && p->wbuf == w);
+ assert(p->written == w->offset);
+ p->written += w->size;
+ p->wbuf = NULL;
+
+ if (p->written == e->page_size)
+ p->active = false;
+
+ // return the wbuf
+ pthread_mutex_lock(&e->mutex);
+ w->next = e->wbuf_stack;
+ e->wbuf_stack = w;
+ // also return the IO we just used.
+ io->next = e->io_stack;
+ e->io_stack = io;
+ pthread_mutex_unlock(&e->mutex);
+ pthread_mutex_unlock(&p->mutex);
+}
+
+/* Wraps a page's current wbuf in an io and submits it to an IO thread.
+ * Called with p locked, locks e.
+ */
+static void _submit_wbuf(store_engine *e, store_page *p) {
+ _store_wbuf *w;
+ pthread_mutex_lock(&e->mutex);
+ obj_io *io = e->io_stack;
+ e->io_stack = io->next;
+ pthread_mutex_unlock(&e->mutex);
+ w = p->wbuf;
+
+ // zero out the end of the wbuf to allow blind readback of data.
+ memset(w->buf + (w->size - w->free), 0, w->free);
+
+ io->next = NULL;
+ io->mode = OBJ_IO_WRITE;
+ io->page_id = p->id;
+ io->data = w;
+ io->offset = w->offset;
+ io->len = w->size;
+ io->buf = w->buf;
+ io->cb = _wbuf_cb;
+
+ extstore_submit(e, io);
+}
+
+/* engine write function; takes engine, item_io.
+ * fast fail if no available write buffer (flushing)
+ * lock engine context, find active page, unlock
+ * if page full, submit page/buffer to io thread.
+ *
+ * write is designed to be flaky; if the page is full, the caller must try
+ * again to get a new page. best if used from a background thread that can
+ * harmlessly retry.
+ */
+
+int extstore_write(void *ptr, unsigned int bucket, obj_io *io) {
+ store_engine *e = (store_engine *)ptr;
+ store_page *p;
+ int ret = -1;
+ if (bucket >= e->page_bucketcount)
+ return ret;
+
+ pthread_mutex_lock(&e->mutex);
+ p = e->page_buckets[bucket];
+ if (!p) {
+ p = _allocate_page(e, bucket);
+ }
+ pthread_mutex_unlock(&e->mutex);
+ if (!p)
+ return ret;
+
+ pthread_mutex_lock(&p->mutex);
+
+ // FIXME: can't null out page_buckets!!!
+ // page is full, clear bucket and retry later.
+ if (!p->active ||
+ ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
+ pthread_mutex_unlock(&p->mutex);
+ pthread_mutex_lock(&e->mutex);
+ _allocate_page(e, bucket);
+ pthread_mutex_unlock(&e->mutex);
+ return ret;
+ }
+
+ // if io won't fit, submit IO for wbuf and find new one.
+ if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
+ _submit_wbuf(e, p);
+ p->wbuf->full = true;
+ }
+
+ if (!p->wbuf && p->allocated < e->page_size) {
+ _allocate_wbuf(e, p);
+ }
+
+ // memcpy into wbuf
+ if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
+ memcpy(p->wbuf->buf_pos, io->buf, io->len);
+ io->page_id = p->id;
+ io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
+ io->page_version = p->version;
+ p->wbuf->buf_pos += io->len;
+ p->wbuf->free -= io->len;
+ p->bytes_used += io->len;
+ p->obj_count++;
+ STAT_L(e);
+ e->stats.bytes_written += io->len;
+ e->stats.bytes_used += io->len;
+ e->stats.objects_written++;
+ e->stats.objects_used++;
+ STAT_UL(e);
+ ret = 0;
+ }
+
+ pthread_mutex_unlock(&p->mutex);
+ // p->written is incremented post-wbuf flush
+ return ret;
+}
+
+/* engine submit function; takes engine, item_io stack.
+ * lock io_thread context and add stack?
+ * signal io thread to wake.
+ * return success.
+ */
+int extstore_submit(void *ptr, obj_io *io) {
+ store_engine *e = (store_engine *)ptr;
+ store_io_thread *t = _get_io_thread(e);
+
+ pthread_mutex_lock(&t->mutex);
+ if (t->queue == NULL) {
+ t->queue = io;
+ } else {
+ /* Have to put the *io stack at the end of current queue.
+ * FIXME: Optimize by tracking tail.
+ */
+ obj_io *tmp = t->queue;
+ while (tmp->next != NULL) {
+ tmp = tmp->next;
+ assert(tmp != t->queue);
+ }
+ tmp->next = io;
+ }
+ pthread_mutex_unlock(&t->mutex);
+
+ //pthread_mutex_lock(&t->mutex);
+ pthread_cond_signal(&t->cond);
+ //pthread_mutex_unlock(&t->mutex);
+ return 0;
+}
+
+/* engine note delete function: takes engine, page id, size?
+ * note that an item in this page is no longer valid
+ */
+int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
+ unsigned int count, unsigned int bytes) {
+ store_engine *e = (store_engine *)ptr;
+ // FIXME: validate page_id in bounds
+ store_page *p = &e->pages[page_id];
+ int ret = 0;
+
+ pthread_mutex_lock(&p->mutex);
+ if (!p->closed && p->version == page_version) {
+ if (p->bytes_used >= bytes) {
+ p->bytes_used -= bytes;
+ } else {
+ p->bytes_used = 0;
+ }
+
+ if (p->obj_count >= count) {
+ p->obj_count -= count;
+ } else {
+ p->obj_count = 0; // caller has bad accounting?
+ }
+ STAT_L(e);
+ e->stats.bytes_used -= bytes;
+ e->stats.objects_used -= count;
+ STAT_UL(e);
+
+ if (p->obj_count == 0) {
+ extstore_run_maint(e);
+ }
+ } else {
+ ret = -1;
+ }
+ pthread_mutex_unlock(&p->mutex);
+ return ret;
+}
+
+int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
+ store_engine *e = (store_engine *)ptr;
+ store_page *p = &e->pages[page_id];
+ int ret = 0;
+
+ pthread_mutex_lock(&p->mutex);
+ if (p->version != page_version)
+ ret = -1;
+ pthread_mutex_unlock(&p->mutex);
+ return ret;
+}
+
+/* allows a compactor to say "we're done with this page, kill it." */
+void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
+ store_engine *e = (store_engine *)ptr;
+ store_page *p = &e->pages[page_id];
+
+ pthread_mutex_lock(&p->mutex);
+ if (!p->closed && p->version == page_version) {
+ p->closed = true;
+ extstore_run_maint(e);
+ }
+ pthread_mutex_unlock(&p->mutex);
+}
+
+/* Finds an attached wbuf that can satisfy the read.
+ * Since wbufs can potentially be flushed to disk out of order, they are only
+ * removed as the head of the list successfully flushes to disk.
+ */
+// call with *p locked
+// FIXME: protect from reading past wbuf
+static inline int _read_from_wbuf(store_page *p, obj_io *io) {
+ _store_wbuf *wbuf = p->wbuf;
+ assert(wbuf != NULL);
+ assert(io->offset < p->written + wbuf->size);
+ memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
+ return io->len;
+}
+
+/* engine IO thread; takes engine context
+ * manage writes/reads
+ * runs IO callbacks inline after each IO
+ */
+// FIXME: protect from reading past page
+static void *extstore_io_thread(void *arg) {
+ store_io_thread *me = (store_io_thread *)arg;
+ store_engine *e = me->e;
+ while (1) {
+ obj_io *io_stack = NULL;
+ pthread_mutex_lock(&me->mutex);
+ if (me->queue == NULL) {
+ pthread_cond_wait(&me->cond, &me->mutex);
+ }
+
+ // Pull and disconnect a batch from the queue
+ if (me->queue != NULL) {
+ int i;
+ obj_io *end = NULL;
+ io_stack = me->queue;
+ end = io_stack;
+ for (i = 1; i < e->io_depth; i++) {
+ if (end->next) {
+ end = end->next;
+ } else {
+ break;
+ }
+ }
+ me->queue = end->next;
+ end->next = NULL;
+ }
+ pthread_mutex_unlock(&me->mutex);
+
+ obj_io *cur_io = io_stack;
+ while (cur_io) {
+ // We need to note next before the callback in case the obj_io
+ // gets reused.
+ obj_io *next = cur_io->next;
+ int ret = 0;
+ int do_op = 1;
+ store_page *p = &e->pages[cur_io->page_id];
+ // TODO: loop if not enough bytes were read/written.
+ switch (cur_io->mode) {
+ case OBJ_IO_READ:
+ // Page is currently open. deal if read is past the end.
+ pthread_mutex_lock(&p->mutex);
+ if (!p->free && !p->closed && p->version == cur_io->page_version) {
+ if (p->active && cur_io->offset >= p->written) {
+ ret = _read_from_wbuf(p, cur_io);
+ do_op = 0;
+ } else {
+ p->refcount++;
+ }
+ STAT_L(e);
+ e->stats.bytes_read += cur_io->len;
+ e->stats.objects_read++;
+ STAT_UL(e);
+ } else {
+ do_op = 0;
+ ret = -2; // TODO: enum in IO for status?
+ }
+ pthread_mutex_unlock(&p->mutex);
+ if (do_op)
+ ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
+ break;
+ case OBJ_IO_WRITE:
+ do_op = 0;
+ // FIXME: Should hold refcount during write. doesn't
+ // currently matter since page can't free while active.
+ ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
+ break;
+ }
+ if (ret == 0) {
+ E_DEBUG("read returned nothing\n");
+ }
+
+#ifdef EXTSTORE_DEBUG
+ if (ret == -1) {
+ perror("read/write op failed");
+ }
+#endif
+ cur_io->cb(e, cur_io, ret);
+ if (do_op) {
+ pthread_mutex_lock(&p->mutex);
+ p->refcount--;
+ pthread_mutex_unlock(&p->mutex);
+ }
+ cur_io = next;
+ }
+ }
+
+ return NULL;
+}
+
+// call with *p locked.
+static void _free_page(store_engine *e, store_page *p) {
+ store_page *tmp = NULL;
+ store_page *prev = NULL;
+ E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
+ STAT_L(e);
+ e->stats.objects_used -= p->obj_count;
+ e->stats.bytes_used -= p->bytes_used;
+ e->stats.page_reclaims++;
+ STAT_UL(e);
+ pthread_mutex_lock(&e->mutex);
+ // unlink page from bucket list
+ tmp = e->page_buckets[p->bucket];
+ while (tmp) {
+ if (tmp == p) {
+ if (prev) {
+ prev->next = tmp->next;
+ } else {
+ e->page_buckets[p->bucket] = tmp->next;
+ }
+ tmp->next = NULL;
+ break;
+ }
+ prev = tmp;
+ tmp = tmp->next;
+ }
+ // reset most values
+ p->version = 0;
+ p->obj_count = 0;
+ p->bytes_used = 0;
+ p->allocated = 0;
+ p->written = 0;
+ p->bucket = 0;
+ p->active = false;
+ p->closed = false;
+ p->free = true;
+ // add to page stack
+ p->next = e->page_freelist;
+ e->page_freelist = p;
+ e->page_free++;
+ pthread_mutex_unlock(&e->mutex);
+}
+
+/* engine maint thread; takes engine context.
+ * Uses version to ensure oldest possible objects are being evicted.
+ * Needs interface to inform owner of pages with fewer objects or most space
+ * free, which can then be actively compacted to avoid eviction.
+ *
+ * This gets called asynchronously after every page allocation. Could run less
+ * often if more pages are free.
+ *
+ * Another allocation call is required if an attempted free didn't happen
+ * due to the page having a refcount.
+ */
+
+// TODO: Don't over-evict pages if waiting on refcounts to drop
+static void *extstore_maint_thread(void *arg) {
+ store_maint_thread *me = (store_maint_thread *)arg;
+ store_engine *e = me->e;
+ struct extstore_page_data *pd =
+ calloc(e->page_count, sizeof(struct extstore_page_data));
+ pthread_mutex_lock(&me->mutex);
+ while (1) {
+ int i;
+ bool do_evict = false;
+ unsigned int low_page = 0;
+ uint64_t low_version = ULLONG_MAX;
+
+ pthread_cond_wait(&me->cond, &me->mutex);
+ pthread_mutex_lock(&e->mutex);
+ if (e->page_free == 0) {
+ do_evict = true;
+ }
+ pthread_mutex_unlock(&e->mutex);
+ memset(pd, 0, sizeof(struct extstore_page_data) * e->page_count);
+
+ for (i = 0; i < e->page_count; i++) {
+ store_page *p = &e->pages[i];
+ pthread_mutex_lock(&p->mutex);
+ if (p->active || p->free) {
+ pthread_mutex_unlock(&p->mutex);
+ continue;
+ }
+ if (p->obj_count > 0 && !p->closed) {
+ pd[p->id].version = p->version;
+ pd[p->id].bytes_used = p->bytes_used;
+ if (p->version < low_version) {
+ low_version = p->version;
+ low_page = i;
+ }
+ }
+ if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
+ _free_page(e, p);
+ // Found a page to free, no longer need to evict.
+ do_evict = false;
+ }
+ pthread_mutex_unlock(&p->mutex);
+ }
+
+ if (do_evict && low_version != ULLONG_MAX) {
+ store_page *p = &e->pages[low_page];
+ E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
+ p->id, (unsigned long long) p->version);
+ pthread_mutex_lock(&p->mutex);
+ if (!p->closed) {
+ p->closed = true;
+ STAT_L(e);
+ e->stats.page_evictions++;
+ e->stats.objects_evicted += p->obj_count;
+ e->stats.bytes_evicted += p->bytes_used;
+ STAT_UL(e);
+ if (p->refcount == 0) {
+ _free_page(e, p);
+ }
+ }
+ pthread_mutex_unlock(&p->mutex);
+ }
+
+ // copy the page data into engine context so callers can use it from
+ // the stats lock.
+ STAT_L(e);
+ memcpy(e->stats.page_data, pd,
+ sizeof(struct extstore_page_data) * e->page_count);
+ STAT_UL(e);
+ }
+
+ return NULL;
+}
diff --git a/extstore.h b/extstore.h
new file mode 100644
index 0000000..1088101
--- /dev/null
+++ b/extstore.h
@@ -0,0 +1,94 @@
+#ifndef EXTSTORE_H
+#define EXTSTORE_H
+
+/* A safe-to-read dataset for determining compaction.
+ * id is the array index.
+ */
+struct extstore_page_data {
+ uint64_t version;
+ uint64_t bytes_used;
+};
+
+/* Pages can have objects deleted from them at any time. This creates holes
+ * that can't be reused until the page is either evicted or all objects are
+ * deleted.
+ * bytes_fragmented is the total bytes for all of these holes.
+ * It is the size of all used pages minus each page's bytes_used value.
+ */
+struct extstore_stats {
+ uint64_t page_allocs;
+ uint64_t page_count; /* total page count */
+ uint64_t page_evictions;
+ uint64_t page_reclaims;
+ uint64_t page_size; /* size in bytes per page (supplied by caller) */
+ uint64_t pages_free; /* currently unallocated/unused pages */
+ uint64_t pages_used;
+ uint64_t objects_evicted;
+ uint64_t objects_read;
+ uint64_t objects_written;
+ uint64_t objects_used; /* total number of objects stored */
+ uint64_t bytes_evicted;
+ uint64_t bytes_written;
+ uint64_t bytes_read; /* wbuf - read -> bytes read from storage */
+ uint64_t bytes_used; /* total number of bytes stored */
+ uint64_t bytes_fragmented; /* see above comment */
+ struct extstore_page_data *page_data;
+};
+
+// TODO: Temporary configuration structure. A "real" library should have an
+// extstore_set(enum, void *ptr) which hides the implementation.
+// this is plenty for quick development.
+struct extstore_conf {
+ unsigned int page_size; // ideally 64-256M in size
+ unsigned int page_count;
+ unsigned int page_buckets; // number of different writeable pages
+ unsigned int wbuf_size; // must divide cleanly into page_size
+ unsigned int wbuf_count; // this might get locked to "2 per active page"
+ unsigned int io_threadcount;
+ unsigned int io_depth; // with normal I/O, hits locks less. req'd for AIO
+};
+
+enum obj_io_mode {
+ OBJ_IO_READ = 0,
+ OBJ_IO_WRITE,
+};
+
+typedef struct _obj_io obj_io;
+typedef void (*obj_io_cb)(void *e, obj_io *io, int ret);
+
+/* An object for both reads and writes to the storage engine.
+ * Once an IO is submitted, ->next may be changed by the IO thread. It is not
+ * safe to further modify the IO stack until the entire request is completed.
+ */
+struct _obj_io {
+ void *data; /* user supplied data pointer */
+ struct _obj_io *next;
+ char *buf; /* buffer of data to read or write to */
+ unsigned int page_version; /* page version for read mode */
+ unsigned int len; /* for both modes */
+ unsigned int offset; /* for read mode */
+ unsigned short page_id; /* for read mode */
+ enum obj_io_mode mode;
+ /* callback pointers? */
+ obj_io_cb cb;
+};
+
+
+void *extstore_init(char *fn, struct extstore_conf *cf);
+int extstore_write(void *ptr, unsigned int bucket, obj_io *io);
+int extstore_submit(void *ptr, obj_io *io);
+/* count are the number of objects being removed, bytes are the original
+ * length of those objects. Bytes is optional but you can't track
+ * fragmentation without it.
+ */
+int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version);
+int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version, unsigned int count, unsigned int bytes);
+void extstore_get_stats(void *ptr, struct extstore_stats *st);
+/* add page data array to a stats structure.
+ * caller must allocate its stats.page_data memory first.
+ */
+void extstore_get_page_data(void *ptr, struct extstore_stats *st);
+void extstore_run_maint(void *ptr);
+void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version);
+
+#endif
diff --git a/items.c b/items.c
index 4fe7857..f02ccb9 100644
--- a/items.c
+++ b/items.c
@@ -2,6 +2,9 @@
#include "memcached.h"
#include "bipbuffer.h"
#include "slab_automove.h"
+#ifdef EXTSTORE
+#include "storage.h"
+#endif
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/resource.h>
@@ -960,6 +963,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
was_found = 1;
if (item_is_flushed(it)) {
do_item_unlink(it, hv);
+ STORAGE_delete(c->thread->storage, it);
do_item_remove(it);
it = NULL;
pthread_mutex_lock(&c->thread->stats.mutex);
@@ -971,6 +975,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
was_found = 2;
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);
+ STORAGE_delete(c->thread->storage, it);
do_item_remove(it);
it = NULL;
pthread_mutex_lock(&c->thread->stats.mutex);
@@ -1489,6 +1494,10 @@ static pthread_t lru_maintainer_tid;
#define MIN_LRU_MAINTAINER_SLEEP 1000
static void *lru_maintainer_thread(void *arg) {
+#ifdef EXTSTORE
+ void *storage = arg;
+ int x;
+#endif
int i;
useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP;
useconds_t last_sleep = MIN_LRU_MAINTAINER_SLEEP;
@@ -1542,6 +1551,18 @@ static void *lru_maintainer_thread(void *arg) {
}
int did_moves = lru_maintainer_juggle(i);
+#ifdef EXTSTORE
+ // Deeper loop to speed up pushing to storage.
+ for (x = 0; x < 500; x++) {
+ int found;
+ found = lru_maintainer_store(storage, i);
+ if (found) {
+ did_moves += found;
+ } else {
+ break;
+ }
+ }
+#endif
if (did_moves == 0) {
if (backoff_juggles[i] != 0) {
backoff_juggles[i] += backoff_juggles[i] / 8;
diff --git a/items.h b/items.h
index e4c6ecd..bf126d7 100644
--- a/items.h
+++ b/items.h
@@ -80,3 +80,16 @@ void lru_maintainer_pause(void);
void lru_maintainer_resume(void);
void *lru_bump_buf_create(void);
+
+#ifdef EXTSTORE
+#define STORAGE_delete(e, it) \
+ do { \
+ if (it->it_flags & ITEM_HDR) { \
+ item_hdr *hdr = (item_hdr *)ITEM_data(it); \
+ extstore_delete(e, hdr->page_id, hdr->page_version, \
+ 1, ITEM_ntotal(it)); \
+ } \
+ } while (0)
+#else
+#define STORAGE_delete(...)
+#endif
diff --git a/logger.c b/logger.c
index fdc2c19..b9165eb 100644
--- a/logger.c
+++ b/logger.c
@@ -54,7 +54,27 @@ static const entry_details default_entries[] = {
},
[LOGGER_SLAB_MOVE] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
"type=slab_move src=%d dst=%d"
- }
+ },
+#ifdef EXTSTORE
+ [LOGGER_COMPACT_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
+ "type=compact_start id=%lu version=%llu"
+ },
+ [LOGGER_COMPACT_ABORT] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
+ "type=compact_abort id=%lu"
+ },
+ [LOGGER_COMPACT_READ_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
+ "type=compact_read_start id=%lu offset=%llu"
+ },
+ [LOGGER_COMPACT_READ_END] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
+ "type=compact_read_end id=%lu offset=%llu rescues=%lu lost=%lu"
+ },
+ [LOGGER_COMPACT_END] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
+ "type=compact_end id=%lu"
+ },
+ [LOGGER_COMPACT_FRAGINFO] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
+ "type=compact_fraginfo ratio=%.2f bytes=%lu"
+ },
+#endif
};
#define WATCHER_ALL -1
diff --git a/logger.h b/logger.h
index cd3dcbd..f528327 100644
--- a/logger.h
+++ b/logger.h
@@ -20,6 +20,14 @@ enum log_entry_type {
LOGGER_ITEM_STORE,
LOGGER_CRAWLER_STATUS,
LOGGER_SLAB_MOVE,
+#ifdef EXTSTORE
+ LOGGER_COMPACT_START,
+ LOGGER_COMPACT_ABORT,
+ LOGGER_COMPACT_READ_START,
+ LOGGER_COMPACT_READ_END,
+ LOGGER_COMPACT_END,
+ LOGGER_COMPACT_FRAGINFO,
+#endif
};
enum log_entry_subtype {
diff --git a/memcached.c b/memcached.c
index 2616ddb..80edcfa 100644
--- a/memcached.c
+++ b/memcached.c
@@ -14,6 +14,9 @@
* Brad Fitzpatrick <brad@danga.com>
*/
#include "memcached.h"
+#ifdef EXTSTORE
+#include "storage.h"
+#endif
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -472,9 +475,18 @@ void conn_worker_readd(conn *c) {
event_base_set(c->thread->base, &c->event);
c->state = conn_new_cmd;
+ // TODO: call conn_cleanup/fail/etc
if (event_add(&c->event, 0) == -1) {
perror("event_add");
}
+#ifdef EXTSTORE
+ // If we had IO objects, process
+ if (c->io_wraplist) {
+ //assert(c->io_wrapleft == 0); // assert no more to process
+ conn_set_state(c, conn_mwrite);
+ drive_machine(c); // FIXME: Again, is it really this easy?
+ }
+#endif
}
conn *conn_new(const int sfd, enum conn_states init_state,
@@ -592,6 +604,10 @@ conn *conn_new(const int sfd, enum conn_states init_state,
c->msgused = 0;
c->authenticated = false;
c->last_cmd_time = current_time; /* initialize for idle kicker */
+#ifdef EXTSTORE
+ c->io_wraplist = NULL;
+ c->io_wrapleft = 0;
+#endif
c->write_and_go = init_state;
c->write_and_free = 0;
@@ -617,7 +633,60 @@ conn *conn_new(const int sfd, enum conn_states init_state,
return c;
}
+#ifdef EXTSTORE
+static void recache_or_free(conn *c, io_wrap *wrap) {
+ item *it = (item *)wrap->io.buf;
+ // If request was ultimately a miss, unlink the header.
+ if (wrap->miss) {
+ size_t ntotal = ITEM_ntotal(wrap->hdr_it);
+ item_unlink(wrap->hdr_it);
+ slabs_free(it, ntotal, slabs_clsid(ntotal));
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.miss_from_extstore++;
+ if (wrap->badcrc)
+ c->thread->stats.badcrc_from_extstore++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+ } else {
+ // hashvalue is cuddled during store
+ uint32_t hv = (uint32_t)it->time;
+ bool do_free = true;
+ // opt to throw away rather than wait on a lock.
+ void *hold_lock = item_trylock(hv);
+ if (hold_lock != NULL) {
+ item *h_it = wrap->hdr_it;
+ uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
+ // Item must be recently hit at least twice to recache.
+ if (((h_it->it_flags & flags) == flags) &&
+ h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
+ c->recache_counter++ % settings.ext_recache_rate == 0) {
+ do_free = false;
+ // In case it's been updated.
+ it->exptime = h_it->exptime;
+ it->it_flags &= ~ITEM_LINKED;
+ it->refcount = 0;
+ it->h_next = NULL; // might not be necessary.
+ STORAGE_delete(c->thread->storage, h_it);
+ item_replace(h_it, it, hv);
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.recache_from_extstore++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+ }
+ }
+
+ if (do_free)
+ slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));
+ if (hold_lock)
+ item_trylock_unlock(hold_lock);
+ }
+ wrap->io.buf = NULL; // sanity.
+ wrap->io.next = NULL;
+ wrap->next = NULL;
+ wrap->active = false;
+ // TODO: reuse lock and/or hv.
+ item_remove(wrap->hdr_it);
+}
+#endif
static void conn_release_items(conn *c) {
assert(c != NULL);
@@ -639,7 +708,18 @@ static void conn_release_items(conn *c) {
do_cache_free(c->thread->suffix_cache, *(c->suffixcurr));
}
}
-
+#ifdef EXTSTORE
+ if (c->io_wraplist) {
+ io_wrap *tmp = c->io_wraplist;
+ while (tmp) {
+ io_wrap *next = tmp->next;
+ recache_or_free(c, tmp);
+ do_cache_free(c->thread->io_cache, tmp); // lockless
+ tmp = next;
+ }
+ c->io_wraplist = NULL;
+ }
+#endif
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
}
@@ -2298,6 +2378,7 @@ static void process_bin_update(conn *c) {
it = item_get(key, nkey, c, DONT_UPDATE);
if (it) {
item_unlink(it);
+ STORAGE_delete(c->thread->storage, it);
item_remove(it);
}
}
@@ -2456,6 +2537,7 @@ static void process_bin_delete(conn *c) {
c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);
item_unlink(it);
+ STORAGE_delete(c->thread->storage, it);
write_bin_response(c, NULL, 0, 0, 0);
} else {
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0);
@@ -2676,6 +2758,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);
+ STORAGE_delete(c->thread->storage, old_it);
item_replace(old_it, it, hv);
stored = STORED;
} else {
@@ -2735,10 +2818,12 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
}
if (stored == NOT_STORED && failed_alloc == 0) {
- if (old_it != NULL)
+ if (old_it != NULL) {
+ STORAGE_delete(c->thread->storage, old_it);
item_replace(old_it, it, hv);
- else
+ } else {
do_item_link(it, hv);
+ }
c->cas = ITEM_get_cas(it);
@@ -2911,7 +2996,9 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
threadlocal_stats_aggregate(&thread_stats);
struct slab_stats slab_stats;
slab_stats_aggregate(&thread_stats, &slab_stats);
-
+#ifdef EXTSTORE
+ struct extstore_stats st;
+#endif
#ifndef WIN32
struct rusage usage;
getrusage(RUSAGE_SELF, &usage);
@@ -2951,6 +3038,12 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
APPEND_STAT("get_expired", "%llu", (unsigned long long)thread_stats.get_expired);
APPEND_STAT("get_flushed", "%llu", (unsigned long long)thread_stats.get_flushed);
+#ifdef EXTSTORE
+ APPEND_STAT("get_extstore", "%llu", (unsigned long long)thread_stats.get_extstore);
+ APPEND_STAT("recache_from_extstore", "%llu", (unsigned long long)thread_stats.recache_from_extstore);
+ APPEND_STAT("miss_from_extstore", "%llu", (unsigned long long)thread_stats.miss_from_extstore);
+ APPEND_STAT("badcrc_from_extstore", "%llu", (unsigned long long)thread_stats.badcrc_from_extstore);
+#endif
APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
@@ -3002,6 +3095,23 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_watcher_skipped);
APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watcher_sent);
STATS_UNLOCK();
+#ifdef EXTSTORE
+ extstore_get_stats(c->thread->storage, &st);
+ APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
+ APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
+ APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
+ APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
+ APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
+ APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
+ APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
+ APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
+ APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
+ APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
+ APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
+ APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
+ APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
+ APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
+#endif
}
static void process_stat_settings(ADD_STAT add_stats, void *c) {
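With the hunks above applied, an EXTSTORE build reports the new per-thread and engine counters through the regular stats protocol. A hedged illustration of what the added lines emit (the numbers are placeholders, not captured output):

    STAT get_extstore 0
    STAT recache_from_extstore 0
    STAT miss_from_extstore 0
    STAT badcrc_from_extstore 0
    STAT extstore_page_allocs 0
    STAT extstore_pages_free 64
    STAT extstore_bytes_fragmented 0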
@@ -3336,6 +3446,128 @@ static inline char *_ascii_get_suffix_buf(conn *c, int i) {
*(c->suffixlist + i) = suffix;
return suffix;
}
+#ifdef EXTSTORE
+// FIXME: This runs in the IO thread. To get better IO performance it should
+// simply mark the io wrapper with the return value and decrement wrapleft,
+// redispatching once it hits zero. A bit of work would still happen in the
+// side thread, but at least it would be minimized.
+static void _ascii_get_extstore_cb(void *e, obj_io *io, int ret) {
+ // FIXME: assumes success
+ io_wrap *wrap = (io_wrap *)io->data;
+ conn *c = wrap->c;
+ assert(wrap->active == true);
+ item *read_it = (item *)io->buf;
+ bool miss = false;
+
+ // TODO: How to do counters for hit/misses?
+ if (ret < 1) {
+ miss = true;
+ } else {
+ uint32_t crc = (uint32_t) read_it->exptime;
+ read_it->exptime = 0; // to match what was crc'ed
+ uint32_t crc2 = crc32c(0, (char *)read_it+24, io->len-24);
+ if (crc != crc2) {
+ miss = true;
+ wrap->badcrc = true;
+ }
+ }
+
+ if (miss) {
+ int i;
+ struct iovec *v;
+ for (i = 0; i < wrap->iovec_count; i++) {
+ v = &c->iov[wrap->iovec_start + i];
+ v->iov_len = 0;
+ v->iov_base = NULL;
+ }
+ wrap->miss = true;
+ } else {
+ assert(read_it->slabs_clsid != 0);
+ c->iov[wrap->iovec_data].iov_base = ITEM_data(read_it);
+ // iov_len is already set
+ // TODO: Should set iov_len here instead and cuddle it in the wrap object
+ }
+ c->io_wrapleft--;
+ wrap->active = false;
+ //assert(c->io_wrapleft >= 0);
+
+ // All IOs have returned; let's re-attach this connection to our original
+ // thread.
+ if (c->io_wrapleft == 0) {
+ assert(c->io_queued == true);
+ c->io_queued = false;
+ // FIXME: Is it really this easy?
+ // I have worries this won't be performant enough under load though.
+ // Can cause the IO threads to block if the pipes are full, too.
+ redispatch_conn(c);
+ }
+}
+
+// FIXME: This completely breaks UDP support.
+static inline int _ascii_get_extstore(conn *c, item *it) {
+ item_hdr *hdr = (item_hdr *)ITEM_data(it);
+ size_t ntotal = ITEM_ntotal(it);
+ unsigned int clsid = slabs_clsid(ntotal);
+ item *new_it = do_item_alloc_pull(ntotal, clsid);
+ if (new_it == NULL)
+ return -1;
+ assert(!c->io_queued); // FIXME: debugging.
+ // so we can free the chunk on a miss
+ new_it->slabs_clsid = clsid;
+
+ io_wrap *io = do_cache_alloc(c->thread->io_cache);
+ io->active = true;
+ io->miss = false;
+ io->badcrc = false;
+ // io_wrap owns the reference for this object now.
+ io->hdr_it = it;
+
+ // FIXME: error handling.
+ // The offsets we'll wipe on a miss.
+ io->iovec_start = c->iovused - 3;
+ io->iovec_count = 4;
+ // FIXME: error handling.
+ // This is probably super dangerous. Keep it at 0 and fill into the wrap
+ // object?
+ // The data iovec offset we'll fill in on a hit.
+ io->iovec_data = c->iovused;
+ add_iov(c, "", it->nbytes);
+ io->c = c;
+ // We need to stack the sub-struct IOs together as well.
+ if (c->io_wraplist) {
+ io->io.next = &c->io_wraplist->io;
+ } else {
+ io->io.next = NULL;
+ }
+ // IO queue for this connection.
+ io->next = c->io_wraplist;
+ c->io_wraplist = io;
+ assert(c->io_wrapleft >= 0);
+ c->io_wrapleft++;
+ // reference ourselves for the callback.
+ io->io.data = (void *)io;
+ io->io.buf = (void *)new_it;
+
+ // Now, fill in io->io based on what was in our header.
+ io->io.page_version = hdr->page_version;
+ io->io.page_id = hdr->page_id;
+ io->io.offset = hdr->offset;
+ io->io.len = ITEM_ntotal(it);
+ io->io.mode = OBJ_IO_READ;
+
+ // TODO: set the callback function
+ io->io.cb = _ascii_get_extstore_cb;
+ //fprintf(stderr, "EXTSTORE: IO stacked %u\n", io->iovec_data);
+ // FIXME: This stat needs to move to reflect # of flash hits vs misses.
+ // For now it's at least a good gauge on how often we request out to
+ // flash.
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_extstore++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+
+ return 0;
+}
+#endif
// FIXME: the 'breaks' around memory malloc's should break all the way down,
// fill ileft/suffixleft, then run conn_releaseitems()
/* ntokens is overwritten here... shrug.. */
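The CRC check in _ascii_get_extstore_cb above mirrors the write side in storage.c below: the crc32c is stashed in the item's exptime field, which is zeroed before hashing so both sides checksum identical bytes. A minimal sketch of that symmetry, assuming a 64-bit layout where the item's three chain pointers (next/prev/h_next) are the 24 bytes being skipped; item_crc is a hypothetical helper, not part of this patch:

    /* Sketch only; item_crc is a hypothetical helper. */
    static uint32_t item_crc(item *it, size_t ntotal) {
        /* Skip the LRU/hash chain pointers; they are meaningless
         * once the item has been serialized to flash. */
        return crc32c(0, (char *)it + 24, ntotal - 24);
    }

    /* write side: */ it->exptime = 0; it->exptime = item_crc(it, ntotal);
    /* read side:  */ crc = (uint32_t) read_it->exptime; read_it->exptime = 0;
                      miss = (crc != item_crc(read_it, io->len));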
@@ -3417,7 +3649,16 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
item_remove(it);
break;
}
+#ifdef EXTSTORE
+ if (it->it_flags & ITEM_HDR) {
+ if (_ascii_get_extstore(c, it) != 0) {
+ item_remove(it);
+ break;
+ }
+ } else if ((it->it_flags & ITEM_CHUNKED) == 0) {
+#else
if ((it->it_flags & ITEM_CHUNKED) == 0) {
+#endif
add_iov(c, ITEM_data(it), it->nbytes);
} else if (add_chunked_item_iovs(c, it, it->nbytes) != 0) {
item_remove(it);
@@ -3468,9 +3709,16 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
c->thread->stats.get_cmds++;
}
pthread_mutex_unlock(&c->thread->stats.mutex);
+#ifdef EXTSTORE
+ /* If ITEM_HDR, an io_wrap owns the reference. */
+ if ((it->it_flags & ITEM_HDR) == 0) {
+ *(c->ilist + i) = it;
+ i++;
+ }
+#else
*(c->ilist + i) = it;
i++;
-
+#endif
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
if (should_touch) {
@@ -3602,6 +3850,7 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
it = item_get(key, nkey, c, DONT_UPDATE);
if (it) {
item_unlink(it);
+ STORAGE_delete(c->thread->storage, it);
item_remove(it);
}
}
@@ -3869,6 +4118,7 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
pthread_mutex_unlock(&c->thread->stats.mutex);
item_unlink(it);
+ STORAGE_delete(c->thread->storage, it);
item_remove(it); /* release our reference */
out_string(c, "DELETED");
} else {
@@ -4072,7 +4322,38 @@ static void process_lru_command(conn *c, token_t *tokens, const size_t ntokens)
out_string(c, "ERROR");
}
}
-
+#ifdef EXTSTORE
+static void process_extstore_command(conn *c, token_t *tokens, const size_t ntokens) {
+ set_noreply_maybe(c, tokens, ntokens);
+ if (strcmp(tokens[1].value, "item_size") == 0 && ntokens >= 3) {
+ if (!safe_strtoul(tokens[2].value, &settings.ext_item_size)) {
+ out_string(c, "ERROR");
+ } else {
+ out_string(c, "OK");
+ }
+ } else if (strcmp(tokens[1].value, "item_age") == 0 && ntokens >= 3) {
+ if (!safe_strtoul(tokens[2].value, &settings.ext_item_age)) {
+ out_string(c, "ERROR");
+ } else {
+ out_string(c, "OK");
+ }
+ } else if (strcmp(tokens[1].value, "recache_rate") == 0 && ntokens >= 3) {
+ if (!safe_strtoul(tokens[2].value, &settings.ext_recache_rate)) {
+ out_string(c, "ERROR");
+ } else {
+ out_string(c, "OK");
+ }
+ } else if (strcmp(tokens[1].value, "max_frag") == 0 && ntokens >= 3) {
+ if (!safe_strtod(tokens[2].value, &settings.ext_max_frag)) {
+ out_string(c, "ERROR");
+ } else {
+ out_string(c, "OK");
+ }
+ } else {
+ out_string(c, "ERROR");
+ }
+}
+#endif
static void process_command(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
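process_extstore_command above makes each tunable adjustable at runtime. A usage sketch over the ASCII protocol (the values are examples only):

    extstore item_size 1024
    OK
    extstore item_age 3600
    OK
    extstore max_frag 0.8
    OK
    extstore unknown_knob 1
    ERROR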
@@ -4371,6 +4652,10 @@ static void process_command(conn *c, char *command) {
} else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "misbehave") == 0)) {
process_misbehave_command(c);
#endif
+#ifdef EXTSTORE
+ } else if (ntokens >= 3 && strcmp(tokens[COMMAND_TOKEN].value, "extstore") == 0) {
+ process_extstore_command(c, tokens, ntokens);
+#endif
} else {
if (ntokens >= 2 && strncmp(tokens[ntokens - 2].value, "HTTP/", 5) == 0) {
conn_set_state(c, conn_closing);
@@ -5126,6 +5411,23 @@ static void drive_machine(conn *c) {
/* fall through... */
case conn_mwrite:
+#ifdef EXTSTORE
+ /* We have side IOs that must be processed before transmit() can run.
+ * Remove the connection from the worker thread and dispatch the
+ * IO queue.
+ */
+ if (c->io_wrapleft) {
+ assert(c->io_queued == false);
+ assert(c->io_wraplist != NULL);
+ // TODO: create proper state for this condition
+ conn_set_state(c, conn_watch);
+ event_del(&c->event);
+ c->io_queued = true;
+ extstore_submit(c->thread->storage, &c->io_wraplist->io);
+ stop = true;
+ break;
+ }
+#endif
if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
if (settings.verbose > 0)
fprintf(stderr, "Failed to build UDP headers\n");
@@ -5983,7 +6285,11 @@ int main (int argc, char **argv) {
bool use_slab_sizes = false;
char *slab_sizes_unparsed = NULL;
bool slab_chunk_size_changed = false;
-
+#ifdef EXTSTORE
+ void *storage = NULL;
+ char *storage_file = "/dev/shm/extstore";
+ struct extstore_conf ext_cf;
+#endif
char *subopts, *subopts_orig;
char *subopts_value;
enum {
@@ -6025,6 +6331,19 @@ int main (int argc, char **argv) {
#ifdef MEMCACHED_DEBUG
RELAXED_PRIVILEGES,
#endif
+#ifdef EXTSTORE
+ EXT_PAGE_SIZE,
+ EXT_PAGE_COUNT,
+ EXT_WBUF_SIZE,
+ EXT_WBUF_COUNT,
+ EXT_THREADS,
+ EXT_IO_DEPTH,
+ EXT_PATH,
+ EXT_ITEM_SIZE,
+ EXT_ITEM_AGE,
+ EXT_RECACHE_RATE,
+ EXT_MAX_FRAG,
+#endif
};
char *const subopts_tokens[] = {
[MAXCONNS_FAST] = "maxconns_fast",
@@ -6065,6 +6384,19 @@ int main (int argc, char **argv) {
#ifdef MEMCACHED_DEBUG
[RELAXED_PRIVILEGES] = "relaxed_privileges",
#endif
+#ifdef EXTSTORE
+ [EXT_PAGE_SIZE] = "ext_page_size",
+ [EXT_PAGE_COUNT] = "ext_page_count",
+ [EXT_WBUF_SIZE] = "ext_wbuf_size",
+ [EXT_WBUF_COUNT] = "ext_wbuf_count",
+ [EXT_THREADS] = "ext_threads",
+ [EXT_IO_DEPTH] = "ext_io_depth",
+ [EXT_PATH] = "ext_path",
+ [EXT_ITEM_SIZE] = "ext_item_size",
+ [EXT_ITEM_AGE] = "ext_item_age",
+ [EXT_RECACHE_RATE] = "ext_recache_rate",
+ [EXT_MAX_FRAG] = "ext_max_frag",
+#endif
NULL
};
@@ -6078,9 +6410,22 @@ int main (int argc, char **argv) {
/* init settings */
settings_init();
+#ifdef EXTSTORE
+ settings.ext_item_size = 512;
+ settings.ext_item_age = 0;
+ settings.ext_recache_rate = 2;
+ settings.ext_max_frag = 0.9;
+ settings.ext_wbuf_size = 1024 * 1024 * 8;
+ ext_cf.page_size = 1024 * 1024 * 64;
+ ext_cf.page_count = 64;
+ ext_cf.wbuf_size = settings.ext_wbuf_size;
+ ext_cf.wbuf_count = 2;
+ ext_cf.io_threadcount = 1;
+ ext_cf.io_depth = 1;
+ ext_cf.page_buckets = 2;
+#endif
/* Run regardless of initializing it later */
- init_lru_crawler(NULL);
init_lru_maintainer();
/* set stderr non-buffering (for running under, say, daemontools) */
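These defaults give 64 pages of 64 megabytes (4GB of flash-backed space) at /dev/shm/extstore, with one IO thread at queue depth 1. The -o suboptions registered above override them; note that ext_page_size and ext_wbuf_size are taken in megabytes and scaled by the parser. An invocation sketch (paths and sizes are examples):

    ./memcached -m 512 -o ext_path=/mnt/flash/extstore,ext_page_size=64,ext_page_count=256,ext_threads=4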
@@ -6604,6 +6949,114 @@ int main (int argc, char **argv) {
start_lru_maintainer = false;
settings.lru_segmented = false;
break;
+#ifdef EXTSTORE
+ case EXT_PAGE_SIZE:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_page_size argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &ext_cf.page_size)) {
+ fprintf(stderr, "could not parse argument to ext_page_size\n");
+ return 1;
+ }
+ ext_cf.page_size *= 1024 * 1024; /* megabytes */
+ break;
+ case EXT_PAGE_COUNT:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_page_count argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &ext_cf.page_count)) {
+ fprintf(stderr, "could not parse argument to ext_page_count\n");
+ return 1;
+ }
+ break;
+ case EXT_WBUF_SIZE:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_wbuf_size argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &ext_cf.wbuf_size)) {
+ fprintf(stderr, "could not parse argument to ext_wbuf_size\n");
+ return 1;
+ }
+ ext_cf.wbuf_size *= 1024 * 1024; /* megabytes */
+ settings.ext_wbuf_size = ext_cf.wbuf_size;
+ break;
+ case EXT_WBUF_COUNT:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_wbuf_count argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &ext_cf.wbuf_count)) {
+ fprintf(stderr, "could not parse argument to ext_wbuf_count\n");
+ return 1;
+ }
+ break;
+ case EXT_THREADS:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_threads argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &ext_cf.io_threadcount)) {
+ fprintf(stderr, "could not parse argument to ext_threads\n");
+ return 1;
+ }
+ break;
+ case EXT_IO_DEPTH:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_io_depth argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &ext_cf.io_depth)) {
+ fprintf(stderr, "could not parse argument to ext_io_depth\n");
+ return 1;
+ }
+ break;
+ case EXT_ITEM_SIZE:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_item_size argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
+ fprintf(stderr, "could not parse argument to ext_item_size\n");
+ return 1;
+ }
+ break;
+ case EXT_ITEM_AGE:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_item_age argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
+ fprintf(stderr, "could not parse argument to ext_item_age\n");
+ return 1;
+ }
+ break;
+ case EXT_RECACHE_RATE:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_recache_rate argument\n");
+ return 1;
+ }
+ if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
+ fprintf(stderr, "could not parse argument to ext_recache_rate\n");
+ return 1;
+ }
+ break;
+ case EXT_MAX_FRAG:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_max_frag argument\n");
+ return 1;
+ }
+ if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
+ fprintf(stderr, "could not parse argument to ext_max_frag\n");
+ return 1;
+ }
+ break;
+ case EXT_PATH:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing ext_path argument\n");
+ return 1;
+ }
+ storage_file = strdup(subopts_value);
+ break;
+#endif
case MODERN:
/* currently no new defaults */
break;
@@ -6815,7 +7268,14 @@ int main (int argc, char **argv) {
conn_init();
slabs_init(settings.maxbytes, settings.factor, preallocate,
use_slab_sizes ? slab_sizes : NULL);
-
+#ifdef EXTSTORE
+ crc32c_init();
+ storage = extstore_init(storage_file, &ext_cf);
+ if (storage == NULL) {
+ fprintf(stderr, "Failed to initialize external storage\n");
+ exit(EXIT_FAILURE);
+ }
+#endif
/*
* ignore SIGPIPE signals; we can use errno == EPIPE if we
* need that information
@@ -6825,18 +7285,31 @@ int main (int argc, char **argv) {
exit(EX_OSERR);
}
/* start up worker threads if MT mode */
+#ifdef EXTSTORE
+ memcached_thread_init(settings.num_threads, storage);
+ init_lru_crawler(storage);
+#else
memcached_thread_init(settings.num_threads, NULL);
+ init_lru_crawler(NULL);
+#endif
if (start_assoc_maint && start_assoc_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
-
if (start_lru_crawler && start_item_crawler_thread() != 0) {
fprintf(stderr, "Failed to enable LRU crawler thread\n");
exit(EXIT_FAILURE);
}
+#ifdef EXTSTORE
+ if (start_storage_compact_thread(storage) != 0) {
+ fprintf(stderr, "Failed to start storage compaction thread\n");
+ exit(EXIT_FAILURE);
+ }
+ if (start_lru_maintainer && start_lru_maintainer_thread(storage) != 0) {
+#else
if (start_lru_maintainer && start_lru_maintainer_thread(NULL) != 0) {
+#endif
fprintf(stderr, "Failed to enable LRU maintainer thread\n");
return 1;
}
diff --git a/memcached.h b/memcached.h
index 69c3b50..97c78f6 100644
--- a/memcached.h
+++ b/memcached.h
@@ -5,6 +5,8 @@
* structures and function prototypes.
*/
+#define EXTSTORE 1
+
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
@@ -24,6 +26,11 @@
#include "cache.h"
#include "logger.h"
+#ifdef EXTSTORE
+#include "extstore.h"
+#include "crc32c.h"
+#endif
+
#include "sasl_defs.h"
/** Maximum length of a key. */
@@ -266,6 +273,14 @@ struct slab_stats {
X(auth_errors) \
X(idle_kicks) /* idle connections killed */
+#ifdef EXTSTORE
+#define EXTSTORE_THREAD_STATS_FIELDS \
+ X(get_extstore) \
+ X(recache_from_extstore) \
+ X(miss_from_extstore) \
+ X(badcrc_from_extstore)
+#endif
+
/**
* Stats stored per-thread.
*/
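The per-thread stats list is an X-macro, so the EXTSTORE fields ride along everywhere the base list is expanded (the struct declaration here, reset and aggregation in thread.c below). A sketch of what the preprocessor produces inside struct thread_stats:

    #define X(name) uint64_t name;
    EXTSTORE_THREAD_STATS_FIELDS
    #undef X
    /* expands to:
       uint64_t get_extstore;
       uint64_t recache_from_extstore;
       uint64_t miss_from_extstore;
       uint64_t badcrc_from_extstore; */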
@@ -273,6 +288,9 @@ struct thread_stats {
pthread_mutex_t mutex;
#define X(name) uint64_t name;
THREAD_STATS_FIELDS
+#ifdef EXTSTORE
+ EXTSTORE_THREAD_STATS_FIELDS
+#endif
#undef X
struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
uint64_t lru_hits[POWER_LARGEST];
@@ -384,6 +402,13 @@ struct settings {
unsigned int logger_buf_size; /* size of per-thread logger buffer */
bool drop_privileges; /* Whether or not to drop unnecessary process privileges */
bool relaxed_privileges; /* Relax process restrictions when running testapp */
+#ifdef EXTSTORE
+ unsigned int ext_item_size; /* minimum size of items to store externally */
+ unsigned int ext_item_age; /* max age of tail item before storing ext. */
+ unsigned int ext_recache_rate; /* counter++ % recache_rate == 0 > recache */
+ unsigned int ext_wbuf_size; /* read-only copy of the engine's wbuf size */
+ double ext_max_frag; /* ideal maximum page fragmentation */
+#endif
};
extern struct stats stats;
@@ -404,6 +429,10 @@ extern struct settings settings;
/* If an item's storage are chained chunks. */
#define ITEM_CHUNKED 32
#define ITEM_CHUNK 64
+#ifdef EXTSTORE
+/* ITEM_data bulk is external to item */
+#define ITEM_HDR 128
+#endif
/**
* Structure for storing items within memcached.
@@ -471,7 +500,13 @@ typedef struct _strchunk {
uint8_t slabs_clsid; /* Same as above. */
char data[];
} item_chunk;
-
+#ifdef EXTSTORE
+typedef struct {
+ unsigned int page_version; /* from IO header */
+ unsigned int offset; /* from IO header */
+ unsigned short page_id; /* from IO header */
+} item_hdr;
+#endif
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
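When an item's data is flushed to flash, the in-memory copy is replaced by a small item flagged ITEM_HDR whose data area holds just this locator; nbytes is overloaded to keep the original value length (see storage.c below). A sketch of resolving one, mirroring _ascii_get_extstore:

    if (it->it_flags & ITEM_HDR) {
        item_hdr *hdr = (item_hdr *)ITEM_data(it);
        /* hdr->page_id, hdr->page_version and hdr->offset name the
         * blob inside the storage engine; it->nbytes still reports
         * the original value length for the response header. */
    }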
@@ -481,14 +516,31 @@ typedef struct {
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
+#ifdef EXTSTORE
+ cache_t *io_cache; /* IO objects */
+ void *storage; /* data object for storage system */
+#endif
logger *l; /* logger buffer */
void *lru_bump_buf; /* async LRU bump buffer */
} LIBEVENT_THREAD;
-
+typedef struct conn conn;
+#ifdef EXTSTORE
+typedef struct _io_wrap {
+ obj_io io;
+ struct _io_wrap *next;
+ conn *c;
+ item *hdr_it; /* original header item. */
+ unsigned int iovec_start; /* start of the iovecs for this IO */
+ unsigned int iovec_count; /* total number of iovecs */
+ unsigned int iovec_data; /* specific index of data iovec */
+ bool miss; /* signal a miss to unlink hdr_it */
+ bool badcrc; /* signal a crc failure */
+ bool active; // FIXME: canary for test. remove
+} io_wrap;
+#endif
/**
* The structure representing a connection into memcached.
*/
-typedef struct conn conn;
struct conn {
int sfd;
sasl_conn_t *sasl_conn;
@@ -549,7 +601,12 @@ struct conn {
int suffixsize;
char **suffixcurr;
int suffixleft;
-
+#ifdef EXTSTORE
+ int io_wrapleft;
+ unsigned int recache_counter;
+ io_wrap *io_wraplist; /* linked list of io_wraps */
+ bool io_queued; /* FIXME: debugging flag */
+#endif
enum protocol protocol; /* which protocol this connection speaks */
enum network_transport transport; /* what transport is used by this connection */
diff --git a/sizes.c b/sizes.c
index 95a644c..4fb5388 100644
--- a/sizes.c
+++ b/sizes.c
@@ -16,6 +16,9 @@ int main(int argc, char **argv) {
display("Settings", sizeof(struct settings));
display("Item (no cas)", sizeof(item));
display("Item (cas)", sizeof(item) + sizeof(uint64_t));
+#ifdef EXTSTORE
+ display("extstore header", sizeof(item_hdr));
+#endif
display("Libevent thread",
sizeof(LIBEVENT_THREAD) - sizeof(struct thread_stats));
display("Connection", sizeof(conn));
diff --git a/slabs.c b/slabs.c
index 0fc3d16..c523a5f 100644
--- a/slabs.c
+++ b/slabs.c
@@ -360,6 +360,9 @@ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
it = (item *)ptr;
if ((it->it_flags & ITEM_CHUNKED) == 0) {
+#ifdef EXTSTORE
+ bool is_hdr = it->it_flags & ITEM_HDR;
+#endif
it->it_flags = ITEM_SLABBED;
it->slabs_clsid = 0;
it->prev = 0;
@@ -368,7 +371,15 @@ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
p->slots = it;
p->sl_curr++;
+#ifdef EXTSTORE
+ if (!is_hdr) {
+ p->requested -= size;
+ } else {
+ p->requested -= (size - it->nbytes) + sizeof(item_hdr);
+ }
+#else
p->requested -= size;
+#endif
} else {
do_slabs_free_chunked(it, size);
}
@@ -855,6 +866,11 @@ static int slab_rebalance_move(void) {
*/
/* Check if expired or flushed */
ntotal = ITEM_ntotal(it);
+#ifdef EXTSTORE
+ if (it->it_flags & ITEM_HDR) {
+ ntotal = (ntotal - it->nbytes) + sizeof(item_hdr);
+ }
+#endif
/* REQUIRES slabs_lock: CHECK FOR cls->sl_curr > 0 */
if (ch == NULL && (it->it_flags & ITEM_CHUNKED)) {
/* Chunked should be identical to non-chunked, except we need
@@ -928,6 +944,7 @@ static int slab_rebalance_move(void) {
} else {
/* restore ntotal in case we tried saving a head chunk. */
ntotal = ITEM_ntotal(it);
+ // FIXME: need storage instance to call extstore_delete
do_item_unlink(it, hv);
slabs_free(it, ntotal, slab_rebal.s_clsid);
/* Swing around again later to remove it from the freelist. */
@@ -1027,6 +1044,7 @@ static void slab_rebalance_finish(void) {
slab_rebal.d_clsid);
} else if (slab_rebal.d_clsid == SLAB_GLOBAL_PAGE_POOL) {
/* mem_malloc'ed might be higher than mem_limit. */
+ mem_limit_reached = false;
memory_release();
}
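Both slabs.c hunks above compensate for that nbytes overloading: ITEM_ntotal() on a header item reports the original, external size, so the real footprint is rebuilt as (ntotal - it->nbytes) + sizeof(item_hdr). For example, a 10-byte key with a 1000-byte value stored externally keeps nbytes = 1000, but only item header + key + suffix + sizeof(item_hdr) bytes are charged to the slab class.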
diff --git a/storage.c b/storage.c
new file mode 100644
index 0000000..21fd1f1
--- /dev/null
+++ b/storage.c
@@ -0,0 +1,379 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#include "memcached.h"
+#include "storage.h"
+#include <stdlib.h>
+#include <string.h>
+#include <limits.h>
+
+int lru_maintainer_store(void *storage, const int clsid) {
+ //int i;
+ int did_moves = 0;
+ int item_age = settings.ext_item_age;
+ bool mem_limit_reached = false;
+ unsigned int chunks_free;
+ unsigned int chunks_perslab;
+ struct lru_pull_tail_return it_info;
+ // FIXME: need to directly ask the slabber how big a class is
+ if (slabs_clsid(settings.ext_item_size) > clsid)
+ return 0;
+ chunks_free = slabs_available_chunks(clsid, &mem_limit_reached,
+ NULL, &chunks_perslab);
+ // If we're low on chunks with no spare memory, push items out early.
+ if (chunks_free < (chunks_perslab / 2) && mem_limit_reached)
+ item_age = 0;
+
+ it_info.it = NULL;
+ lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
+ /* Item isn't locked, but we have a reference to it. */
+ if (it_info.it != NULL) {
+ obj_io io;
+ item *it = it_info.it;
+ /* First, storage for the header object */
+ size_t orig_ntotal = ITEM_ntotal(it);
+ uint32_t flags;
+ // TODO: Doesn't presently work with chunked items.
+ if ((it->it_flags & (ITEM_CHUNKED|ITEM_HDR)) == 0 &&
+ (item_age == 0 || current_time - it->time > item_age)) {
+ if (settings.inline_ascii_response) {
+ flags = (uint32_t) strtoul(ITEM_suffix(it)+1, (char **) NULL, 10);
+ } else if (it->nsuffix > 0) {
+ flags = *((uint32_t *)ITEM_suffix(it));
+ } else {
+ flags = 0;
+ }
+ item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
+ /* Run the storage write understanding the header is dirty.
+ * We will fill it from the header on read either way.
+ */
+ if (hdr_it != NULL) {
+ // N.B.: rel_time_t happens to be an unsigned int; doing this
+ // fully safely requires more refactoring.
+ // Cuddle the hash value into the time field so we don't have
+ // to recalculate it.
+ uint32_t orig_time = (uint32_t) it->time;
+ it->time = it_info.hv;
+ // Also cuddle a crc32c into the exptime field;
+ // first clear it so we get a clean crc.
+ uint32_t orig_exptime = (uint32_t) it->exptime;
+ it->exptime = 0;
+ it->exptime = crc32c(0, (char*)it+24, orig_ntotal-24);
+ hdr_it->it_flags |= ITEM_HDR;
+ io.buf = (void *) it;
+ io.len = orig_ntotal;
+ io.mode = OBJ_IO_WRITE;
+ // NOTE: when the item is read back in, the slab mover
+ // may see it. Important to have refcount>=2 or ~ITEM_LINKED
+ assert(it->refcount >= 2);
+ if (extstore_write(storage, 0, &io) == 0) {
+ item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
+ hdr->page_version = io.page_version;
+ hdr->page_id = io.page_id;
+ hdr->offset = io.offset;
+ // overload nbytes for the header it
+ hdr_it->nbytes = it->nbytes;
+ /* success! Now we need to fill relevant data into the new
+ * header and replace. Most of this requires the item lock
+ */
+ /* CAS gets set while linking. Copy post-replace */
+ item_replace(it, hdr_it, it_info.hv);
+ ITEM_set_cas(hdr_it, ITEM_get_cas(it));
+ //fprintf(stderr, "EXTSTORE: swapped an item: %s %lu %lu\n", ITEM_key(it), orig_ntotal, ntotal);
+ do_item_remove(hdr_it);
+ did_moves = 1;
+ } else {
+ //fprintf(stderr, "EXTSTORE: failed to write\n");
+ /* Failed to write for some reason, can't continue. */
+ slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
+ }
+ it->time = orig_time;
+ it->exptime = orig_exptime;
+ }
+ }
+ do_item_remove(it);
+ item_unlock(it_info.hv);
+ }
+ return did_moves;
+}
+
+/* Fetch stats from the external storage system and decide to compact.
+ * If we're more than half full, start skewing how aggressively to run
+ * compaction, up to a desired target when all pages are full.
+ */
+static int storage_compact_check(void *storage, logger *l,
+ uint32_t *page_id,
+ uint64_t *page_version, uint64_t *page_size) {
+ struct extstore_stats st;
+ int x;
+ double rate;
+ uint64_t frag_limit;
+ uint64_t low_version = ULLONG_MAX;
+ unsigned int low_page = 0;
+ extstore_get_stats(storage, &st);
+ if (st.pages_used == 0)
+ return 0;
+
+ // Let's pick a target "wasted" value and slew toward it.
+ if (st.pages_free > st.page_count / 4)
+ return 0;
+
+ // The number of free pages reduces the configured frag limit;
+ // this allows us to defrag early if pages are very empty.
+ rate = 1.0 - ((double)st.pages_free / st.page_count);
+ rate *= settings.ext_max_frag;
+ frag_limit = st.page_size * rate;
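+ // e.g. with the 64x64MB defaults and ext_max_frag 0.9: at 10 free
+ // pages, rate = (1 - 10/64) * 0.9 ~= 0.76, so frag_limit ~= 48.6MB.
+ // As free pages approach zero the limit slews up toward 0.9 of a page.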
+ LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
+ NULL, rate, frag_limit);
+ st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
+ if (st.page_data == NULL)
+ return 0;
+ extstore_get_page_data(storage, &st);
+
+ // find oldest page by version that violates the constraint
+ for (x = 0; x < st.page_count; x++) {
+ if (st.page_data[x].version == 0)
+ continue;
+ if (st.page_data[x].bytes_used < frag_limit) {
+ if (st.page_data[x].version < low_version) {
+ low_page = x;
+ low_version = st.page_data[x].version;
+ }
+ }
+ }
+ *page_size = st.page_size;
+ free(st.page_data);
+
+ // we have a page + version to attempt to reclaim.
+ if (low_version != ULLONG_MAX) {
+ *page_id = low_page;
+ *page_version = low_version;
+ return 1;
+ }
+
+ return 0;
+}
+
+static pthread_t storage_compact_tid;
+#define MIN_STORAGE_COMPACT_SLEEP 10000
+#define MAX_STORAGE_COMPACT_SLEEP 2000000
+
+struct storage_compact_wrap {
+ obj_io io;
+ pthread_mutex_t lock; // gates the bools.
+ bool done;
+ bool submitted;
+ bool miss; // version flipped out from under us
+};
+
+static void storage_compact_readback(void *storage, logger *l,
+ char *readback_buf,
+ uint32_t page_id, uint64_t page_version, uint64_t read_size) {
+ uint64_t offset = 0;
+ unsigned int rescues = 0;
+ unsigned int lost = 0;
+
+ while (offset < read_size) {
+ item *hdr_it = NULL;
+ item_hdr *hdr = NULL;
+ item *it = (item *)(readback_buf+offset);
+ unsigned int ntotal;
+ // probably zeroed out junk at the end of the wbuf
+ if (it->nkey == 0) {
+ break;
+ }
+
+ ntotal = ITEM_ntotal(it);
+ uint32_t hv = (uint32_t)it->time;
+ item_lock(hv);
+ // We don't have a conn and don't need to do most of do_item_get
+ hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
+ if (hdr_it != NULL) {
+ bool do_write = false;
+ refcount_incr(hdr_it);
+
+ // Check validity but don't bother removing it.
+ if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
+ (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
+ hdr = (item_hdr *)ITEM_data(hdr_it);
+ if (hdr->page_id == page_id && hdr->page_version == page_version) {
+ // Item header is still completely valid.
+ extstore_delete(storage, page_id, page_version, 1, ntotal);
+ do_write = true;
+ }
+ }
+
+ if (do_write) {
+ bool do_update = false;
+ int tries;
+ obj_io io;
+ io.buf = (void *)it;
+ io.len = ntotal;
+ io.mode = OBJ_IO_WRITE;
+ for (tries = 10; tries > 0; tries--) {
+ if (extstore_write(storage, 1, &io) == 0) {
+ do_update = true;
+ break;
+ } else {
+ usleep(1000);
+ }
+ }
+
+ if (do_update) {
+ // Only safe to update the header in place while nothing
+ // else holds a reference to it.
+ if (hdr_it->refcount == 2) {
+ hdr->page_version = io.page_version;
+ hdr->page_id = io.page_id;
+ hdr->offset = io.offset;
+ rescues++;
+ } else {
+ lost++;
+ // TODO: re-alloc and replace header.
+ }
+ }
+ }
+
+ do_item_remove(hdr_it);
+ }
+
+ item_unlock(hv);
+ offset += ntotal;
+ if (read_size - offset < sizeof(struct _stritem))
+ break;
+ }
+
+ LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
+ NULL, page_id, offset, rescues, lost);
+}
+
+static void _storage_compact_cb(void *e, obj_io *io, int ret) {
+ struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
+ assert(wrap->submitted == true);
+
+ pthread_mutex_lock(&wrap->lock);
+
+ if (ret < 1) {
+ wrap->miss = true;
+ }
+ wrap->done = true;
+
+ pthread_mutex_unlock(&wrap->lock);
+}
+
+// TODO: hoist the storage bits from lru_maintainer_thread in here.
+// It would be nice if they could avoid hammering the same locks, though
+// it's only the COLD LRU, so that's probably fine.
+static void *storage_compact_thread(void *arg) {
+ void *storage = arg;
+ useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP;
+ bool compacting = false;
+ uint64_t page_version = 0;
+ uint64_t page_size = 0;
+ uint64_t page_offset = 0;
+ uint32_t page_id = 0;
+ char *readback_buf = NULL;
+ struct storage_compact_wrap wrap;
+
+ logger *l = logger_create();
+ if (l == NULL) {
+ fprintf(stderr, "Failed to allocate logger for LRU maintainer thread\n");
+ abort();
+ }
+
+ readback_buf = malloc(settings.ext_wbuf_size);
+ if (readback_buf == NULL) {
+ fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
+ abort();
+ }
+
+ pthread_mutex_init(&wrap.lock, NULL);
+ wrap.done = false;
+ wrap.submitted = false;
+ wrap.io.data = &wrap;
+ wrap.io.buf = (void *)readback_buf;
+
+ wrap.io.len = settings.ext_wbuf_size;
+ wrap.io.mode = OBJ_IO_READ;
+ wrap.io.cb = _storage_compact_cb;
+
+ while (1) {
+ if (to_sleep) {
+ extstore_run_maint(storage);
+ usleep(to_sleep);
+ }
+
+ if (!compacting && storage_compact_check(storage, l,
+ &page_id, &page_version, &page_size)) {
+ page_offset = 0;
+ compacting = true;
+ LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
+ NULL, page_id, page_version);
+ }
+
+ if (compacting) {
+ pthread_mutex_lock(&wrap.lock);
+ if (page_offset < page_size && !wrap.done && !wrap.submitted) {
+ wrap.io.page_version = page_version;
+ wrap.io.page_id = page_id;
+ wrap.io.offset = page_offset;
+ // FIXME: should be smarter about io->next (unlink at use?)
+ wrap.io.next = NULL;
+ wrap.submitted = true;
+ wrap.miss = false;
+
+ extstore_submit(storage, &wrap.io);
+ } else if (wrap.miss) {
+ LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
+ NULL, page_id);
+ wrap.done = false;
+ wrap.submitted = false;
+ compacting = false;
+ } else if (wrap.submitted && wrap.done) {
+ LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
+ NULL, page_id, page_offset);
+ storage_compact_readback(storage, l,
+ readback_buf, page_id, page_version, settings.ext_wbuf_size);
+ page_offset += settings.ext_wbuf_size;
+ wrap.done = false;
+ wrap.submitted = false;
+ } else if (page_offset >= page_size) {
+ compacting = false;
+ wrap.done = false;
+ wrap.submitted = false;
+ extstore_close_page(storage, page_id, page_version);
+ LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
+ NULL, page_id);
+ }
+ pthread_mutex_unlock(&wrap.lock);
+
+ if (to_sleep > MIN_STORAGE_COMPACT_SLEEP)
+ to_sleep /= 2;
+ } else {
+ if (to_sleep < MAX_STORAGE_COMPACT_SLEEP)
+ to_sleep += MIN_STORAGE_COMPACT_SLEEP;
+ }
+ }
+ free(readback_buf);
+
+ return NULL;
+}
+
+// TODO
+/*int stop_storage_compact_thread(void) {
+ int ret;
+ pthread_mutex_lock(&lru_maintainer_lock);
+ do_run_lru_maintainer_thread = 0;
+ pthread_mutex_unlock(&lru_maintainer_lock);
+ if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
+ fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
+ return -1;
+ }
+ settings.lru_maintainer_thread = false;
+ return 0;
+}*/
+
+int start_storage_compact_thread(void *arg) {
+ int ret;
+
+ if ((ret = pthread_create(&storage_compact_tid, NULL,
+ storage_compact_thread, arg)) != 0) {
+ fprintf(stderr, "Can't create storage_compact thread: %s\n",
+ strerror(ret));
+ return -1;
+ }
+
+ return 0;
+}
+
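The compaction loop above is a small state machine around storage_compact_wrap; roughly:

    idle        -> compacting   storage_compact_check() picked a page+version
    !submitted  -> submitted    extstore_submit() reads one wbuf-sized chunk
    miss        -> idle         page version rotated away; LOGGER_COMPACT_ABORT
    done        -> readback     rescue live items, page_offset += wbuf_size
    offset>=end -> idle         extstore_close_page(); LOGGER_COMPACT_END

Each readback chunk is scanned item by item, using the hash value that lru_maintainer_store cuddled into it->time to find the live header without rehashing the key.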
diff --git a/storage.h b/storage.h
new file mode 100644
index 0000000..3bfeacc
--- /dev/null
+++ b/storage.h
@@ -0,0 +1,7 @@
+#ifndef STORAGE_H
+#define STORAGE_H
+
+int lru_maintainer_store(void *storage, const int clsid);
+int start_storage_compact_thread(void *arg);
+
+#endif
diff --git a/thread.c b/thread.c
index 0bca3df..e9f40a6 100644
--- a/thread.c
+++ b/thread.c
@@ -334,6 +334,13 @@ static void setup_thread(LIBEVENT_THREAD *me) {
fprintf(stderr, "Failed to create suffix cache\n");
exit(EXIT_FAILURE);
}
+#ifdef EXTSTORE
+ me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL);
+ if (me->io_cache == NULL) {
+ fprintf(stderr, "Failed to create IO object cache\n");
+ exit(EXIT_FAILURE);
+ }
+#endif
}
/*
@@ -641,6 +648,9 @@ void threadlocal_stats_reset(void) {
pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
THREAD_STATS_FIELDS
+#ifdef EXTSTORE
+ EXTSTORE_THREAD_STATS_FIELDS
+#endif
#undef X
memset(&threads[ii].stats.slab_stats, 0,
@@ -663,6 +673,9 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) {
pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
THREAD_STATS_FIELDS
+#ifdef EXTSTORE
+ EXTSTORE_THREAD_STATS_FIELDS
+#endif
#undef X
for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
@@ -765,7 +778,9 @@ void memcached_thread_init(int nthreads, void *arg) {
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
-
+#ifdef EXTSTORE
+ threads[i].storage = arg;
+#endif
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats_state.reserved_fds += 5;