diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/compressors/bzip2/bzip2_compress.c | 49 | ||||
-rw-r--r-- | ext/compressors/zlib/Makefile.am | 6 | ||||
-rw-r--r-- | ext/compressors/zlib/zlib_compress.c | 367 | ||||
-rw-r--r-- | ext/datasources/helium/Makefile.am | 11 | ||||
-rw-r--r-- | ext/datasources/helium/README (renamed from ext/test/memrata/README) | 62 | ||||
-rw-r--r-- | ext/datasources/helium/helium.c (renamed from ext/test/memrata/memrata.c) | 1708 | ||||
-rw-r--r-- | ext/test/memrata/Makefile.am | 12 |
7 files changed, 1308 insertions, 907 deletions
diff --git a/ext/compressors/bzip2/bzip2_compress.c b/ext/compressors/bzip2/bzip2_compress.c index 73e9ef3a932..dd97e2abee3 100644 --- a/ext/compressors/bzip2/bzip2_compress.c +++ b/ext/compressors/bzip2/bzip2_compress.c @@ -42,12 +42,10 @@ bzip2_decompress(WT_COMPRESSOR *, WT_SESSION *, uint8_t *, size_t, uint8_t *, size_t, size_t *); static int bzip2_terminate(WT_COMPRESSOR *, WT_SESSION *); -#ifdef WIREDTIGER_TEST_COMPRESS_RAW static int bzip2_compress_raw(WT_COMPRESSOR *, WT_SESSION *, size_t, int, size_t, uint8_t *, uint32_t *, uint32_t, uint8_t *, size_t, int, size_t *, uint32_t *); -#endif /* Local compressor structure. */ typedef struct { @@ -70,18 +68,26 @@ typedef struct { WT_SESSION *session; } BZIP_OPAQUE; -int -wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +/* + * bzip2_add_compressor -- + * Add a bzip2 compressor. + */ +static int +bzip2_add_compressor(WT_CONNECTION *connection, int raw, const char *name) { BZIP_COMPRESSOR *bzip_compressor; - (void)config; /* Unused parameters */ - + /* + * There are two almost identical bzip2 compressors: one supporting raw + * compression (used by test/format to test raw compression), the other + * without raw compression, that might be useful for real applications. + */ if ((bzip_compressor = calloc(1, sizeof(BZIP_COMPRESSOR))) == NULL) return (errno); bzip_compressor->compressor.compress = bzip2_compress; - bzip_compressor->compressor.compress_raw = NULL; + bzip_compressor-> + compressor.compress_raw = raw ? bzip2_compress_raw : NULL; bzip_compressor->compressor.decompress = bzip2_decompress; bzip_compressor->compressor.pre_size = NULL; bzip_compressor->compressor.terminate = bzip2_terminate; @@ -109,15 +115,22 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) */ bzip_compressor->bz_small = 0; - /* Load the compressor */ -#ifdef WIREDTIGER_TEST_COMPRESS_RAW - bzip_compressor->compressor.compress_raw = bzip2_compress_raw; - return (connection->add_compressor( - connection, "raw", (WT_COMPRESSOR *)bzip_compressor, NULL)); -#else - return (connection->add_compressor( - connection, "bzip2", (WT_COMPRESSOR *)bzip_compressor, NULL)); -#endif + return (connection->add_compressor( /* Load the compressor */ + connection, name, (WT_COMPRESSOR *)bzip_compressor, NULL)); +} + +int +wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +{ + int ret; + + (void)config; /* Unused parameters */ + + if ((ret = bzip2_add_compressor(connection, 0, "bzip2")) != 0) + return (ret); + if ((ret = bzip2_add_compressor(connection, 1, "bzip2-raw-test")) != 0) + return (ret); + return (0); } /* @@ -238,7 +251,6 @@ bzip2_compress(WT_COMPRESSOR *compressor, WT_SESSION *session, return (0); } -#ifdef WIREDTIGER_TEST_COMPRESS_RAW /* * __bzip2_compress_raw_random -- * Return a 32-bit pseudo-random number. @@ -328,9 +340,8 @@ bzip2_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session, (uintmax_t)page_max, split_pct, (uintmax_t)extra, slots, take, offsets[take], (uintmax_t)*result_lenp); #endif - return (0); + return (take == 0 ? EAGAIN : 0); } -#endif static int bzip2_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session, diff --git a/ext/compressors/zlib/Makefile.am b/ext/compressors/zlib/Makefile.am new file mode 100644 index 00000000000..373277c92c2 --- /dev/null +++ b/ext/compressors/zlib/Makefile.am @@ -0,0 +1,6 @@ +AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include + +lib_LTLIBRARIES = libwiredtiger_zlib.la +libwiredtiger_zlib_la_SOURCES = zlib_compress.c +libwiredtiger_zlib_la_LDFLAGS = -avoid-version -module +libwiredtiger_zlib_la_LIBADD = -lz diff --git a/ext/compressors/zlib/zlib_compress.c b/ext/compressors/zlib/zlib_compress.c new file mode 100644 index 00000000000..a48037c8526 --- /dev/null +++ b/ext/compressors/zlib/zlib_compress.c @@ -0,0 +1,367 @@ +/*- + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. + * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. + * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#include <zlib.h> +#include <errno.h> +#include <inttypes.h> +#include <stdlib.h> +#include <string.h> + +#include <wiredtiger.h> +#include <wiredtiger_ext.h> + +/* Local compressor structure. */ +typedef struct { + WT_COMPRESSOR compressor; /* Must come first */ + + WT_EXTENSION_API *wt_api; /* Extension API */ + + int zlib_level; /* Configuration */ +} ZLIB_COMPRESSOR; + +/* + * zlib gives us a cookie to pass to the underlying allocation functions; we + * need two handles, package them up. + */ +typedef struct { + WT_COMPRESSOR *compressor; + WT_SESSION *session; +} ZLIB_OPAQUE; + +/* + * zlib_error -- + * Output an error message, and return a standard error code. + */ +static int +zlib_error( + WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int zret) +{ + WT_EXTENSION_API *wt_api; + + wt_api = ((ZLIB_COMPRESSOR *)compressor)->wt_api; + + (void)wt_api->err_printf(wt_api, session, + "zlib error: %s: %s: %d", call, zError(zret), zret); + return (WT_ERROR); +} + +static void * +zalloc(void *cookie, u_int number, u_int size) +{ + ZLIB_OPAQUE *opaque; + WT_EXTENSION_API *wt_api; + + opaque = cookie; + wt_api = ((ZLIB_COMPRESSOR *)opaque->compressor)->wt_api; + return (wt_api->scr_alloc( + wt_api, opaque->session, (size_t)(number * size))); +} + +static void +zfree(void *cookie, void *p) +{ + ZLIB_OPAQUE *opaque; + WT_EXTENSION_API *wt_api; + + opaque = cookie; + wt_api = ((ZLIB_COMPRESSOR *)opaque->compressor)->wt_api; + wt_api->scr_free(wt_api, opaque->session, p); +} + +static int +zlib_compress(WT_COMPRESSOR *compressor, WT_SESSION *session, + uint8_t *src, size_t src_len, + uint8_t *dst, size_t dst_len, + size_t *result_lenp, int *compression_failed) +{ + ZLIB_COMPRESSOR *zlib_compressor; + ZLIB_OPAQUE opaque; + z_stream zs; + int ret; + + zlib_compressor = (ZLIB_COMPRESSOR *)compressor; + + memset(&zs, 0, sizeof(zs)); + zs.zalloc = zalloc; + zs.zfree = zfree; + opaque.compressor = compressor; + opaque.session = session; + zs.opaque = &opaque; + + if ((ret = deflateInit(&zs, zlib_compressor->zlib_level)) != Z_OK) + return (zlib_error( + compressor, session, "deflateInit", ret)); + + zs.next_in = src; + zs.avail_in = (uint32_t)src_len; + zs.next_out = dst; + zs.avail_out = (uint32_t)dst_len - 1; + while ((ret = deflate(&zs, Z_FINISH)) == Z_OK) + ; + if (ret == Z_STREAM_END) { + *compression_failed = 0; + *result_lenp = zs.total_out; + } else + *compression_failed = 1; + + if ((ret = deflateEnd(&zs)) != Z_OK) + return ( + zlib_error(compressor, session, "deflateEnd", ret)); + + return (0); +} + +/* + * zlib_find_slot -- + * Find the slot containing the target offset (binary search). + */ +static inline uint32_t +zlib_find_slot(uint32_t target, uint32_t *offsets, uint32_t slots) +{ + uint32_t base, indx, limit; + + indx = 1; + + /* Figure out which slot we got to: binary search */ + if (target >= offsets[slots]) + indx = slots; + else if (target > offsets[1]) + for (base = 2, limit = slots - base; limit != 0; limit >>= 1) { + indx = base + (limit >> 1); + if (target < offsets[indx]) + continue; + base = indx + 1; + --limit; + } + + return (indx); +} + +/* + * zlib_compress_raw -- + * Pack records into a specified on-disk page size. + */ +static int +zlib_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session, + size_t page_max, int split_pct, size_t extra, + uint8_t *src, uint32_t *offsets, uint32_t slots, + uint8_t *dst, size_t dst_len, int final, + size_t *result_lenp, uint32_t *result_slotsp) +{ + ZLIB_COMPRESSOR *zlib_compressor; + ZLIB_OPAQUE opaque; + z_stream last_zs, zs; + uint32_t curr_slot, last_slot; + int ret; + + curr_slot = last_slot = 0; + (void)split_pct; + (void)dst_len; + (void)final; + + zlib_compressor = (ZLIB_COMPRESSOR *)compressor; + + memset(&zs, 0, sizeof(zs)); + zs.zalloc = zalloc; + zs.zfree = zfree; + opaque.compressor = compressor; + opaque.session = session; + zs.opaque = &opaque; + + if ((ret = deflateInit(&zs, + zlib_compressor->zlib_level)) != Z_OK) + return (zlib_error( + compressor, session, "deflateInit", ret)); + + zs.next_in = src; + zs.next_out = dst; + /* + * Experimentally derived, reserve this many bytes for zlib to finish + * up a buffer. If this isn't sufficient, we don't fail but we will be + * inefficient. + */ +#define WT_ZLIB_RESERVED 12 + zs.avail_out = (uint32_t)(page_max - extra - WT_ZLIB_RESERVED); + last_zs = zs; + + /* + * Strategy: take the available output size and compress that much + * input. Continue until there is no input small enough or the + * compression fails to fit. + */ + while (zs.avail_out > 0) { + /* Find the slot we will try to compress up to. */ + if ((curr_slot = zlib_find_slot( + zs.total_in + zs.avail_out, offsets, slots)) <= last_slot) + break; + + zs.avail_in = offsets[curr_slot] - offsets[last_slot]; + /* Save the stream state in case the chosen data doesn't fit. */ + last_zs = zs; + + while (zs.avail_in > 0 && zs.avail_out > 0) + if ((ret = deflate(&zs, Z_SYNC_FLUSH)) != Z_OK) + return (zlib_error( + compressor, session, "deflate", ret)); + + /* Roll back the if the last deflate didn't complete. */ + if (zs.avail_in > 0) { + zs = last_zs; + break; + } else + last_slot = curr_slot; + } + + zs.avail_out += WT_ZLIB_RESERVED; + while ((ret = deflate(&zs, Z_FINISH)) == Z_OK) + ; + /* + * If the end marker didn't fit, report that we got no work done. WT + * will compress the (possibly large) page image using ordinary + * compression instead. + */ + if (ret == Z_BUF_ERROR) + last_slot = 0; + else if (ret != Z_STREAM_END) + return ( + zlib_error(compressor, session, "deflate end block", ret)); + + if ((ret = deflateEnd(&zs)) != Z_OK && ret != Z_DATA_ERROR) + return ( + zlib_error(compressor, session, "deflateEnd", ret)); + + if (last_slot > 0) { + *result_slotsp = last_slot; + *result_lenp = zs.total_out; + } else { + /* We didn't manage to compress anything: don't retry. */ + *result_slotsp = 0; + *result_lenp = 1; + } + +#if 0 + fprintf(stderr, + "zlib_compress_raw (%s): page_max %" PRIuMAX ", slots %" PRIu32 + ", take %" PRIu32 ": %" PRIu32 " -> %" PRIuMAX "\n", + final ? "final" : "not final", (uintmax_t)page_max, + slots, last_slot, offsets[last_slot], (uintmax_t)*result_lenp); +#endif + return (0); +} + +static int +zlib_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session, + uint8_t *src, size_t src_len, + uint8_t *dst, size_t dst_len, + size_t *result_lenp) +{ + ZLIB_OPAQUE opaque; + z_stream zs; + int ret, tret; + + memset(&zs, 0, sizeof(zs)); + zs.zalloc = zalloc; + zs.zfree = zfree; + opaque.compressor = compressor; + opaque.session = session; + zs.opaque = &opaque; + + if ((ret = inflateInit(&zs)) != Z_OK) + return (zlib_error( + compressor, session, "inflateInit", ret)); + + zs.next_in = src; + zs.avail_in = (uint32_t)src_len; + zs.next_out = dst; + zs.avail_out = (uint32_t)dst_len; + while ((ret = inflate(&zs, Z_FINISH)) == Z_OK) + ; + if (ret == Z_STREAM_END) { + *result_lenp = zs.total_out; + ret = Z_OK; + } + + if ((tret = inflateEnd(&zs)) != Z_OK && ret == Z_OK) + ret = tret; + + return (ret == Z_OK ? + 0 : zlib_error(compressor, session, "inflate", ret)); +} + +static int +zlib_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session) +{ + (void)session; /* Unused parameters */ + + free(compressor); + return (0); +} + +static int +zlib_add_compressor(WT_CONNECTION *connection, int raw, const char *name) +{ + ZLIB_COMPRESSOR *zlib_compressor; + + /* + * There are two almost identical zlib compressors: one supporting raw + * compression, and one without. + */ + if ((zlib_compressor = calloc(1, sizeof(ZLIB_COMPRESSOR))) == NULL) + return (errno); + + zlib_compressor->compressor.compress = zlib_compress; + zlib_compressor->compressor.compress_raw = raw ? + zlib_compress_raw : NULL; + zlib_compressor->compressor.decompress = zlib_decompress; + zlib_compressor->compressor.pre_size = NULL; + zlib_compressor->compressor.terminate = zlib_terminate; + + zlib_compressor->wt_api = connection->get_extension_api(connection); + + /* + * between 0-10: level: see zlib manual. + */ + zlib_compressor->zlib_level = Z_DEFAULT_COMPRESSION; + + /* Load the standard compressor. */ + return (connection->add_compressor( + connection, name, &zlib_compressor->compressor, NULL)); +} + +int +wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +{ + int ret; + + (void)config; /* Unused parameters */ + + if ((ret = zlib_add_compressor(connection, 1, "zlib")) != 0) + return (ret); + if ((ret = zlib_add_compressor(connection, 0, "zlib-noraw")) != 0) + return (ret); + return (0); +} diff --git a/ext/datasources/helium/Makefile.am b/ext/datasources/helium/Makefile.am new file mode 100644 index 00000000000..b4e6e67e2cd --- /dev/null +++ b/ext/datasources/helium/Makefile.am @@ -0,0 +1,11 @@ +AM_CPPFLAGS = -I$(top_builddir) \ + -I$(top_srcdir)/src/include -I$(HELIUM_PATH) + +noinst_LTLIBRARIES = libwiredtiger_helium.la +libwiredtiger_helium_la_SOURCES = helium.c +libwiredtiger_helium_la_LIBADD = -L$(HELIUM_PATH) -lhe + +# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well +# as installation, it will only build static libraries. As far as I can tell, +# the "approved" libtool way to turn them back on is by adding -rpath. +libwiredtiger_helium_la_LDFLAGS = -avoid-version -module -rpath /nowhere diff --git a/ext/test/memrata/README b/ext/datasources/helium/README index ee338474566..e78ba58c71d 100644 --- a/ext/test/memrata/README +++ b/ext/datasources/helium/README @@ -1,15 +1,15 @@ -Memrata README. +Helium README. -The data structures are "KVS sources" which map to one or more physical -devices; each KVS source supports any number of "WiredTiger sources", -where a WiredTiger source will be an object similar to a Btree "file:" -object. Each WiredTiger source supports any number of WiredTiger cursors. +The data structures are "Helium sources" which map to one or more physical +volumes; each Helium source supports any number of "WiredTiger sources", +where a WiredTiger source is an object similar to a Btree "file:" object. +Each WiredTiger source supports any number of WiredTiger cursors. -Each KVS source is given a logical name when the Memrata device is loaded, -and that logical name is subsequently used when a WiredTiger source is -created. For example, a KVS source might be named "dev1", and correspond -to /dev/sd0 and /dev/sd1; subsequent WT_SESSION.create calls would specify -a URI like "table:dev1/my_table". +Each Helium source is given a logical name when first referenced, and that +logical name is subsequently used when a WiredTiger source is created. For +example, the logical name for a Helium source might be "dev1", and it would +map to the Helium volumes /dev/sd0 and /dev/sd1; subsequent WT_SESSION.create +calls specify a URI like "table:dev1/my_table". For each WiredTiger source, we create two namespaces on the underlying device, a "cache" and a "primary". @@ -51,23 +51,23 @@ When a next/prev is done: move to the next/prev visible item in the primary return the one closest to the starting position -Note locks are not acquired for read operations, and no flushes are done for -any of these operations. +Locks are not acquired for read operations, and no flushes are done for any of +these operations. -We also create one additional namespace, the "txn" name space, which serves -all of the WiredTiger and KVS sources. Whenever a transaction commits, we -insert a commit record into the txn name space and flush the device. When a -transaction rolls back, we insert an abort record into the txn name space, -but don't flush the device. +We also create one additional object, the transaction name space, which serves +all of the WiredTiger and Helium objects in a WiredTiger connection. Whenever +a transaction involving a Helium source commits, we insert a commit record into +the transaction name space and flush the device. When a transaction rolls back, +we insert an abort record into the txn name space, but don't flush the device. The visibility check is slightly different than the rest of WiredTiger: we do not reset anything when a transaction aborts, and so we have to check if the transaction has been aborted as well as check the transaction ID for visibility. -We create a "cleanup" thread for every KVS source. The job of this thread is -to migrate rows from the cache into the primary. Any committed, globally -visible change in the cache can be copied into the primary and removed from -the cache: +We create a "cleanup" thread for every underlying Helium source. The job of +this thread is to migrate rows from the cache object into the primary. Any +committed, globally visible change in the cache can be copied into the primary +and removed from the cache: set BaseTxnID to the oldest transaction ID not yet visible to a running transaction @@ -95,24 +95,26 @@ to the primary. No lock is required when removing rows from the transaction store, once the transaction ID is less than the BaseTxnID, it will never be read. -Memrata recovery is almost identical to the cleanup thread, which migrates -rows from the cache into the primary. For every cache/primary name space, -we migrate every commit to the primary (by definition, at recovery time it -must be globally visible), and discard everything (by defintion, at recovery -time anything not committed has been aborted. +Helium recovery is almost identical to the cleanup thread, which migrates rows +from the cache into the primary. For every cache/primary pair, migrate every +commit to the primary (by definition, at recovery time it must be globally +visible), and discard everything else (by definition, at recovery time anything +not committed has been aborted. =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= Questions, problems, whatever: -* This implementation is endian-specific, that is, a store created on a -little-endian machine is not portable to a big-endian machine. +* The implementation is endian-specific, that is, the WiredTiger metadata +stored on the Helium device is on not portable to a big-endian machine. +Helium's metadata is portable between different endian machines, so this +should probably be fixed. * There's a problem with transactions in WiredTiger that span more than a single data source. For example, consider a transaction that modifies -both a KVS object and a Btree object. If we commit and push the KVS +both a Helium object and a Btree object. If we commit and push the Helium commit record to stable storage, and then crash before committing the Btree change, the enclosing WiredTiger transaction will/should end up aborting, -and there's no way for us to back out the change in KVS. I'm leaving +and there's no way for us to back out the change in Helium. I'm leaving this problem alone until WiredTiger fine-grained durability is complete, we're going to need WiredTiger support for some kind of 2PC to solve this. diff --git a/ext/test/memrata/memrata.c b/ext/datasources/helium/helium.c index 442c078b5a2..1239c88befa 100644 --- a/ext/test/memrata/memrata.c +++ b/ext/datasources/helium/helium.c @@ -24,6 +24,7 @@ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. */ +#include <sys/select.h> #include <ctype.h> #include <errno.h> @@ -33,56 +34,80 @@ #include <stdlib.h> #include <string.h> -#include <kvs.h> +#include <he.h> + #include <wiredtiger.h> #include <wiredtiger_ext.h> +typedef struct he_env HE_ENV; +typedef struct he_item HE_ITEM; +typedef struct he_stats HE_STATS; + +static int verbose = 0; /* Verbose messages */ + /* - * Macros to output an error message and set or return an error. - * Requires local variables: - * int ret; + * Macros to output error and verbose messages, and set or return an error. + * Error macros require local "ret" variable. * * ESET: update an error value, handling more/less important errors. - * ERET: output a message and return the error. - * EMSG, EMSG_ERR: - * output a message and set the local error value, optionally jump to the - * err label. + * ERET: output a message, return the error. + * EMSG: output a message, set the local error value. + * EMSG_ERR: + * output a message, set the local error value, jump to the err label. + * VMSG: verbose message. */ #undef ESET #define ESET(a) do { \ - int __ret; \ - if ((__ret = (a)) != 0 && \ - (__ret == WT_PANIC || \ - ret == 0 || ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND)) \ - ret = __ret; \ + int __v; \ + if ((__v = (a)) != 0) { \ + /* \ + * On error, check for a panic (it overrides all other \ + * returns). Else, if there's no return value or the \ + * return value is not strictly an error, override it \ + * with the error. \ + */ \ + if (__v == WT_PANIC || \ + ret == 0 || \ + ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND) \ + ret = __v; \ + /* \ + * If we're set to a Helium error at the end of the day,\ + * switch to a generic WiredTiger error. \ + */ \ + if (ret < 0 && ret > -31,800) \ + ret = WT_ERROR; \ + } \ } while (0) #undef ERET #define ERET(wtext, session, v, ...) do { \ (void) \ - wtext->err_printf(wtext, session, "memrata: " __VA_ARGS__); \ - return (v); \ + wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \ + ESET(v); \ + return (ret); \ } while (0) #undef EMSG #define EMSG(wtext, session, v, ...) do { \ (void) \ - wtext->err_printf(wtext, session, "memrata: " __VA_ARGS__); \ + wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \ ESET(v); \ } while (0) #undef EMSG_ERR #define EMSG_ERR(wtext, session, v, ...) do { \ (void) \ - wtext->err_printf(wtext, session, "memrata: " __VA_ARGS__); \ + wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \ ESET(v); \ goto err; \ } while (0) - -/* - * STRING_MATCH -- - * Return if a string matches a bytestring of a specified length. - */ -#undef STRING_MATCH -#define STRING_MATCH(str, bytes, len) \ - (strncmp(str, bytes, len) == 0 && (str)[(len)] == '\0') +#undef VERBOSE_L1 +#define VERBOSE_L1 1 +#undef VERBOSE_L2 +#define VERBOSE_L2 2 +#undef VMSG +#define VMSG(wtext, session, v, ...) do { \ + if (verbose >= v) \ + (void)wtext-> \ + msg_printf(wtext, session, "helium: " __VA_ARGS__); \ +} while (0) /* * OVERWRITE_AND_FREE -- @@ -95,20 +120,29 @@ } while (0) /* - * Version each file, out of sheer raging paranoia. + * Version each object, out of sheer raging paranoia. */ -#define KVS_MAJOR 1 /* KVS major, minor version */ -#define KVS_MINOR 0 +#define WIREDTIGER_HELIUM_MAJOR 1 /* Major, minor version */ +#define WIREDTIGER_HELIUM_MINOR 0 /* - * WiredTiger name space on the memrata device: all primary store objects are - * named "WiredTiger.XXX", the cache store object is "WiredTiger.XXX.cache", - * and the per-device transaction file is "WiredTiger.txn". + * WiredTiger name space on the Helium store: all objects are named with the + * WiredTiger prefix (we don't require the Helium store be exclusive to our + * files). Primary objects are named "WiredTiger.[name]", associated cache + * objects are "WiredTiger.[name].cache". The per-connection transaction + * object is "WiredTiger.WiredTigerTxn". When we first open a Helium volume, + * we open/close a file in order to apply flags for the first open of the + * volume, that's "WiredTiger.WiredTigerInit". */ #define WT_NAME_PREFIX "WiredTiger." -#define WT_NAME_TXN "WiredTiger.txn" +#define WT_NAME_INIT "WiredTiger.WiredTigerInit" +#define WT_NAME_TXN "WiredTiger.WiredTigerTxn" #define WT_NAME_CACHE ".cache" +/* + * WT_SOURCE -- + * A WiredTiger source, supporting one or more cursors. + */ typedef struct __wt_source { char *uri; /* Unique name */ @@ -120,68 +154,79 @@ typedef struct __wt_source { uint64_t append_recno; /* Allocation record number */ - int config_recno; /* config "key_format=r" */ int config_bitfield; /* config "value_format=#t" */ + int config_compress; /* config "helium_o_compress" */ + int config_recno; /* config "key_format=r" */ /* - * Each WiredTiger object has a "primary" namespace in a KVS store plus - * a "cache" namespace, which has not-yet-resolved updates. There's a - * dirty flag so we can ignore the cache until it's used. + * Each WiredTiger object has a "primary" namespace in a Helium store + * plus a "cache" namespace, which has not-yet-resolved updates. There + * is a dirty flag so read-only data sets can ignore the cache. */ - kvs_t kvs; /* Underlying KVS object */ - kvs_t kvscache; /* Underlying KVS cache */ - int kvscache_inuse; + he_t he; /* Underlying Helium object */ + he_t he_cache; /* Underlying Helium cache */ + int he_cache_inuse; - uint64_t cleaner_bytes; /* Bytes since clean */ - uint64_t cleaner_ops; /* Operations since clean */ - - struct __kvs_source *ks; /* Underlying KVS source */ + struct __he_source *hs; /* Underlying Helium source */ struct __wt_source *next; /* List of WiredTiger objects */ } WT_SOURCE; -typedef struct __kvs_source { +/* + * HELIUM_SOURCE -- + * A Helium volume, supporting one or more WT_SOURCE objects. + */ +typedef struct __he_source { /* * XXX * The transaction commit handler must appear first in the structure. */ WT_TXN_NOTIFY txn_notify; /* Transaction commit handler */ - char *name; /* Unique name */ + WT_EXTENSION_API *wtext; /* Extension functions */ + + char *name; /* Unique WiredTiger name */ + char *device; /* Unique Helium volume name */ - kvs_t kvs_device; /* Underlying KVS store */ + /* + * Maintain a handle for each underlying Helium source so checkpoint is + * faster, we can "commit" a single handle per source, regardless of the + * number of objects. + */ + he_t he_volume; struct __wt_source *ws_head; /* List of WiredTiger sources */ /* - * Each KVS source has a cleaner thread to migrate WiredTiger source + * Each Helium source has a cleaner thread to migrate WiredTiger source * updates from the cache namespace to the primary namespace, based on - * the number of bytes or the number of operations. We read these - * fields without a lock, but serialize writes to minimize races (and - * because it costs us nothing). - * - * There's a cleaner thread per KVS store because migration operations - * can overlap. + * the number of bytes or the number of operations. (There's a cleaner + * thread per Helium store so migration operations can overlap.) We + * read these fields without a lock, but serialize writes to minimize + * races (and because it costs us nothing). */ - WT_EXTENSION_API *wtext; /* Extension functions */ pthread_t cleaner_id; /* Cleaner thread ID */ volatile int cleaner_stop; /* Cleaner thread quit flag */ /* * Each WiredTiger connection has a transaction namespace which lists * resolved transactions with their committed or aborted state as a - * value. We create that namespace in the first KVS store created, - * and then simply reference it from other, subsequently created KVS - * stores. + * value. That namespace appears in a single Helium store (the first + * one created, if it doesn't already exist), and then it's referenced + * from other Helium stores. */ #define TXN_ABORTED 'A' #define TXN_COMMITTED 'C' #define TXN_UNRESOLVED 0 - kvs_t kvstxn; /* Underlying KVS txn store */ - int kvsowner; /* Owns transaction store */ + he_t he_txn; /* Helium txn store */ + int he_owner; /* Owns transaction store */ - struct __kvs_source *next; /* List of KVS sources */ -} KVS_SOURCE; + struct __he_source *next; /* List of Helium sources */ +} HELIUM_SOURCE; +/* + * DATA_SOURCE -- + * A WiredTiger data source, supporting one or more HELIUM_SOURCE objects. + */ typedef struct __data_source { WT_DATA_SOURCE wtds; /* Must come first */ @@ -190,10 +235,13 @@ typedef struct __data_source { pthread_rwlock_t global_lock; /* Global lock */ int lockinit; /* Lock created */ - KVS_SOURCE *kvs_head; /* List of KVS sources */ + struct __he_source *hs_head; /* List of Helium sources */ } DATA_SOURCE; /* + * CACHE_RECORD -- + * An array of updates from the cache object. + * * Values in the cache store are marshalled/unmarshalled to/from the store, * using a simple encoding: * {N records: 4B} @@ -203,7 +251,7 @@ typedef struct __data_source { * {record#1 data} * ... * - * Each KVS cursor potentially has a single set of these values. + * Each cursor potentially has a single set of these values. */ typedef struct __cache_record { uint8_t *v; /* Value */ @@ -213,15 +261,19 @@ typedef struct __cache_record { int remove; /* 1/0 remove flag */ } CACHE_RECORD; +/* + * CURSOR -- + * A cursor, supporting a single WiredTiger cursor. + */ typedef struct __cursor { WT_CURSOR wtcursor; /* Must come first */ WT_EXTENSION_API *wtext; /* Extension functions */ - WT_SOURCE *ws; /* WiredTiger source */ + WT_SOURCE *ws; /* Underlying source */ - struct kvs_record record; /* Record */ - uint8_t __key[KVS_MAX_KEY_LEN]; /* Record.key, Record.value */ + HE_ITEM record; /* Record */ + uint8_t __key[HE_MAX_KEY_LEN]; /* Record.key, Record.value */ uint8_t *v; size_t len; size_t mem_len; @@ -241,6 +293,26 @@ typedef struct __cursor { } CURSOR; /* + * prefix_match -- + * Return if a string matches a prefix. + */ +static inline int +prefix_match(const char *str, const char *pfx) +{ + return (strncmp(str, pfx, strlen(pfx)) == 0); +} + +/* + * string_match -- + * Return if a string matches a byte string of len bytes. + */ +static inline int +string_match(const char *str, const char *bytes, size_t len) +{ + return (strncmp(str, bytes, len) == 0 && (str)[(len)] == '\0'); +} + +/* * cursor_destroy -- * Free a cursor's memory, and optionally the cursor itself. */ @@ -259,7 +331,7 @@ cursor_destroy(CURSOR *cursor) /* * os_errno -- - * Limit our use of errno so it's easy to remove. + * Limit our use of errno so it's easy to find/remove. */ static int os_errno(void) @@ -272,8 +344,7 @@ os_errno(void) * Initialize a lock. */ static int -lock_init( - WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp) +lock_init(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp) { int ret = 0; @@ -304,8 +375,7 @@ lock_destroy( * Acquire a write lock. */ static inline int -writelock( - WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp) +writelock(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp) { int ret = 0; @@ -331,71 +401,90 @@ unlock(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp) } #if 0 -static int -kvs_dump_print(uint8_t *p, size_t len, FILE *fp) +static void +helium_dump_kv(const char *pfx, uint8_t *p, size_t len, FILE *fp) { + (void)fprintf(stderr, "%s %3zu: ", pfx, len); for (; len > 0; --len, ++p) if (!isspace(*p) && isprint(*p)) - putc(*p, fp); + (void)putc(*p, fp); else if (len == 1 && *p == '\0') /* Skip string nuls. */ continue; else - fprintf(fp, "%#x", *p); + (void)fprintf(fp, "%#x", *p); + (void)putc('\n', fp); } /* - * kvs_dump -- - * Dump the records in a KVS store. + * he_dump -- + * Dump the records in a Helium store. */ static int -kvs_dump(kvs_t kvs, const char *tag) +helium_dump(WT_EXTENSION_API *wtext, he_t he, const char *tag) { - FILE *fp; - struct kvs_record *r, _r; - size_t maxbuf = 4 * 1024; + HE_ITEM *r, _r; + uint8_t k[4 * 1024], v[4 * 1024]; int ret = 0; r = &_r; memset(r, 0, sizeof(*r)); - r->key = malloc(maxbuf); - r->key_len = 0; - r->val = malloc(maxbuf); - r->val_len = maxbuf; - - (void)snprintf(r->val, maxbuf, "dump.%s", tag); - fp = fopen(r->val, "w"); - fprintf(fp, "== %s\n", tag); - - while ((ret = kvs_next(kvs, r, 0UL, (unsigned long)maxbuf)) == 0) { - kvs_dump_print(r->key, r->key_len, fp); - putc('\t', fp); - kvs_dump_print(r->val, r->val_len, fp); - putc('\n', fp); + r->key = k; + r->val = v; - r->val_len = maxbuf; + (void)fprintf(stderr, "== %s\n", tag); + while ((ret = he_next(he, r, (size_t)0, sizeof(v))) == 0) { +#if 0 + uint64_t recno; + if ((ret = wtext->struct_unpack(wtext, + NULL, r->key, r->key_len, "r", &recno)) != 0) + return (ret); + fprintf(stderr, "K: %" PRIu64, recno); +#else + helium_dump_kv("K: ", r->key, r->key_len, stderr); +#endif + helium_dump_kv("V: ", r->val, r->val_len, stderr); } - if (ret == KVS_E_KEY_NOT_FOUND) - ret = 0; - fprintf(fp, "========================== (%d)\n", ret); - fclose(fp); + if (ret != HE_ERR_ITEM_NOT_FOUND) { + fprintf(stderr, "he_next: %s\n", he_strerror(ret)); + ret = WT_ERROR; + } + return (ret); +} - free(r->key); - free(r->val); +/* + * helium_stats -- + * Display Helium statistics for a datastore. + */ +static int +helium_stats( + WT_EXTENSION_API *wtext, WT_SESSION *session, he_t he, const char *tag) +{ + HE_STATS stats; + int ret = 0; - return (ret); + if ((ret = he_stats(he, &stats)) != 0) + ERET(wtext, session, ret, "he_stats: %s", he_strerror(ret)); + fprintf(stderr, "== %s\n", tag); + fprintf(stderr, "name=%s\n", stats.name); + fprintf(stderr, "deleted_items=%" PRIu64 "\n", stats.deleted_items); + fprintf(stderr, "locked_items=%" PRIu64 "\n", stats.locked_items); + fprintf(stderr, "valid_items=%" PRIu64 "\n", stats.valid_items); + fprintf(stderr, "capacity=%" PRIu64 "B\n", stats.capacity); + fprintf(stderr, "size=%" PRIu64 "B\n", stats.size); + return (0); } #endif /* - * kvs_call -- - * Call a KVS key retrieval function, handling overflow. + * helium_call -- + * Call a Helium key retrieval function, handling overflow. */ static inline int -kvs_call(WT_CURSOR *wtcursor, const char *fname, kvs_t kvs, - int (*f)(kvs_t, struct kvs_record *, unsigned long, unsigned long)) +helium_call(WT_CURSOR *wtcursor, const char *fname, + he_t he, int (*f)(he_t, HE_ITEM *, size_t, size_t)) { - struct kvs_record *r; CURSOR *cursor; + HE_ITEM *r; WT_EXTENSION_API *wtext; WT_SESSION *session; int ret = 0; @@ -409,18 +498,17 @@ kvs_call(WT_CURSOR *wtcursor, const char *fname, kvs_t kvs, r->val = cursor->v; restart: - if ((ret = f(kvs, r, 0UL, (unsigned long)cursor->mem_len)) != 0) { - if (ret == KVS_E_KEY_NOT_FOUND) + if ((ret = f(he, r, (size_t)0, cursor->mem_len)) != 0) { + if (ret == HE_ERR_ITEM_NOT_FOUND) return (WT_NOTFOUND); - ERET(wtext, - session, WT_ERROR, "%s: %s", fname, kvs_strerror(ret)); + ERET(wtext, session, ret, "%s: %s", fname, he_strerror(ret)); } /* * If the returned length is larger than our passed-in length, we didn't - * get the complete value. Grow the buffer and use kvs_get to complete - * the retrieval (kvs_get because the call succeeded and the key was - * copied out, so calling kvs_next/kvs_prev again would skip key/value + * get the complete value. Grow the buffer and use he_lookup to do the + * retrieval (he_lookup because the call succeeded and the key was + * copied out, so calling he_next/he_prev again would skip key/value * pairs). * * We have to loop, another thread of control might change the length of @@ -441,12 +529,11 @@ restart: cursor->v = r->val = p; cursor->mem_len = r->val_len + 32; - if ((ret = kvs_get( - kvs, r, 0UL, (unsigned long)cursor->mem_len)) != 0) { - if (ret == KVS_E_KEY_NOT_FOUND) + if ((ret = he_lookup(he, r, (size_t)0, cursor->mem_len)) != 0) { + if (ret == HE_ERR_ITEM_NOT_FOUND) goto restart; - ERET(wtext, session, - WT_ERROR, "kvs_get: %s", kvs_strerror(ret)); + ERET(wtext, + session, ret, "he_lookup: %s", he_strerror(ret)); } } /* NOTREACHED */ @@ -458,47 +545,46 @@ restart: */ static int txn_state_set(WT_EXTENSION_API *wtext, - WT_SESSION *session, KVS_SOURCE *ks, uint64_t txnid, int commit) + WT_SESSION *session, HELIUM_SOURCE *hs, uint64_t txnid, int commit) { - struct kvs_record txn; + HE_ITEM txn; uint8_t val; int ret = 0; - /* Update the store -- commits must be durable, flush the device. */ - memset(&txn, 0, sizeof(txn)); - txn.key = &txnid; - txn.key_len = sizeof(txnid); - /* + * Update the store -- commits must be durable, flush the volume. + * + * XXX * Not endian-portable, we're writing a native transaction ID to the * store. */ + memset(&txn, 0, sizeof(txn)); + txn.key = &txnid; + txn.key_len = sizeof(txnid); val = commit ? TXN_COMMITTED : TXN_ABORTED; txn.val = &val; - txn.val_len = 1; + txn.val_len = sizeof(val); - if ((ret = kvs_set(ks->kvstxn, &txn)) != 0) - ERET(wtext, session, - WT_ERROR, "kvs_set: %s", kvs_strerror(ret)); + if ((ret = he_update(hs->he_txn, &txn)) != 0) + ERET(wtext, session, ret, "he_update: %s", he_strerror(ret)); - if (commit && (ret = kvs_commit(ks->kvs_device)) != 0) - ERET(wtext, session, - WT_ERROR, "kvs_commit: %s", kvs_strerror(ret)); + if (commit && (ret = he_commit(hs->he_txn)) != 0) + ERET(wtext, session, ret, "he_commit: %s", he_strerror(ret)); return (0); } /* * txn_notify -- - * Resolve a transaction. + * Resolve a transaction; called from WiredTiger during commit/abort. */ static int txn_notify(WT_TXN_NOTIFY *handler, WT_SESSION *session, uint64_t txnid, int committed) { - KVS_SOURCE *ks; + HELIUM_SOURCE *hs; - ks = (KVS_SOURCE *)handler; - return (txn_state_set(ks->wtext, session, ks, txnid, committed)); + hs = (HELIUM_SOURCE *)handler; + return (txn_state_set(hs->wtext, session, hs, txnid, committed)); } /* @@ -508,13 +594,13 @@ txn_notify(WT_TXN_NOTIFY *handler, static int txn_state(WT_CURSOR *wtcursor, uint64_t txnid) { - struct kvs_record txn; CURSOR *cursor; - KVS_SOURCE *ks; + HE_ITEM txn; + HELIUM_SOURCE *hs; uint8_t val_buf[16]; cursor = (CURSOR *)wtcursor; - ks = cursor->ws->ks; + hs = cursor->ws->hs; memset(&txn, 0, sizeof(txn)); txn.key = &txnid; @@ -522,7 +608,7 @@ txn_state(WT_CURSOR *wtcursor, uint64_t txnid) txn.val = val_buf; txn.val_len = sizeof(val_buf); - if (kvs_get(ks->kvstxn, &txn, 0UL, (unsigned long)sizeof(val_buf)) == 0) + if (he_lookup(hs->he_txn, &txn, (size_t)0, sizeof(val_buf)) == 0) return (val_buf[0]); return (TXN_UNRESOLVED); } @@ -534,8 +620,8 @@ txn_state(WT_CURSOR *wtcursor, uint64_t txnid) static int cache_value_append(WT_CURSOR *wtcursor, int remove_op) { - struct kvs_record *r; CURSOR *cursor; + HE_ITEM *r; WT_EXTENSION_API *wtext; WT_SESSION *session; uint64_t txnid; @@ -587,6 +673,7 @@ cache_value_append(WT_CURSOR *wtcursor, int remove_op) * Copy the WiredTiger cursor's data into place: txn ID, remove * tombstone, data length, data. * + * XXX * Not endian-portable, we're writing a native transaction ID to the * store. */ @@ -604,7 +691,7 @@ cache_value_append(WT_CURSOR *wtcursor, int remove_op) } cursor->len = (size_t)(p - cursor->v); - /* Update the underlying KVS record. */ + /* Update the underlying Helium record. */ r->val = cursor->v; r->val_len = cursor->len; @@ -764,10 +851,8 @@ cache_value_visible_all(WT_CURSOR *wtcursor, uint64_t oldest) { CACHE_RECORD *cp; CURSOR *cursor; - WT_SESSION *session; u_int i; - session = wtcursor->session; cursor = (CURSOR *)wtcursor; /* @@ -882,21 +967,23 @@ cache_value_txnmin(WT_CURSOR *wtcursor, uint64_t *txnminp) static int key_max_err(WT_EXTENSION_API *wtext, WT_SESSION *session, size_t len) { + int ret = 0; + ERET(wtext, session, EINVAL, - "key length (%" PRIuMAX " bytes) larger than the maximum Memrata " + "key length (%zu bytes) larger than the maximum Helium " "key length of %d bytes", - (uintmax_t)len, KVS_MAX_KEY_LEN); + len, HE_MAX_KEY_LEN); } /* * copyin_key -- - * Copy a WT_CURSOR key to a struct kvs_record key. + * Copy a WT_CURSOR key to a HE_ITEM key. */ static inline int copyin_key(WT_CURSOR *wtcursor, int allocate_key) { - struct kvs_record *r; CURSOR *cursor; + HE_ITEM *r; WT_EXTENSION_API *wtext; WT_SESSION *session; WT_SOURCE *ws; @@ -944,14 +1031,14 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) if ((ret = wtext->struct_size(wtext, session, &size, "r", wtcursor->recno)) != 0 || (ret = wtext->struct_pack(wtext, session, - r->key, KVS_MAX_KEY_LEN, "r", wtcursor->recno)) != 0) + r->key, HE_MAX_KEY_LEN, "r", wtcursor->recno)) != 0) return (ret); r->key_len = size; } else { /* I'm not sure this test is necessary, but it's cheap. */ - if (wtcursor->key.size > KVS_MAX_KEY_LEN) - return (key_max_err( - wtext, session, (size_t)wtcursor->key.size)); + if (wtcursor->key.size > HE_MAX_KEY_LEN) + return ( + key_max_err(wtext, session, wtcursor->key.size)); /* * A set cursor key might reference application memory, which @@ -969,13 +1056,13 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key) /* * copyout_key -- - * Copy a struct kvs_record key to a WT_CURSOR key. + * Copy a HE_ITEM key to a WT_CURSOR key. */ static inline int copyout_key(WT_CURSOR *wtcursor) { - struct kvs_record *r; CURSOR *cursor; + HE_ITEM *r; WT_EXTENSION_API *wtext; WT_SESSION *session; WT_SOURCE *ws; @@ -993,14 +1080,14 @@ copyout_key(WT_CURSOR *wtcursor) return (ret); } else { wtcursor->key.data = r->key; - wtcursor->key.size = (uint32_t)r->key_len; + wtcursor->key.size = (size_t)r->key_len; } return (0); } /* * copyout_val -- - * Copy a kvs store's struct kvs_record value to a WT_CURSOR value. + * Copy a Helium store's HE_ITEM value to a WT_CURSOR value. */ static inline int copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp) @@ -1011,7 +1098,7 @@ copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp) if (cp == NULL) { wtcursor->value.data = cursor->v; - wtcursor->value.size = (uint32_t)cursor->len; + wtcursor->value.size = cursor->len; } else { wtcursor->value.data = cp->v; wtcursor->value.size = cp->len; @@ -1025,11 +1112,11 @@ copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp) */ static int nextprev(WT_CURSOR *wtcursor, const char *fname, - int (*f)(kvs_t, struct kvs_record *, unsigned long, unsigned long)) + int (*f)(he_t, HE_ITEM *, size_t, size_t)) { - struct kvs_record *r; CACHE_RECORD *cp; CURSOR *cursor; + HE_ITEM *r; WT_EXTENSION_API *wtext; WT_ITEM a, b; WT_SESSION *session; @@ -1050,7 +1137,7 @@ nextprev(WT_CURSOR *wtcursor, const char *fname, * the store. We don't care if we race, we're not guaranteeing any * special behavior with respect to phantoms. */ - if (ws->kvscache_inuse == 0) { + if (ws->he_cache_inuse == 0) { cache_ret = WT_NOTFOUND; goto cache_clean; } @@ -1079,7 +1166,7 @@ skip_deleted: * entry, or we reach the end/beginning. */ for (cache_rm = 0;;) { - if ((ret = kvs_call(wtcursor, fname, ws->kvscache, f)) != 0) + if ((ret = helium_call(wtcursor, fname, ws->he_cache, f)) != 0) break; if ((ret = cache_value_unmarshall(wtcursor)) != 0) return (ret); @@ -1134,7 +1221,7 @@ skip_deleted: cache_clean: /* Get the next/prev entry from the store. */ - ret = kvs_call(wtcursor, fname, ws->kvs, f); + ret = helium_call(wtcursor, fname, ws->he, f); if (ret != 0 && ret != WT_NOTFOUND) return (ret); @@ -1154,7 +1241,7 @@ cache_clean: if ((ret = wtext->collate(wtext, session, &a, &b, &cmp)) != 0) return (ret); - if (f == kvs_next) { + if (f == he_next) { if (cmp >= 0) ret = WT_NOTFOUND; else @@ -1196,34 +1283,34 @@ cache_clean: } /* - * kvs_cursor_next -- + * helium_cursor_next -- * WT_CURSOR.next method. */ static int -kvs_cursor_next(WT_CURSOR *wtcursor) +helium_cursor_next(WT_CURSOR *wtcursor) { - return (nextprev(wtcursor, "kvs_next", kvs_next)); + return (nextprev(wtcursor, "he_next", he_next)); } /* - * kvs_cursor_prev -- + * helium_cursor_prev -- * WT_CURSOR.prev method. */ static int -kvs_cursor_prev(WT_CURSOR *wtcursor) +helium_cursor_prev(WT_CURSOR *wtcursor) { - return (nextprev(wtcursor, "kvs_prev", kvs_prev)); + return (nextprev(wtcursor, "he_prev", he_prev)); } /* - * kvs_cursor_reset -- + * helium_cursor_reset -- * WT_CURSOR.reset method. */ static int -kvs_cursor_reset(WT_CURSOR *wtcursor) +helium_cursor_reset(WT_CURSOR *wtcursor) { - struct kvs_record *r; CURSOR *cursor; + HE_ITEM *r; cursor = (CURSOR *)wtcursor; r = &cursor->record; @@ -1237,11 +1324,11 @@ kvs_cursor_reset(WT_CURSOR *wtcursor) } /* - * kvs_cursor_search -- + * helium_cursor_search -- * WT_CURSOR.search method. */ static int -kvs_cursor_search(WT_CURSOR *wtcursor) +helium_cursor_search(WT_CURSOR *wtcursor) { CACHE_RECORD *cp; CURSOR *cursor; @@ -1259,7 +1346,8 @@ kvs_cursor_search(WT_CURSOR *wtcursor) * Check for an entry in the cache. If we find one, unmarshall it * and check for a visible entry we can return. */ - if ((ret = kvs_call(wtcursor, "kvs_get", ws->kvscache, kvs_get)) == 0) { + if ((ret = + helium_call(wtcursor, "he_lookup", ws->he_cache, he_lookup)) == 0) { if ((ret = cache_value_unmarshall(wtcursor)) != 0) return (ret); if (cache_value_visible(wtcursor, &cp)) @@ -1269,18 +1357,18 @@ kvs_cursor_search(WT_CURSOR *wtcursor) return (ret); /* Check for an entry in the primary store. */ - if ((ret = kvs_call(wtcursor, "kvs_get", ws->kvs, kvs_get)) != 0) + if ((ret = helium_call(wtcursor, "he_lookup", ws->he, he_lookup)) != 0) return (ret); return (copyout_val(wtcursor, NULL)); } /* - * kvs_cursor_search_near -- + * helium_cursor_search_near -- * WT_CURSOR.search_near method. */ static int -kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact) +helium_cursor_search_near(WT_CURSOR *wtcursor, int *exact) { int ret = 0; @@ -1294,7 +1382,7 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact) */ /* Search for an exact match. */ - if ((ret = kvs_cursor_search(wtcursor)) == 0) { + if ((ret = helium_cursor_search(wtcursor)) == 0) { *exact = 0; return (0); } @@ -1302,7 +1390,7 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact) return (ret); /* Search for a key that's larger. */ - if ((ret = kvs_cursor_next(wtcursor)) == 0) { + if ((ret = helium_cursor_next(wtcursor)) == 0) { *exact = 1; return (0); } @@ -1310,7 +1398,7 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact) return (ret); /* Search for a key that's smaller. */ - if ((ret = kvs_cursor_prev(wtcursor)) == 0) { + if ((ret = helium_cursor_prev(wtcursor)) == 0) { *exact = -1; return (0); } @@ -1319,16 +1407,16 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact) } /* - * kvs_cursor_insert -- + * helium_cursor_insert -- * WT_CURSOR.insert method. */ static int -kvs_cursor_insert(WT_CURSOR *wtcursor) +helium_cursor_insert(WT_CURSOR *wtcursor) { - struct kvs_record *r; CACHE_RECORD *cp; CURSOR *cursor; - KVS_SOURCE *ks; + HE_ITEM *r; + HELIUM_SOURCE *hs; WT_EXTENSION_API *wtext; WT_SESSION *session; WT_SOURCE *ws; @@ -1338,13 +1426,16 @@ kvs_cursor_insert(WT_CURSOR *wtcursor) cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; ws = cursor->ws; - ks = ws->ks; + hs = ws->hs; r = &cursor->record; /* Get the WiredTiger cursor's key. */ if ((ret = copyin_key(wtcursor, 1)) != 0) return (ret); + VMSG(wtext, session, VERBOSE_L2, + "I %.*s.%.*s", (int)r->key_len, r->key, (int)r->val_len, r->val); + /* Clear the value, assume we're adding the first cache entry. */ cursor->len = 0; @@ -1353,7 +1444,8 @@ kvs_cursor_insert(WT_CURSOR *wtcursor) return (ret); /* Read the record from the cache store. */ - switch (ret = kvs_call(wtcursor, "kvs_get", ws->kvscache, kvs_get)) { + switch (ret = helium_call( + wtcursor, "he_lookup", ws->he_cache, he_lookup)) { case 0: /* Crack the record. */ if ((ret = cache_value_unmarshall(wtcursor)) != 0) @@ -1385,8 +1477,8 @@ kvs_cursor_insert(WT_CURSOR *wtcursor) break; /* If overwrite is false, an entry is an error. */ - if ((ret = kvs_call( - wtcursor, "kvs_get", ws->kvs, kvs_get)) != WT_NOTFOUND) { + if ((ret = helium_call( + wtcursor, "he_lookup", ws->he, he_lookup)) != WT_NOTFOUND) { if (ret == 0) ret = WT_DUPLICATE_KEY; goto err; @@ -1398,21 +1490,17 @@ kvs_cursor_insert(WT_CURSOR *wtcursor) } /* - * Create a new cache value based on the current cache record plus the - * WiredTiger cursor's value. + * Create a new value using the current cache record plus the WiredTiger + * cursor's value, and update the cache. */ if ((ret = cache_value_append(wtcursor, 0)) != 0) goto err; - - /* Push the record into the cache. */ - if ((ret = kvs_set(ws->kvscache, r)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_set: %s", kvs_strerror(ret)); + if ((ret = he_update(ws->he_cache, r)) != 0) + EMSG(wtext, session, ret, "he_update: %s", he_strerror(ret)); /* Update the state while still holding the lock. */ - ws->kvscache_inuse = 1; - ws->cleaner_bytes += wtcursor->value.size; - ++ws->cleaner_ops; + if (ws->he_cache_inuse == 0) + ws->he_cache_inuse = 1; /* Discard the lock. */ err: ESET(unlock(wtext, session, &ws->lock)); @@ -1420,7 +1508,7 @@ err: ESET(unlock(wtext, session, &ws->lock)); /* If successful, request notification at transaction resolution. */ if (ret == 0) ESET( - wtext->transaction_notify(wtext, session, &ks->txn_notify)); + wtext->transaction_notify(wtext, session, &hs->txn_notify)); return (ret); } @@ -1432,10 +1520,10 @@ err: ESET(unlock(wtext, session, &ws->lock)); static int update(WT_CURSOR *wtcursor, int remove_op) { - struct kvs_record *r; CACHE_RECORD *cp; CURSOR *cursor; - KVS_SOURCE *ks; + HE_ITEM *r; + HELIUM_SOURCE *hs; WT_EXTENSION_API *wtext; WT_SESSION *session; WT_SOURCE *ws; @@ -1445,13 +1533,18 @@ update(WT_CURSOR *wtcursor, int remove_op) cursor = (CURSOR *)wtcursor; wtext = cursor->wtext; ws = cursor->ws; - ks = ws->ks; + hs = ws->hs; r = &cursor->record; /* Get the WiredTiger cursor's key. */ if ((ret = copyin_key(wtcursor, 0)) != 0) return (ret); + VMSG(wtext, session, VERBOSE_L2, + "%c %.*s.%.*s", + remove_op ? 'R' : 'U', + (int)r->key_len, r->key, (int)r->val_len, r->val); + /* Clear the value, assume we're adding the first cache entry. */ cursor->len = 0; @@ -1460,7 +1553,8 @@ update(WT_CURSOR *wtcursor, int remove_op) return (ret); /* Read the record from the cache store. */ - switch (ret = kvs_call(wtcursor, "kvs_get", ws->kvscache, kvs_get)) { + switch (ret = helium_call( + wtcursor, "he_lookup", ws->he_cache, he_lookup)) { case 0: /* Crack the record. */ if ((ret = cache_value_unmarshall(wtcursor)) != 0) @@ -1491,8 +1585,8 @@ update(WT_CURSOR *wtcursor, int remove_op) break; /* If overwrite is false, no entry is an error. */ - if ((ret = kvs_call( - wtcursor, "kvs_get", ws->kvs, kvs_get)) != 0) + if ((ret = + helium_call(wtcursor, "he_lookup", ws->he, he_lookup)) != 0) goto err; /* @@ -1513,10 +1607,12 @@ update(WT_CURSOR *wtcursor, int remove_op) goto err; /* Push the record into the cache. */ - if ((ret = kvs_set(ws->kvscache, r)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_set: %s", kvs_strerror(ret)); - ws->kvscache_inuse = 1; + if ((ret = he_update(ws->he_cache, r)) != 0) + EMSG(wtext, session, ret, "he_update: %s", he_strerror(ret)); + + /* Update the state while still holding the lock. */ + if (ws->he_cache_inuse == 0) + ws->he_cache_inuse = 1; /* Discard the lock. */ err: ESET(unlock(wtext, session, &ws->lock)); @@ -1524,27 +1620,27 @@ err: ESET(unlock(wtext, session, &ws->lock)); /* If successful, request notification at transaction resolution. */ if (ret == 0) ESET( - wtext->transaction_notify(wtext, session, &ks->txn_notify)); + wtext->transaction_notify(wtext, session, &hs->txn_notify)); return (ret); } /* - * kvs_cursor_update -- + * helium_cursor_update -- * WT_CURSOR.update method. */ static int -kvs_cursor_update(WT_CURSOR *wtcursor) +helium_cursor_update(WT_CURSOR *wtcursor) { return (update(wtcursor, 0)); } /* - * kvs_cursor_remove -- + * helium_cursor_remove -- * WT_CURSOR.remove method. */ static int -kvs_cursor_remove(WT_CURSOR *wtcursor) +helium_cursor_remove(WT_CURSOR *wtcursor) { CURSOR *cursor; WT_SOURCE *ws; @@ -1558,18 +1654,18 @@ kvs_cursor_remove(WT_CURSOR *wtcursor) */ if (ws->config_bitfield) { wtcursor->value.size = 1; - wtcursor->value.data = "\0"; + wtcursor->value.data = ""; return (update(wtcursor, 0)); } return (update(wtcursor, 1)); } /* - * kvs_cursor_close -- + * helium_cursor_close -- * WT_CURSOR.close method. */ static int -kvs_cursor_close(WT_CURSOR *wtcursor) +helium_cursor_close(WT_CURSOR *wtcursor) { CURSOR *cursor; WT_EXTENSION_API *wtext; @@ -1595,27 +1691,27 @@ kvs_cursor_close(WT_CURSOR *wtcursor) * ws_source_name -- * Build a namespace name. */ -static inline int +static int ws_source_name(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, const char *suffix, char **pp) { DATA_SOURCE *ds; WT_EXTENSION_API *wtext; size_t len; + int ret = 0; const char *p; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; /* - * Create the store's name. Application URIs are "memrata:device/XXX"; - * we want the names on the memrata device to be obviously WiredTiger's, - * and the device name isn't interesting. Convert to "WiredTiger:XXX", + * Create the store's name. Application URIs are "helium:device/name"; + * we want the names on the Helium device to be obviously WiredTiger's, + * and the device name isn't interesting. Convert to "WiredTiger:name", * and add an optional suffix. */ - if (strncmp(uri, "memrata:", sizeof("memrata:") - 1) != 0 || - (p = strchr(uri, '/')) == NULL) - ERET(wtext, session, EINVAL, "%s: illegal memrata URI", uri); + if (!prefix_match(uri, "helium:") || (p = strchr(uri, '/')) == NULL) + ERET(wtext, session, EINVAL, "%s: illegal Helium URI", uri); ++p; len = strlen(WT_NAME_PREFIX) + @@ -1628,85 +1724,39 @@ ws_source_name(WT_DATA_SOURCE *wtds, } /* - * ws_source_drop_namespace -- - * Drop a namespace. - */ -static int -ws_source_drop_namespace(WT_DATA_SOURCE *wtds, WT_SESSION *session, - const char *uri, const char *suffix, kvs_t kvs_device) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - int ret = 0; - char *p; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - p = NULL; - - /* Drop the underlying KVS namespace. */ - if ((ret = ws_source_name(wtds, session, uri, suffix, &p)) != 0) - return (ret); - if ((ret = kvs_delete_namespace(kvs_device, p)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_delete_namespace: %s: %s", p, kvs_strerror(ret)); - - free(p); - return (ret); -} - -/* - * ws_source_rename_namespace -- - * Rename a namespace. - */ -static int -ws_source_rename_namespace(WT_DATA_SOURCE *wtds, WT_SESSION *session, - const char *uri, const char *newuri, const char *suffix, kvs_t kvs_device) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - int ret = 0; - char *p, *pnew; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - p = pnew = NULL; - - /* Rename the underlying KVS namespace. */ - ret = ws_source_name(wtds, session, uri, suffix, &p); - if (ret == 0) - ret = ws_source_name(wtds, session, newuri, suffix, &pnew); - if (ret == 0 && (ret = kvs_rename_namespace(kvs_device, p, pnew)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_rename_namespace: %s: %s", p, kvs_strerror(ret)); - - free(p); - free(pnew); - return (ret); -} - -/* * ws_source_close -- - * Kill a WT_SOURCE structure. + * Close a WT_SOURCE reference. */ static int ws_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, WT_SOURCE *ws) { - int ret = 0; + int ret = 0, tret; + /* + * Warn if open cursors: it shouldn't happen because the upper layers of + * WiredTiger prevent it, so we don't do anything more than warn. + */ if (ws->ref != 0) EMSG(wtext, session, WT_ERROR, "%s: open object with %u open cursors being closed", ws->uri, ws->ref); - if (ws->kvs != NULL && (ret = kvs_close(ws->kvs)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_close: %s: %s", ws->uri, kvs_strerror(ret)); - ws->kvs = NULL; - if (ws->kvscache != NULL && (ret = kvs_close(ws->kvscache)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_close: %s(cache): %s", ws->uri, kvs_strerror(ret)); - ws->kvscache = NULL; + if (ws->he != NULL) { + if ((tret = he_commit(ws->he)) != 0) + EMSG(wtext, session, tret, + "he_commit: %s: %s", ws->uri, he_strerror(tret)); + if ((tret = he_close(ws->he)) != 0) + EMSG(wtext, session, tret, + "he_close: %s: %s", ws->uri, he_strerror(tret)); + ws->he = NULL; + } + if (ws->he_cache != NULL) { + if ((tret = he_close(ws->he_cache)) != 0) + EMSG(wtext, session, tret, + "he_close: %s(cache): %s", + ws->uri, he_strerror(tret)); + ws->he_cache = NULL; + } if (ws->lockinit) ESET(lock_destroy(wtext, session, &ws->lock)); @@ -1718,33 +1768,36 @@ ws_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, WT_SOURCE *ws) } /* - * ws_source_open_namespace -- - * Open a namespace. + * ws_source_open_object -- + * Open an object in the Helium store. */ static int -ws_source_open_namespace(WT_DATA_SOURCE *wtds, WT_SESSION *session, - const char *uri, const char *suffix, kvs_t kvs_device, int flags, - kvs_t *kvsp) +ws_source_open_object(WT_DATA_SOURCE *wtds, WT_SESSION *session, + HELIUM_SOURCE *hs, + const char *uri, const char *suffix, int flags, he_t *hep) { DATA_SOURCE *ds; WT_EXTENSION_API *wtext; - kvs_t kvs; + he_t he; char *p; int ret = 0; - *kvsp = NULL; + *hep = NULL; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; p = NULL; - /* Open the underlying KVS namespace. */ + /* Open the underlying Helium object. */ if ((ret = ws_source_name(wtds, session, uri, suffix, &p)) != 0) return (ret); - if ((kvs = kvs_open_namespace(kvs_device, p, flags)) == NULL) - EMSG(wtext, session, WT_ERROR, - "kvs_open_namespace: %s: %s", p, kvs_strerror(os_errno())); - *kvsp = kvs; + VMSG(wtext, session, VERBOSE_L1, "open %s/%s", hs->name, p); + if ((he = he_open(hs->device, p, flags, NULL)) == NULL) { + ret = os_errno(); + EMSG(wtext, session, ret, + "he_open: %s/%s: %s", hs->name, p, he_strerror(ret)); + } + *hep = he; free(p); return (ret); @@ -1763,7 +1816,7 @@ ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config, u_int flags, WT_SOURCE **refp) { DATA_SOURCE *ds; - KVS_SOURCE *ks; + HELIUM_SOURCE *hs; WT_CONFIG_ITEM a; WT_EXTENSION_API *wtext; WT_SOURCE *ws; @@ -1778,26 +1831,26 @@ ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, ws = NULL; /* - * The URI will be "memrata:" followed by a KVS name and object name - * pair separated by a slash, for example, "memrata:dev/object". + * The URI will be "helium:" followed by a Helium name and object name + * pair separated by a slash, for example, "helium:volume/object". */ - if (strncmp(uri, "memrata:", strlen("memrata:")) != 0) + if (!prefix_match(uri, "helium:")) goto bad_name; - p = uri + strlen("memrata:"); + p = uri + strlen("helium:"); if (p[0] == '/' || (t = strchr(p, '/')) == NULL || t[1] == '\0') bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri); len = (size_t)(t - p); - /* Find a matching KVS device. */ - for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - if (STRING_MATCH(ks->name, p, len)) + /* Find a matching Helium device. */ + for (hs = ds->hs_head; hs != NULL; hs = hs->next) + if (string_match(hs->name, p, len)) break; - if (ks == NULL) + if (hs == NULL) ERET(wtext, NULL, - EINVAL, "%s: no matching Memrata store found", uri); + EINVAL, "%s: no matching Helium store found", uri); /* - * We're about to walk the KVS device's list of files, acquire the + * We're about to walk the Helium device's list of files, acquire the * global lock. */ if ((ret = writelock(wtext, session, &ds->global_lock)) != 0) @@ -1808,7 +1861,7 @@ bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri); * for the object's lock, optionally check if the object is busy, and * return. */ - for (ws = ks->ws_head; ws != NULL; ws = ws->next) + for (ws = hs->ws_head; ws != NULL; ws = ws->next) if (strcmp(ws->uri, uri) == 0) { /* Check to see if the object is busy. */ if (ws->ref != 0 && (flags & WS_SOURCE_OPEN_BUSY)) { @@ -1836,45 +1889,35 @@ bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri); if ((ret = lock_init(wtext, session, &ws->lock)) != 0) goto err; ws->lockinit = 1; - ws->ks = ks; + ws->hs = hs; /* - * Open the underlying KVS namespaces, then push the change. + * Open the underlying Helium objects, then push the change. * * The naming scheme is simple: the URI names the primary store, and the * URI with a trailing suffix names the associated caching store. * - * We can set debug and truncate flags, we always set the create flag, - * our caller handles attempts to create existing objects. + * We can set truncate flag, we always set the create flag, our caller + * handles attempts to create existing objects. */ - oflags = KVS_O_CREATE; + oflags = HE_O_CREATE; if ((ret = wtext->config_get(wtext, - session, config, "kvs_open_o_debug", &a)) == 0 && a.val != 0) - oflags |= KVS_O_DEBUG; - if (ret != 0 && ret != WT_NOTFOUND) { - EMSG(wtext, session, ret, - "kvs_open_o_debug configuration: %s", wtext->strerror(ret)); - goto err; - } - if ((ret = wtext->config_get(wtext, - session, config, "kvs_open_o_truncate", &a)) == 0 && a.val != 0) - oflags |= KVS_O_TRUNCATE; - if (ret != 0 && ret != WT_NOTFOUND) { - EMSG(wtext, session, ret, - "kvs_open_o_truncate configuration: %s", + session, config, "helium_o_truncate", &a)) == 0 && a.val != 0) + oflags |= HE_O_TRUNCATE; + if (ret != 0 && ret != WT_NOTFOUND) + EMSG_ERR(wtext, session, ret, + "helium_o_truncate configuration: %s", wtext->strerror(ret)); - goto err; - } - if ((ret = ws_source_open_namespace(wtds, - session, uri, NULL, ks->kvs_device, oflags, &ws->kvs)) != 0) + if ((ret = ws_source_open_object( + wtds, session, hs, uri, NULL, oflags, &ws->he)) != 0) goto err; - if ((ret = ws_source_open_namespace(wtds, session, - uri, WT_NAME_CACHE, ks->kvs_device, oflags, &ws->kvscache)) != 0) + if ((ret = ws_source_open_object( + wtds, session, hs, uri, WT_NAME_CACHE, oflags, &ws->he_cache)) != 0) goto err; - if ((ret = kvs_commit(ws->kvs)) != 0) - EMSG_ERR(wtext, session, WT_ERROR, - "kvs_commit: %s", kvs_strerror(ret)); + if ((ret = he_commit(ws->he)) != 0) + EMSG_ERR(wtext, session, ret, + "he_commit: %s", he_strerror(ret)); /* Optionally trade the global lock for the object lock. */ if (!(flags & WS_SOURCE_OPEN_GLOBAL) && @@ -1882,8 +1925,8 @@ bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri); goto err; /* Insert the new entry at the head of the list. */ - ws->next = ks->ws_head; - ks->ws_head = ws; + ws->next = hs->ws_head; + hs->ws_head = ws; *refp = ws; ws = NULL; @@ -1905,7 +1948,7 @@ err: if (ws != NULL) /* * master_uri_get -- - * Get the KVS master record for a URI. + * Get the Helium master record for a URI. */ static int master_uri_get(WT_DATA_SOURCE *wtds, @@ -1922,7 +1965,7 @@ master_uri_get(WT_DATA_SOURCE *wtds, /* * master_uri_drop -- - * Drop the KVS master record for a URI. + * Drop the Helium master record for a URI. */ static int master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri) @@ -1938,7 +1981,7 @@ master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri) /* * master_uri_rename -- - * Rename the KVS master record for a URI. + * Rename the Helium master record for a URI. */ static int master_uri_rename(WT_DATA_SOURCE *wtds, @@ -1971,14 +2014,14 @@ err: free((void *)value); /* * master_uri_set -- - * Set the KVS master record for a URI. + * Set the Helium master record for a URI. */ static int master_uri_set(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - WT_CONFIG_ITEM a, b; + WT_CONFIG_ITEM a, b, c; WT_EXTENSION_API *wtext; int exclusive, ret = 0; char value[1024]; @@ -2016,14 +2059,27 @@ master_uri_set(WT_DATA_SOURCE *wtds, wtext->strerror(ret)); } + /* Get the compression configuration. */ + if ((ret = wtext->config_get( + wtext, session, config, "helium_o_compress", &c)) != 0) { + if (ret == WT_NOTFOUND) + c.val = 0; + else + ERET(wtext, session, ret, + "helium_o_compress configuration: %s", + wtext->strerror(ret)); + } + /* * Create a new reference using insert (which fails if the record - * already exists). If that succeeds, we just used up a unique ID, - * update the master ID record. + * already exists). */ (void)snprintf(value, sizeof(value), - "version=(major=%d,minor=%d),key_format=%.*s,value_format=%.*s", - KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str); + "wiredtiger_helium_version=(major=%d,minor=%d)," + "key_format=%.*s,value_format=%.*s," + "helium_o_compress=%d", + WIREDTIGER_HELIUM_MAJOR, WIREDTIGER_HELIUM_MINOR, + (int)a.len, a.str, (int)b.len, b.str, c.val ? 1 : 0); if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0) return (0); if (ret == WT_DUPLICATE_KEY) @@ -2032,11 +2088,11 @@ master_uri_set(WT_DATA_SOURCE *wtds, } /* - * kvs_session_open_cursor -- + * helium_session_open_cursor -- * WT_SESSION.open_cursor method. */ static int -kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, +helium_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config, WT_CURSOR **new_cursor) { CURSOR *cursor; @@ -2078,15 +2134,15 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, "collator configuration: %s", wtext->strerror(ret)); /* Finish initializing the cursor. */ - cursor->wtcursor.close = kvs_cursor_close; - cursor->wtcursor.insert = kvs_cursor_insert; - cursor->wtcursor.next = kvs_cursor_next; - cursor->wtcursor.prev = kvs_cursor_prev; - cursor->wtcursor.remove = kvs_cursor_remove; - cursor->wtcursor.reset = kvs_cursor_reset; - cursor->wtcursor.search = kvs_cursor_search; - cursor->wtcursor.search_near = kvs_cursor_search_near; - cursor->wtcursor.update = kvs_cursor_update; + cursor->wtcursor.close = helium_cursor_close; + cursor->wtcursor.insert = helium_cursor_insert; + cursor->wtcursor.next = helium_cursor_next; + cursor->wtcursor.prev = helium_cursor_prev; + cursor->wtcursor.remove = helium_cursor_remove; + cursor->wtcursor.reset = helium_cursor_reset; + cursor->wtcursor.search = helium_cursor_search; + cursor->wtcursor.search_near = helium_cursor_search_near; + cursor->wtcursor.update = helium_cursor_update; cursor->wtext = wtext; cursor->record.key = cursor->__key; @@ -2123,21 +2179,28 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, ws->config_bitfield = v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't'; + if ((ret = wtext->config_strget( + wtext, session, value, "helium_o_compress", &v)) != 0) + EMSG_ERR(wtext, session, ret, + "helium_o_compress configuration: %s", + wtext->strerror(ret)); + ws->config_compress = v.val ? 1 : 0; + /* * If it's a record-number key, read the last record from the * object and set the allocation record value. */ if (ws->config_recno) { wtcursor = (WT_CURSOR *)cursor; - if ((ret = kvs_cursor_reset(wtcursor)) != 0) + if ((ret = helium_cursor_reset(wtcursor)) != 0) goto err; - if ((ret = kvs_cursor_prev(wtcursor)) == 0) + if ((ret = helium_cursor_prev(wtcursor)) == 0) ws->append_recno = wtcursor->recno; else if (ret != WT_NOTFOUND) goto err; - if ((ret = kvs_cursor_reset(wtcursor)) != 0) + if ((ret = helium_cursor_reset(wtcursor)) != 0) goto err; } @@ -2161,11 +2224,11 @@ err: if (ws != NULL && locked) } /* - * kvs_session_create -- + * helium_session_create -- * WT_SESSION.create method. */ static int -kvs_session_create(WT_DATA_SOURCE *wtds, +helium_session_create(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; @@ -2191,7 +2254,7 @@ kvs_session_create(WT_DATA_SOURCE *wtds, * We've discarded the lock, but that's OK, creates are single-threaded * at the WiredTiger level, it's not our problem to solve. * - * If unable to enter a WiredTiger record, leave the KVS store alone. + * If unable to enter a WiredTiger record, leave the Helium store alone. * A subsequent create should do the right thing, we aren't leaving * anything in an inconsistent state. */ @@ -2199,15 +2262,15 @@ kvs_session_create(WT_DATA_SOURCE *wtds, } /* - * kvs_session_drop -- + * helium_session_drop -- * WT_SESSION.drop method. */ static int -kvs_session_drop(WT_DATA_SOURCE *wtds, +helium_session_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - KVS_SOURCE *ks; + HELIUM_SOURCE *hs; WT_EXTENSION_API *wtext; WT_SOURCE **p, *ws; int ret = 0; @@ -2217,7 +2280,7 @@ kvs_session_drop(WT_DATA_SOURCE *wtds, /* * Get a locked reference to the data source: hold the global lock, - * we are going to change the list of objects for a KVS store. + * we're changing the HELIUM_SOURCE's list of WT_SOURCE objects. * * Remove the entry from the WT_SOURCE list -- it's a singly-linked * list, find the reference to it. @@ -2225,28 +2288,23 @@ kvs_session_drop(WT_DATA_SOURCE *wtds, if ((ret = ws_source_open(wtds, session, uri, config, WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)) != 0) return (ret); - ks = ws->ks; - for (p = &ks->ws_head; *p != NULL; p = &(*p)->next) + hs = ws->hs; + for (p = &hs->ws_head; *p != NULL; p = &(*p)->next) if (*p == ws) { *p = (*p)->next; break; } - /* Close the source, discarding the handles and structure. */ + /* Drop the underlying Helium objects. */ + ESET(he_remove(ws->he)); + ws->he = NULL; /* The handle is dead. */ + ESET(he_remove(ws->he_cache)); + ws->he_cache = NULL; /* The handle is dead. */ + + /* Close the source, discarding the structure. */ ESET(ws_source_close(wtext, session, ws)); ws = NULL; - /* Drop the underlying namespaces. */ - ESET(ws_source_drop_namespace( - wtds, session, uri, NULL, ks->kvs_device)); - ESET(ws_source_drop_namespace( - wtds, session, uri, WT_NAME_CACHE, ks->kvs_device)); - - /* Push the change. */ - if ((ret = kvs_commit(ks->kvs_device)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_commit: %s", kvs_strerror(ret)); - /* Discard the metadata entry. */ ESET(master_uri_drop(wtds, session, uri)); @@ -2262,19 +2320,18 @@ kvs_session_drop(WT_DATA_SOURCE *wtds, } /* - * kvs_session_rename -- + * helium_session_rename -- * WT_SESSION.rename method. */ static int -kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, +helium_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, const char *newuri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - KVS_SOURCE *ks; WT_EXTENSION_API *wtext; WT_SOURCE *ws; int ret = 0; - char *copy; + char *p; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; @@ -2287,27 +2344,26 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, if ((ret = ws_source_open(wtds, session, uri, config, WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)) != 0) return (ret); - ks = ws->ks; - /* Get a copy of the new name. */ - if ((copy = strdup(newuri)) == NULL) { + /* Get a copy of the new name for the WT_SOURCE structure. */ + if ((p = strdup(newuri)) == NULL) { ret = os_errno(); goto err; } free(ws->uri); - ws->uri = copy; - copy = NULL; + ws->uri = p; - /* Rename the underlying namespaces. */ - ESET(ws_source_rename_namespace( - wtds, session, uri, newuri, NULL, ks->kvs_device)); - ESET(ws_source_rename_namespace( - wtds, session, uri, newuri, WT_NAME_CACHE, ks->kvs_device)); - - /* Push the change. */ - if ((ret = kvs_commit(ws->kvs)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_commit: %s", kvs_strerror(ret)); + /* Rename the underlying Helium objects. */ + ESET(ws_source_name(wtds, session, newuri, NULL, &p)); + if (ret == 0) { + ESET(he_rename(ws->he, p)); + free(p); + } + ESET(ws_source_name(wtds, session, newuri, WT_NAME_CACHE, &p)); + if (ret == 0) { + ESET(he_rename(ws->he_cache, p)); + free(p); + } /* Update the metadata record. */ ESET(master_uri_rename(wtds, session, uri, newuri)); @@ -2325,17 +2381,17 @@ err: ESET(unlock(wtext, session, &ds->global_lock)); } /* - * kvs_session_truncate -- + * helium_session_truncate -- * WT_SESSION.truncate method. */ static int -kvs_session_truncate(WT_DATA_SOURCE *wtds, +helium_session_truncate(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; WT_EXTENSION_API *wtext; WT_SOURCE *ws; - int ret = 0; + int ret = 0, tret; ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; @@ -2346,47 +2402,42 @@ kvs_session_truncate(WT_DATA_SOURCE *wtds, return (ret); /* Truncate the underlying namespaces. */ - if ((ret = kvs_truncate(ws->kvs)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret)); - if ((ret = kvs_truncate(ws->kvscache)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret)); + if ((tret = he_truncate(ws->he)) != 0) + EMSG(wtext, session, tret, + "he_truncate: %s: %s", ws->uri, he_strerror(tret)); + if ((tret = he_truncate(ws->he_cache)) != 0) + EMSG(wtext, session, tret, + "he_truncate: %s: %s", ws->uri, he_strerror(tret)); ESET(unlock(wtext, session, &ws->lock)); return (ret); } /* - * kvs_session_verify -- + * helium_session_verify -- * WT_SESSION.verify method. */ static int -kvs_session_verify(WT_DATA_SOURCE *wtds, +helium_session_verify(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) { - DATA_SOURCE *ds; - WT_EXTENSION_API *wtext; - + (void)wtds; + (void)session; (void)uri; (void)config; - - ds = (DATA_SOURCE *)wtds; - wtext = ds->wtext; - - ERET(wtext, session, ENOTSUP, "verify: %s", strerror(ENOTSUP)); + return (0); } /* - * kvs_session_checkpoint -- + * helium_session_checkpoint -- * WT_SESSION.checkpoint method. */ static int -kvs_session_checkpoint( +helium_session_checkpoint( WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_CONFIG_ARG *config) { DATA_SOURCE *ds; - KVS_SOURCE *ks; + HELIUM_SOURCE *hs; WT_EXTENSION_API *wtext; int ret = 0; @@ -2395,208 +2446,68 @@ kvs_session_checkpoint( ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - /* - * Flush the device. - * - * XXX - * This is a placeholder until we figure out what recovery is going - * to look like. - */ - if ((ks = ds->kvs_head) != NULL && - (ret = kvs_commit(ks->kvs_device)) != 0) - ERET(wtext, session, WT_ERROR, - "kvs_commit: %s", kvs_strerror(ret)); - - return (0); -} - -/* - * kvs_config_devices -- - * Convert the device list into an argv[] array. - */ -static int -kvs_config_devices( - WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *orig, char ***devices) -{ - WT_CONFIG_ITEM k, v; - WT_CONFIG_SCAN *scan; - size_t len; - u_int cnt, slots; - int ret = 0; - char **argv, **p; - - argv = NULL; - - /* Set up the scan of the device list. */ - if ((ret = wtext->config_scan_begin( - wtext, NULL, orig->str, orig->len, &scan)) != 0) - EMSG_ERR(wtext, NULL, ret, - "WT_EXTENSION_API.config_scan_begin: %s", - wtext->strerror(ret)); - - for (cnt = slots = 0; (ret = wtext-> - config_scan_next(wtext, scan, &k, &v)) == 0; ++cnt) { - if (cnt + 1 >= slots) { /* NULL-terminate the array */ - len = slots + 20 * sizeof(*argv); - if ((p = realloc(argv, len)) == NULL) { - ret = os_errno(); - goto err; - } - argv = p; - slots += 20; - } - len = k.len + 1; - if ((argv[cnt] = calloc(len, sizeof(**argv))) == NULL) { - ret = os_errno(); - goto err; - } - argv[cnt + 1] = NULL; - memcpy(argv[cnt], k.str, k.len); - } - if (ret != WT_NOTFOUND) - EMSG_ERR(wtext, NULL, ret, - "WT_EXTENSION_API.config_scan_next: %s", - wtext->strerror(ret)); - if ((ret = wtext->config_scan_end(wtext, scan)) != 0) - EMSG_ERR(wtext, NULL, ret, - "WT_EXTENSION_API.config_scan_end: %s", - wtext->strerror(ret)); + /* Flush all volumes. */ + if ((hs = ds->hs_head) != NULL && + (ret = he_commit(hs->he_volume)) != 0) + ERET(wtext, session, ret, + "he_commit: %s: %s", hs->device, he_strerror(ret)); - *devices = argv; return (0); - -err: if (argv != NULL) { - for (p = argv; *p != NULL; ++p) - free(*p); - free(argv); - } - return (ret); -} - -/* - * kvs_config_read -- - * Read KVS configuration. - */ -static int -kvs_config_read(WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *config, - char ***devices, struct kvs_config *kvs_config, int *flagsp) -{ - WT_CONFIG_ITEM k, v; - WT_CONFIG_SCAN *scan; - int ret = 0, tret; - - *flagsp = 0; /* Return default values. */ - if ((ret = kvs_default_config(kvs_config)) != 0) - ERET(wtext, NULL, - EINVAL, "kvs_default_config: %s", kvs_strerror(os_errno())); - - /* Set up the scan of the configuration arguments list. */ - if ((ret = wtext->config_scan_begin( - wtext, NULL, config->str, config->len, &scan)) != 0) - ERET(wtext, NULL, ret, - "WT_EXTENSION_API.config_scan_begin: %s", - wtext->strerror(ret)); - while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) { - if (STRING_MATCH("kvs_devices", k.str, k.len)) { - if ((ret = kvs_config_devices(wtext, &v, devices)) != 0) - return (ret); - continue; - } - -#define KVS_CONFIG_SET(s, f) \ - if (STRING_MATCH(s, k.str, k.len)) { \ - kvs_config->f = (unsigned long)v.val; \ - continue; \ - } - KVS_CONFIG_SET("kvs_parallelism", parallelism); - KVS_CONFIG_SET("kvs_granularity", granularity); - KVS_CONFIG_SET("kvs_avg_key_len", avg_key_len); - KVS_CONFIG_SET("kvs_avg_val_len", avg_val_len); - KVS_CONFIG_SET("kvs_write_bufs", write_bufs); - KVS_CONFIG_SET("kvs_read_bufs", read_bufs); - KVS_CONFIG_SET("kvs_commit_timeout", commit_timeout); - KVS_CONFIG_SET("kvs_reclaim_threshold", reclaim_threshold); - KVS_CONFIG_SET("kvs_reclaim_period", reclaim_period); - -#define KVS_FLAG_SET(s, f) \ - if (STRING_MATCH(s, k.str, k.len)) { \ - if (v.val != 0) \ - *flagsp |= f; \ - continue; \ - } - /* - * We don't export KVS_O_CREATE: WT_SESSION.create - * always adds it in. - */ - KVS_FLAG_SET("kvs_open_o_debug", KVS_O_DEBUG); - KVS_FLAG_SET("kvs_open_o_truncate", KVS_O_TRUNCATE); - - EMSG_ERR(wtext, NULL, EINVAL, - "unknown configuration key value pair %.*s/%.*s", - (int)k.len, k.str, (int)v.len, v.str); - } - - if (ret == WT_NOTFOUND) - ret = 0; - if (ret != 0) - EMSG_ERR(wtext, NULL, ret, - "WT_EXTENSION_API.config_scan_next: %s", - wtext->strerror(ret)); - -err: if ((tret = wtext->config_scan_end(wtext, scan)) != 0) - EMSG(wtext, NULL, tret, - "WT_EXTENSION_API.config_scan_end: %s", - wtext->strerror(ret)); - - return (ret); } /* - * kvs_source_close -- - * Kill a KVS_SOURCE structure. + * helium_source_close -- + * Discard a HELIUM_SOURCE. */ static int -kvs_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, KVS_SOURCE *ks) +helium_source_close( + WT_EXTENSION_API *wtext, WT_SESSION *session, HELIUM_SOURCE *hs) { WT_SOURCE *ws; int ret = 0, tret; /* Resolve the cache into the primary one last time and quit. */ - if (ks->cleaner_id != 0) { - ks->cleaner_stop = 1; + if (hs->cleaner_id != 0) { + hs->cleaner_stop = 1; - if ((tret = pthread_join(ks->cleaner_id, NULL)) != 0) + if ((tret = pthread_join(hs->cleaner_id, NULL)) != 0) EMSG(wtext, session, tret, "pthread_join: %s", strerror(tret)); - ks->cleaner_id = 0; + hs->cleaner_id = 0; } /* Close the underlying WiredTiger sources. */ - while ((ws = ks->ws_head) != NULL) { - ks->ws_head = ws->next; + while ((ws = hs->ws_head) != NULL) { + hs->ws_head = ws->next; ESET(ws_source_close(wtext, session, ws)); } - /* Flush and close the KVS source. */ - if (ks->kvs_device != NULL) { - if ((tret = kvs_commit(ks->kvs_device)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_commit: %s: %s", ks->name, kvs_strerror(tret)); + /* If the owner, close the database transaction store. */ + if (hs->he_txn != NULL && hs->he_owner) { + if ((tret = he_close(hs->he_txn)) != 0) + EMSG(wtext, session, tret, + "he_close: %s: %s: %s", + hs->name, WT_NAME_TXN, he_strerror(tret)); + hs->he_txn = NULL; + } - /* If the owner, close the database transaction store. */ - if (ks->kvsowner && (tret = kvs_close(ks->kvstxn)) != 0) + /* Flush and close the Helium source. */ + if (hs->he_volume != NULL) { + if ((tret = he_commit(hs->he_volume)) != 0) EMSG(wtext, session, tret, - "kvs_close: %s: %s", - WT_NAME_TXN, kvs_strerror(tret)); + "he_commit: %s: %s", + hs->device, he_strerror(tret)); - if ((tret = kvs_close(ks->kvs_device)) != 0) - EMSG(wtext, session, WT_ERROR, - "kvs_close: %s: %s", ks->name, kvs_strerror(tret)); - ks->kvs_device = NULL; + if ((tret = he_close(hs->he_volume)) != 0) + EMSG(wtext, session, tret, + "he_close: %s: %s: %s", + hs->name, WT_NAME_INIT, he_strerror(tret)); + hs->he_volume = NULL; } - free(ks->name); - OVERWRITE_AND_FREE(ks); + free(hs->name); + free(hs->device); + OVERWRITE_AND_FREE(hs); return (ret); } @@ -2609,12 +2520,12 @@ static int cache_cleaner(WT_EXTENSION_API *wtext, WT_CURSOR *wtcursor, uint64_t oldest, uint64_t *txnminp) { - struct kvs_record *r; CACHE_RECORD *cp; CURSOR *cursor; + HE_ITEM *r; WT_SOURCE *ws; uint64_t txnid; - int locked, recovery, ret = 0; + int locked, pushed, recovery, ret = 0; /* * Called in two ways: in normal processing mode where we're supplied a @@ -2633,14 +2544,14 @@ cache_cleaner(WT_EXTENSION_API *wtext, cursor = (CURSOR *)wtcursor; ws = cursor->ws; r = &cursor->record; - locked = 0; + locked = pushed = 0; /* * For every cache key where all updates are globally visible: * Migrate the most recent update value to the primary store. */ for (r->key_len = 0; (ret = - kvs_call(wtcursor, "kvs_next", ws->kvscache, kvs_next)) == 0;) { + helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) { /* * Unmarshall the value, and if all of the updates are globally * visible, update the primary with the last committed update. @@ -2660,8 +2571,10 @@ cache_cleaner(WT_EXTENSION_API *wtext, cache_value_last_not_aborted(wtcursor, &cp); if (cp == NULL) continue; + + pushed = 1; if (cp->remove) { - if ((ret = kvs_del(ws->kvs, r)) == 0) + if ((ret = he_delete(ws->he, r)) == 0) continue; /* @@ -2669,35 +2582,50 @@ cache_cleaner(WT_EXTENSION_API *wtext, * primary at all, that is, an insert and remove pair * may be confined to the cache. */ - if (ret == KVS_E_KEY_NOT_FOUND) { + if (ret == HE_ERR_ITEM_NOT_FOUND) { ret = 0; continue; } - ERET(wtext, NULL, WT_ERROR, - "kvs_del: %s", kvs_strerror(ret)); + ERET(wtext, NULL, ret, + "he_delete: %s", he_strerror(ret)); } else { r->val = cp->v; r->val_len = cp->len; - if ((ret = kvs_set(ws->kvs, r)) == 0) + /* + * If compression configured for this datastore, set the + * compression flag, we're updating the "real" store. + */ + if (ws->config_compress) + r->flags |= HE_I_COMPRESS; + ret = he_update(ws->he, r); + r->flags = 0; + if (ret == 0) continue; - ERET(wtext, NULL, WT_ERROR, - "kvs_set: %s", kvs_strerror(ret)); + + ERET(wtext, NULL, ret, + "he_update: %s", he_strerror(ret)); } } if (ret == WT_NOTFOUND) ret = 0; if (ret != 0) - ERET(wtext, NULL, WT_ERROR, - "kvs_next: %s", kvs_strerror(ret)); + ERET(wtext, NULL, ret, "he_next: %s", he_strerror(ret)); + + /* + * If we didn't move any keys from the cache to the primary, quit. It's + * possible we could still remove values from the cache, but not likely, + * and another pass would probably be wasted effort (especially locked). + */ + if (!pushed) + return (0); /* * Push the store to stable storage for correctness. (It doesn't matter - * what Memrata handle we push, so we just push one of them.) + * what Helium handle we commit, so we just commit one of them.) */ - if ((ret = kvs_commit(ws->kvs)) != 0) - ERET(wtext, NULL, WT_ERROR, - "kvs_commit: %s", kvs_strerror(ret)); + if ((ret = he_commit(ws->he)) != 0) + ERET(wtext, NULL, ret, "he_commit: %s", he_strerror(ret)); /* * If we're performing recovery, that's all we need to do, we're going @@ -2719,7 +2647,7 @@ cache_cleaner(WT_EXTENSION_API *wtext, locked = 1; for (r->key_len = 0; (ret = - kvs_call(wtcursor, "kvs_next", ws->kvscache, kvs_next)) == 0;) { + helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) { /* * Unmarshall the value, and if all of the updates are globally * visible, remove the cache entry. @@ -2727,9 +2655,9 @@ cache_cleaner(WT_EXTENSION_API *wtext, if ((ret = cache_value_unmarshall(wtcursor)) != 0) goto err; if (cache_value_visible_all(wtcursor, oldest)) { - if ((ret = kvs_del(ws->kvscache, r)) != 0) - EMSG_ERR(wtext, NULL, WT_ERROR, - "kvs_del: %s", kvs_strerror(ret)); + if ((ret = he_delete(ws->he_cache, r)) != 0) + EMSG_ERR(wtext, NULL, ret, + "he_delete: %s", he_strerror(ret)); continue; } @@ -2752,8 +2680,7 @@ cache_cleaner(WT_EXTENSION_API *wtext, if (ret == WT_NOTFOUND) ret = 0; if (ret != 0) - EMSG_ERR(wtext, NULL, WT_ERROR, - "kvs_next: %s", kvs_strerror(ret)); + EMSG_ERR(wtext, NULL, ret, "he_next: %s", he_strerror(ret)); err: if (locked) ESET(unlock(wtext, NULL, &ws->lock)); @@ -2766,11 +2693,11 @@ err: if (locked) * Discard no longer needed entries from the transaction store. */ static int -txn_cleaner(WT_CURSOR *wtcursor, kvs_t kvstxn, uint64_t txnmin) +txn_cleaner(WT_CURSOR *wtcursor, he_t he_txn, uint64_t txnmin) { CURSOR *cursor; + HE_ITEM *r; WT_EXTENSION_API *wtext; - struct kvs_record *r; uint64_t txnid; int ret = 0; @@ -2783,23 +2710,23 @@ txn_cleaner(WT_CURSOR *wtcursor, kvs_t kvstxn, uint64_t txnmin) * oldest transaction ID that appears anywhere in any cache. */ for (r->key_len = 0; - (ret = kvs_call(wtcursor, "kvs_next", kvstxn, kvs_next)) == 0;) { + (ret = helium_call(wtcursor, "he_next", he_txn, he_next)) == 0;) { memcpy(&txnid, r->key, sizeof(txnid)); - if (txnid < txnmin && (ret = kvs_del(kvstxn, r)) != 0) - ERET(wtext, NULL, WT_ERROR, - "kvs_del: %s", kvs_strerror(ret)); + if (txnid < txnmin && (ret = he_delete(he_txn, r)) != 0) + ERET(wtext, NULL, ret, + "he_delete: %s", he_strerror(ret)); } if (ret == WT_NOTFOUND) ret = 0; if (ret != 0) - ERET(wtext, NULL, WT_ERROR, "kvs_next: %s", kvs_strerror(ret)); + ERET(wtext, NULL, ret, "he_next: %s", he_strerror(ret)); return (0); } /* * fake_cursor -- - * Fake up enough of a cursor to do KVS operations. + * Fake up enough of a cursor to do Helium operations. */ static int fake_cursor(WT_EXTENSION_API *wtext, WT_CURSOR **wtcursorp) @@ -2832,49 +2759,32 @@ fake_cursor(WT_EXTENSION_API *wtext, WT_CURSOR **wtcursorp) } /* - * kvs_cleaner -- + * cache_cleaner_worker -- * Thread to migrate data from the cache to the primary. */ static void * -kvs_cleaner(void *arg) +cache_cleaner_worker(void *arg) { struct timeval t; CURSOR *cursor; - KVS_SOURCE *ks; + HELIUM_SOURCE *hs; + HE_STATS stats; WT_CURSOR *wtcursor; WT_EXTENSION_API *wtext; WT_SOURCE *ws; uint64_t oldest, txnmin, txntmp; int cleaner_stop, delay, ret = 0; - ks = (KVS_SOURCE *)arg; + hs = (HELIUM_SOURCE *)arg; cursor = NULL; - wtext = ks->wtext; + wtext = hs->wtext; if ((ret = fake_cursor(wtext, &wtcursor)) != 0) - EMSG_ERR(wtext, NULL, ret, "kvs_cleaner: %s", strerror(ret)); + EMSG_ERR(wtext, NULL, ret, "cleaner: %s", strerror(ret)); cursor = (CURSOR *)wtcursor; - for (delay = 1;;) { - /* - * Check the underlying caches for either a number of operations - * or a number of bytes. It's more expensive to return values - * from the cache (because we have to marshall/unmarshall them), - * but there's no information yet on how to tune the values. - * - * For now, use 10MB as the limit, and a corresponding number of - * operations, assuming roughly 40B per key/value pair. - */ -#undef BYTELIMIT -#define BYTELIMIT (10 * 1048576) -#undef OPLIMIT -#define OPLIMIT (BYTELIMIT / (2 * 20)) - for (ws = ks->ws_head; ws != NULL; ws = ws->next) - if (ws->cleaner_ops > OPLIMIT || - ws->cleaner_bytes > BYTELIMIT) - break; - + for (cleaner_stop = delay = 0; !cleaner_stop;) { /* * Check if this will be the final run; cleaner_stop is declared * volatile, and so the read will happen. We don't much care if @@ -2882,16 +2792,45 @@ kvs_cleaner(void *arg) * and finds the variable set. Store the read locally, reading * the variable twice might race. */ - cleaner_stop = ks->cleaner_stop; - if (ws == NULL && !cleaner_stop) { - if (delay < 5) /* At least every 5 seconds. */ - ++delay; + cleaner_stop = hs->cleaner_stop; + + /* + * Delay if this isn't the final run and the last pass didn't + * find any work to do. + */ + if (!cleaner_stop && delay != 0) { t.tv_sec = delay; t.tv_usec = 0; (void)select(0, NULL, NULL, NULL, &t); - continue; } + /* Run at least every 5 seconds. */ + if (delay < 5) + ++delay; + + /* + * Clean the datastore caches, depending on their size. It's + * both more and less expensive to return values from the cache: + * more because we have to marshall/unmarshall the values, less + * because there's only a single call, to the cache store rather + * one to the cache and one to the primary. I have no turning + * information, for now simply set the limit at 50MB. + */ +#undef CACHE_SIZE_TRIGGER +#define CACHE_SIZE_TRIGGER (50 * 1048576) + for (ws = hs->ws_head; ws != NULL; ws = ws->next) { + if ((ret = he_stats(ws->he_cache, &stats)) != 0) + EMSG_ERR(wtext, NULL, + ret, "he_stats: %s", he_strerror(ret)); + if (stats.size > CACHE_SIZE_TRIGGER) + break; + } + if (!cleaner_stop && ws == NULL) + continue; + + /* There was work to do, don't delay before checking again. */ + delay = 0; + /* * Get the oldest transaction ID not yet visible to a running * transaction. Do this before doing anything else, avoiding @@ -2900,11 +2839,14 @@ kvs_cleaner(void *arg) oldest = wtext->transaction_oldest(wtext); /* + * If any cache needs cleaning, clean them all, because we have + * to know the minimum transaction ID referenced by any cache. + * * For each cache/primary pair, migrate whatever records we can, * tracking the lowest transaction ID of any entry in any cache. */ txnmin = UINT64_MAX; - for (ws = ks->ws_head; ws != NULL; ws = ws->next) { + for (ws = hs->ws_head; ws != NULL; ws = ws->next) { cursor->ws = ws; if ((ret = cache_cleaner( wtext, wtcursor, oldest, &txntmp)) != 0) @@ -2923,11 +2865,8 @@ kvs_cleaner(void *arg) * problem here. */ cursor->ws = NULL; - if ((ret = txn_cleaner(wtcursor, ks->kvstxn, txnmin)) != 0) + if ((ret = txn_cleaner(wtcursor, hs->he_txn, txnmin)) != 0) goto err; - - if (cleaner_stop) - break; } err: cursor_destroy(cursor); @@ -2935,152 +2874,219 @@ err: cursor_destroy(cursor); } /* - * kvs_source_open -- - * Allocate and open a KVS source. + * helium_config_read -- + * Parse the Helium configuration. */ static int -kvs_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v) +helium_config_read(WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *config, + char **devicep, HE_ENV *envp, int *env_setp, int *flagsp) { - struct kvs_config kvs_config; - KVS_SOURCE *ks; + WT_CONFIG_ITEM k, v; + WT_CONFIG_SCAN *scan; + int ret = 0, tret; + + *env_setp = 0; + *flagsp = 0; + + /* Set up the scan of the configuration arguments list. */ + if ((ret = wtext->config_scan_begin( + wtext, NULL, config->str, config->len, &scan)) != 0) + ERET(wtext, NULL, ret, + "WT_EXTENSION_API.config_scan_begin: %s", + wtext->strerror(ret)); + while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) { + if (string_match("helium_devices", k.str, k.len)) { + if ((*devicep = calloc(1, v.len + 1)) == NULL) + return (os_errno()); + memcpy(*devicep, v.str, v.len); + continue; + } + if (string_match("helium_env_read_cache_size", k.str, k.len)) { + envp->read_cache_size = (uint64_t)v.val; + *env_setp = 1; + continue; + } + if (string_match("helium_env_write_cache_size", k.str, k.len)) { + envp->write_cache_size = (uint64_t)v.val; + *env_setp = 1; + continue; + } + if (string_match("helium_o_volume_truncate", k.str, k.len)) { + if (v.val != 0) + *flagsp |= HE_O_VOLUME_TRUNCATE; + continue; + } + EMSG_ERR(wtext, NULL, EINVAL, + "unknown configuration key value pair %.*s=%.*s", + (int)k.len, k.str, (int)v.len, v.str); + } + if (ret == WT_NOTFOUND) + ret = 0; + if (ret != 0) + EMSG_ERR(wtext, NULL, ret, + "WT_EXTENSION_API.config_scan_next: %s", + wtext->strerror(ret)); + +err: if ((tret = wtext->config_scan_end(wtext, scan)) != 0) + EMSG(wtext, NULL, tret, + "WT_EXTENSION_API.config_scan_end: %s", + wtext->strerror(tret)); + + return (ret); +} + +/* + * helium_source_open -- + * Allocate and open a Helium source. + */ +static int +helium_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v) +{ + struct he_env env; + HELIUM_SOURCE *hs; WT_EXTENSION_API *wtext; - int flags, ret = 0; - char **device_list, **p; + int env_set, flags, ret = 0; wtext = ds->wtext; + hs = NULL; - ks = NULL; - device_list = NULL; + VMSG(wtext, NULL, VERBOSE_L1, "volume %.*s=%.*s", + (int)k->len, k->str, (int)v->len, v->str); - /* Check for a KVS source we've already opened. */ - for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - if (STRING_MATCH(ks->name, k->str, k->len)) + /* + * Check for a Helium source we've already opened: we don't check the + * value (which implies you can open the same underlying stores using + * more than one name, but I don't know of any problems that causes), + * we only check the key, that is, the top-level WiredTiger name. + */ + for (hs = ds->hs_head; hs != NULL; hs = hs->next) + if (string_match(hs->name, k->str, k->len)) ERET(wtext, NULL, - EINVAL, "%s: device already open", ks->name); + EINVAL, "%s: device already open", hs->name); - /* Allocate and initialize a new underlying KVS source object. */ - if ((ks = calloc(1, sizeof(*ks))) == NULL || - (ks->name = calloc(1, k->len + 1)) == NULL) { - free(ks); + /* Allocate and initialize a new underlying Helium source object. */ + if ((hs = calloc(1, sizeof(*hs))) == NULL || + (hs->name = calloc(1, k->len + 1)) == NULL) { + free(hs); return (os_errno()); } - memcpy(ks->name, k->str, k->len); - - ks->txn_notify.notify = txn_notify; - ks->wtext = wtext; - - /* - * Read the configuration. We require a list of devices underlying the - * KVS source, parse the device list found in the configuration string - * into an array of paths. - */ - if ((ret = - kvs_config_read(wtext, v, &device_list, &kvs_config, &flags)) != 0) + memcpy(hs->name, k->str, k->len); + hs->txn_notify.notify = txn_notify; + hs->wtext = wtext; + + /* Read the configuration, require a device naming the Helium store. */ + memset(&env, 0, sizeof(env)); + if ((ret = helium_config_read( + wtext, v, &hs->device, &env, &env_set, &flags)) != 0) goto err; - if (device_list == NULL || device_list[0] == NULL) + if (hs->device == NULL) EMSG_ERR(wtext, NULL, - EINVAL, "%s: no devices specified", ks->name); + EINVAL, "%s: no Helium volumes specified", hs->name); - /* Open the underlying KVS store (creating it if necessary). */ - ks->kvs_device = - kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE); - if (ks->kvs_device == NULL) - EMSG_ERR(wtext, NULL, WT_ERROR, - "kvs_open: %s: %s", ks->name, kvs_strerror(os_errno())); + /* + * Open the Helium volume, creating it if necessary. We have to open + * an object at the same time, that's why we have object flags as well + * as volume flags. + */ + flags |= HE_O_CREATE | + HE_O_TRUNCATE | HE_O_VOLUME_CLEAN | HE_O_VOLUME_CREATE; + if ((hs->he_volume = he_open( + hs->device, WT_NAME_INIT, flags, env_set ? &env : NULL)) == NULL) { + ret = os_errno(); + EMSG_ERR(wtext, NULL, ret, + "he_open: %s: %s: %s", + hs->name, WT_NAME_INIT, he_strerror(ret)); + } /* Insert the new entry at the head of the list. */ - ks->next = ds->kvs_head; - ds->kvs_head = ks; + hs->next = ds->hs_head; + ds->hs_head = hs; if (0) { -err: if (ks != NULL) - ESET(kvs_source_close(wtext, NULL, ks)); +err: if (hs != NULL) + ESET(helium_source_close(wtext, NULL, hs)); } - - if (device_list != NULL) { - for (p = device_list; *p != NULL; ++p) - free(*p); - free(device_list); - } - return (ret); } /* - * kvs_source_open_txn -- + * helium_source_open_txn -- * Open the database-wide transaction store. */ static int -kvs_source_open_txn(DATA_SOURCE *ds) +helium_source_open_txn(DATA_SOURCE *ds) { - KVS_SOURCE *ks, *kstxn; + HELIUM_SOURCE *hs, *hs_txn; WT_EXTENSION_API *wtext; - kvs_t kvstxn, t; + he_t he_txn, t; int ret = 0; wtext = ds->wtext; /* - * The global txn namespace is per connection, it spans multiple KVS + * The global txn namespace is per connection, it spans multiple Helium * sources. * - * We've opened the KVS sources: check to see if any of them already + * We've opened the Helium sources: check to see if any of them already * have a transaction store, and make sure we only find one. */ - kstxn = NULL; - kvstxn = NULL; - for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - if ((t = kvs_open_namespace( - ks->kvs_device, WT_NAME_TXN, 0)) != NULL) { - if (kstxn != NULL) { - (void)kvs_close(t); - (void)kvs_close(kvstxn); - ERET(wtext, NULL, WT_ERROR, + hs_txn = NULL; + he_txn = NULL; + for (hs = ds->hs_head; hs != NULL; hs = hs->next) + if ((t = he_open(hs->device, WT_NAME_TXN, 0, NULL)) != NULL) { + if (hs_txn != NULL) { + (void)he_close(t); + (void)he_close(hs_txn); + ERET(wtext, NULL, WT_PANIC, "found multiple transaction stores, " "unable to proceed"); } - kvstxn = t; - kstxn = ks; + he_txn = t; + hs_txn = hs; } /* * If we didn't find a transaction store, open a transaction store in - * the first KVS source we loaded. (It could just as easily be the - * last one we loaded, we're just picking one, but picking the first + * the first Helium source we loaded. (It could just as easily be + * the last one we loaded, we're just picking one, but picking the first * seems slightly less likely to make people wonder.) */ - if ((ks = kstxn) == NULL) { - for (ks = ds->kvs_head; ks->next != NULL; ks = ks->next) + if ((hs = hs_txn) == NULL) { + for (hs = ds->hs_head; hs->next != NULL; hs = hs->next) ; - if ((kvstxn = kvs_open_namespace( - ks->kvs_device, WT_NAME_TXN, KVS_O_CREATE)) == NULL) - ERET(wtext, NULL, WT_ERROR, - "kvs_open_namespace: %s: %s", - WT_NAME_TXN, kvs_strerror(os_errno())); + if ((he_txn = he_open( + hs->device, WT_NAME_TXN, HE_O_CREATE, NULL)) == NULL) { + ret = os_errno(); + ERET(wtext, NULL, ret, + "he_open: %s: %s: %s", + hs->name, WT_NAME_TXN, he_strerror(ret)); + } /* Push the change. */ - if ((ret = kvs_commit(ks->kvs_device)) != 0) - ERET(wtext, NULL, WT_ERROR, - "kvs_commit: %s", kvs_strerror(ret)); + if ((ret = he_commit(he_txn)) != 0) + ERET(wtext, NULL, ret, + "he_commit: %s", he_strerror(ret)); } + VMSG(wtext, NULL, VERBOSE_L1, "%s" "transactional store on %s", + hs_txn == NULL ? "creating " : "", hs->name); - /* Set the owner field, this KVS source has to be closed last. */ - ks->kvsowner = 1; + /* Set the owner field, this Helium source has to be closed last. */ + hs->he_owner = 1; - /* Add a reference to the open transaction store in each KVS source. */ - for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - ks->kvstxn = kvstxn; + /* Add a reference to the transaction store in each Helium source. */ + for (hs = ds->hs_head; hs != NULL; hs = hs->next) + hs->he_txn = he_txn; return (0); } /* - * kvs_source_recover_namespace -- - * Recover a single cache/primary pair in a KVS namespace. + * helium_source_recover_namespace -- + * Recover a single cache/primary pair in a Helium namespace. */ static int -kvs_source_recover_namespace(WT_DATA_SOURCE *wtds, - KVS_SOURCE *ks, const char *name, WT_CONFIG_ARG *config) +helium_source_recover_namespace(WT_DATA_SOURCE *wtds, + HELIUM_SOURCE *hs, const char *name, WT_CONFIG_ARG *config) { CURSOR *cursor; DATA_SOURCE *ds; @@ -3099,17 +3105,17 @@ kvs_source_recover_namespace(WT_DATA_SOURCE *wtds, uri = NULL; /* - * The name we store on the Memrata device is a translation of the + * The name we store on the Helium device is a translation of the * WiredTiger name: do the reverse process here so we can use the * standard source-open function. */ - p = name + (sizeof(WT_NAME_PREFIX) - 1); - len = strlen("memrata:") + strlen(ks->name) + strlen(p) + 10; + p = name + strlen(WT_NAME_PREFIX); + len = strlen("helium:") + strlen(hs->name) + strlen(p) + 10; if ((uri = malloc(len)) == NULL) { ret = os_errno(); goto err; } - (void)snprintf(uri, len, "memrata:%s/%s", ks->name, p); + (void)snprintf(uri, len, "helium:%s/%s", hs->name, p); /* * Open the cache/primary pair by going through the full open process, @@ -3122,21 +3128,20 @@ kvs_source_recover_namespace(WT_DATA_SOURCE *wtds, /* Fake up a cursor. */ if ((ret = fake_cursor(wtext, &wtcursor)) != 0) - EMSG_ERR(wtext, NULL, ret, - "kvs_source_recover_namespace: %s", strerror(ret)); + EMSG_ERR(wtext, NULL, ret, "recovery: %s", strerror(ret)); cursor = (CURSOR *)wtcursor; cursor->ws = ws; /* Process, then clear, the cache. */ if ((ret = cache_cleaner(wtext, wtcursor, 0, NULL)) != 0) goto err; - if ((ret = kvs_truncate(ws->kvscache)) != 0) - EMSG_ERR(wtext, NULL, WT_ERROR, - "kvs_truncate: %s(cache): %s", ws->uri, kvs_strerror(ret)); + if ((ret = he_truncate(ws->he_cache)) != 0) + EMSG_ERR(wtext, NULL, ret, + "he_truncate: %s(cache): %s", ws->uri, he_strerror(ret)); /* Close the underlying WiredTiger sources. */ -err: while ((ws = ks->ws_head) != NULL) { - ks->ws_head = ws->next; +err: while ((ws = hs->ws_head) != NULL) { + hs->ws_head = ws->next; ESET(ws_source_close(wtext, NULL, ws)); } @@ -3146,36 +3151,36 @@ err: while ((ws = ks->ws_head) != NULL) { return (ret); } -struct kvs_namespace_cookie { +struct helium_namespace_cookie { char **list; u_int list_cnt; u_int list_max; }; /* - * kvs_namespace_list -- + * helium_namespace_list -- * Get a list of the objects we're going to recover. */ static int -kvs_namespace_list(void *cookie, const char *name) +helium_namespace_list(void *cookie, const char *name) { - struct kvs_namespace_cookie *names; - const char *p; + struct helium_namespace_cookie *names; void *allocp; names = cookie; - /* Ignore any files without a WiredTiger prefix. */ - if (strncmp(name, WT_NAME_PREFIX, sizeof(WT_NAME_PREFIX) - 1) != 0) + /* + * Ignore any files without a WiredTiger prefix. + * Ignore the metadata and cache files. + */ + if (!prefix_match(name, WT_NAME_PREFIX)) + return (0); + if (strcmp(name, WT_NAME_INIT) == 0) return (0); - - /* Ignore the transaction store. */ if (strcmp(name, WT_NAME_TXN) == 0) return (0); - - /* Ignore the "cache" files. */ - p = name + (sizeof(WT_NAME_PREFIX) - 1); - if ((p = strchr(p, '.')) != NULL && strcmp(p, WT_NAME_CACHE) == 0) + if (string_match( + strrchr(name, '.'), WT_NAME_CACHE, strlen(WT_NAME_CACHE))) return (0); if (names->list_cnt + 1 >= names->list_max) { @@ -3193,13 +3198,14 @@ kvs_namespace_list(void *cookie, const char *name) } /* - * kvs_source_recover -- - * Recover the KVS source. + * helium_source_recover -- + * Recover the HELIUM_SOURCE. */ static int -kvs_source_recover(WT_DATA_SOURCE *wtds, KVS_SOURCE *ks, WT_CONFIG_ARG *config) +helium_source_recover( + WT_DATA_SOURCE *wtds, HELIUM_SOURCE *hs, WT_CONFIG_ARG *config) { - struct kvs_namespace_cookie names; + struct helium_namespace_cookie names; DATA_SOURCE *ds; WT_EXTENSION_API *wtext; u_int i; @@ -3207,25 +3213,27 @@ kvs_source_recover(WT_DATA_SOURCE *wtds, KVS_SOURCE *ks, WT_CONFIG_ARG *config) ds = (DATA_SOURCE *)wtds; wtext = ds->wtext; - memset(&names, 0, sizeof(names)); - /* Get a list of the cache/primary object pairs in the KVS source. */ - if ((ret = kvs_namespaces( - ks->kvs_device, kvs_namespace_list, &names)) != 0) - ERET(wtext, NULL, WT_ERROR, - "kvs_namespaces: %s: %s", ks->name, kvs_strerror(ret)); + VMSG(wtext, NULL, VERBOSE_L1, "recover %s", hs->name); + + /* Get a list of the cache/primary object pairs in the Helium source. */ + if ((ret = he_enumerate( + hs->device, helium_namespace_list, &names)) != 0) + ERET(wtext, NULL, ret, + "he_enumerate: %s: %s", hs->name, he_strerror(ret)); /* Recover the objects. */ for (i = 0; i < names.list_cnt; ++i) - if ((ret = kvs_source_recover_namespace( - wtds, ks, names.list[i], config)) != 0) + if ((ret = helium_source_recover_namespace( + wtds, hs, names.list[i], config)) != 0) goto err; /* Clear the transaction store. */ - if ((ret = kvs_truncate(ks->kvstxn)) != 0) - EMSG_ERR(wtext, NULL, WT_ERROR, - "kvs_truncate: %s: %s", WT_NAME_TXN, kvs_strerror(ret)); + if ((ret = he_truncate(hs->he_txn)) != 0) + EMSG_ERR(wtext, NULL, ret, + "he_truncate: %s: %s: %s", + hs->name, WT_NAME_TXN, he_strerror(ret)); err: for (i = 0; i < names.list_cnt; ++i) free(names.list[i]); @@ -3235,14 +3243,14 @@ err: for (i = 0; i < names.list_cnt; ++i) } /* - * kvs_terminate -- + * helium_terminate -- * Unload the data-source. */ static int -kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) +helium_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) { DATA_SOURCE *ds; - KVS_SOURCE *ks, *last; + HELIUM_SOURCE *hs, *last; WT_EXTENSION_API *wtext; int ret = 0; @@ -3254,20 +3262,20 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) ret = writelock(wtext, session, &ds->global_lock); /* - * Close the KVS sources, close the KVS source that "owns" the + * Close the Helium sources, close the Helium source that "owns" the * database transaction store last. */ last = NULL; - while ((ks = ds->kvs_head) != NULL) { - ds->kvs_head = ks->next; - if (ks->kvsowner) { - last = ks; + while ((hs = ds->hs_head) != NULL) { + ds->hs_head = hs->next; + if (hs->he_owner) { + last = hs; continue; } - ESET(kvs_source_close(wtext, session, ks)); + ESET(helium_source_close(wtext, session, hs)); } if (last != NULL) - ESET(kvs_source_close(wtext, session, last)); + ESET(helium_source_close(wtext, session, last)); /* Unlock and destroy the system. */ if (ds->lockinit) { @@ -3282,7 +3290,7 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) /* * wiredtiger_extension_init -- - * Initialize the KVS connector code. + * Initialize the Helium connector code. */ int wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) @@ -3291,44 +3299,47 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) * List of the WT_DATA_SOURCE methods -- it's static so it breaks at * compile-time should the structure change underneath us. */ - static WT_DATA_SOURCE wtds = { - kvs_session_create, /* session.create */ + static const WT_DATA_SOURCE wtds = { + helium_session_create, /* session.create */ NULL, /* No session.compaction */ - kvs_session_drop, /* session.drop */ - kvs_session_open_cursor, /* session.open_cursor */ - kvs_session_rename, /* session.rename */ + helium_session_drop, /* session.drop */ + helium_session_open_cursor, /* session.open_cursor */ + helium_session_rename, /* session.rename */ NULL, /* No session.salvage */ - kvs_session_truncate, /* session.truncate */ + helium_session_truncate, /* session.truncate */ NULL, /* No session.range_truncate */ - kvs_session_verify, /* session.verify */ - kvs_session_checkpoint, /* session.checkpoint */ - kvs_terminate /* termination */ + helium_session_verify, /* session.verify */ + helium_session_checkpoint, /* session.checkpoint */ + helium_terminate /* termination */ }; static const char *session_create_opts[] = { - "kvs_open_o_truncate=0", - "kvs_open_o_debug=0", + "helium_o_compress=0", /* HE_I_COMPRESS */ + "helium_o_truncate=0", /* HE_O_TRUNCATE */ NULL }; DATA_SOURCE *ds; - KVS_SOURCE *ks; + HELIUM_SOURCE *hs; WT_CONFIG_ITEM k, v; WT_CONFIG_SCAN *scan; WT_EXTENSION_API *wtext; - int ret = 0; + int vmajor, vminor, ret = 0; const char **p; - (void)config; /* Unused parameters */ - ds = NULL; - /* Acquire the extension API */ + wtext = connection->get_extension_api(connection); /* Check the library version */ -#if KVS_VERSION_MAJOR != 4 || KVS_VERSION_MINOR != 13 +#if HE_VERSION_MAJOR != 2 || HE_VERSION_MINOR != 2 ERET(wtext, NULL, EINVAL, - "unsupported KVS library version %d.%d, expected version 4.13", - KVS_VERSION_MAJOR, KVS_VERSION_MINOR); + "unsupported Levyx/Helium header file %d.%d, expected version 2.2", + HE_VERSION_MAJOR, HE_VERSION_MINOR); #endif + he_version(&vmajor, &vminor); + if (vmajor != 2 || vminor != 2) + ERET(wtext, NULL, EINVAL, + "unsupported Levyx/Helium library version %d.%d, expected " + "version 2.2", vmajor, vminor); /* Allocate and initialize the local data-source structure. */ if ((ds = calloc(1, sizeof(DATA_SOURCE))) == NULL) @@ -3345,15 +3356,20 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) "WT_EXTENSION_API.config_get: config: %s", wtext->strerror(ret)); - /* Step through the list of KVS sources, opening each one. */ + /* Step through the list of Helium sources, opening each one. */ if ((ret = wtext->config_scan_begin(wtext, NULL, v.str, v.len, &scan)) != 0) EMSG_ERR(wtext, NULL, ret, "WT_EXTENSION_API.config_scan_begin: config: %s", wtext->strerror(ret)); - while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) - if ((ret = kvs_source_open(ds, &k, &v)) != 0) + while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) { + if (string_match("helium_verbose", k.str, k.len)) { + verbose = v.val == 0 ? 0 : 1; + continue; + } + if ((ret = helium_source_open(ds, &k, &v)) != 0) goto err; + } if (ret != WT_NOTFOUND) EMSG_ERR(wtext, NULL, ret, "WT_EXTENSION_API.config_scan_next: config: %s", @@ -3364,26 +3380,26 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) wtext->strerror(ret)); /* Find and open the database transaction store. */ - if ((ret = kvs_source_open_txn(ds)) != 0) + if ((ret = helium_source_open_txn(ds)) != 0) return (ret); - /* Recover each KVS source. */ - for (ks = ds->kvs_head; ks != NULL; ks = ks->next) - if ((ret = kvs_source_recover(&ds->wtds, ks, config)) != 0) + /* Recover each Helium source. */ + for (hs = ds->hs_head; hs != NULL; hs = hs->next) + if ((ret = helium_source_recover(&ds->wtds, hs, config)) != 0) goto err; - /* Start each KVS source cleaner thread. */ - for (ks = ds->kvs_head; ks != NULL; ks = ks->next) + /* Start each Helium source cleaner thread. */ + for (hs = ds->hs_head; hs != NULL; hs = hs->next) if ((ret = pthread_create( - &ks->cleaner_id, NULL, kvs_cleaner, ks)) != 0) + &hs->cleaner_id, NULL, cache_cleaner_worker, hs)) != 0) EMSG_ERR(wtext, NULL, ret, "%s: pthread_create: cleaner thread: %s", - ks->name, strerror(ret)); + hs->name, strerror(ret)); - /* Add KVS-specific configuration options. */ + /* Add Helium-specific WT_SESSION.create configuration options. */ for (p = session_create_opts; *p != NULL; ++p) if ((ret = connection->configure_method(connection, - "session.create", "memrata:", *p, "boolean", NULL)) != 0) + "session.create", "helium:", *p, "boolean", NULL)) != 0) EMSG_ERR(wtext, NULL, ret, "WT_CONNECTION.configure_method: session.create: " "%s: %s", @@ -3391,19 +3407,19 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) /* Add the data source */ if ((ret = connection->add_data_source( - connection, "memrata:", (WT_DATA_SOURCE *)ds, NULL)) != 0) + connection, "helium:", (WT_DATA_SOURCE *)ds, NULL)) != 0) EMSG_ERR(wtext, NULL, ret, "WT_CONNECTION.add_data_source: %s", wtext->strerror(ret)); return (0); err: if (ds != NULL) - ESET(kvs_terminate((WT_DATA_SOURCE *)ds, NULL)); + ESET(helium_terminate((WT_DATA_SOURCE *)ds, NULL)); return (ret); } /* * wiredtiger_extension_terminate -- - * Shutdown the KVS connector code. + * Shutdown the Helium connector code. */ int wiredtiger_extension_terminate(WT_CONNECTION *connection) diff --git a/ext/test/memrata/Makefile.am b/ext/test/memrata/Makefile.am deleted file mode 100644 index 1962680d3fe..00000000000 --- a/ext/test/memrata/Makefile.am +++ /dev/null @@ -1,12 +0,0 @@ -AM_CPPFLAGS = -I$(top_builddir) \ - -I$(top_srcdir)/src/include -I$(MEMRATA_PATH) - -noinst_LTLIBRARIES = libwiredtiger_memrata.la -libwiredtiger_memrata_la_SOURCES = memrata.c -libwiredtiger_memrata_la_LIBADD = \ - -L$(MEMRATA_PATH) -lkvs -L$(BERKELEY_DB_PATH)/lib -ldb - -# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well -# as installation, it will only build static libraries. As far as I can tell, -# the "approved" libtool way to turn them back on is by adding -rpath. -libwiredtiger_memrata_la_LDFLAGS = -avoid-version -module -rpath /nowhere |