summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/compressors/bzip2/bzip2_compress.c49
-rw-r--r--ext/compressors/zlib/Makefile.am6
-rw-r--r--ext/compressors/zlib/zlib_compress.c367
-rw-r--r--ext/datasources/helium/Makefile.am11
-rw-r--r--ext/datasources/helium/README (renamed from ext/test/memrata/README)62
-rw-r--r--ext/datasources/helium/helium.c (renamed from ext/test/memrata/memrata.c)1708
-rw-r--r--ext/test/memrata/Makefile.am12
7 files changed, 1308 insertions, 907 deletions
diff --git a/ext/compressors/bzip2/bzip2_compress.c b/ext/compressors/bzip2/bzip2_compress.c
index 73e9ef3a932..dd97e2abee3 100644
--- a/ext/compressors/bzip2/bzip2_compress.c
+++ b/ext/compressors/bzip2/bzip2_compress.c
@@ -42,12 +42,10 @@ bzip2_decompress(WT_COMPRESSOR *, WT_SESSION *,
uint8_t *, size_t, uint8_t *, size_t, size_t *);
static int
bzip2_terminate(WT_COMPRESSOR *, WT_SESSION *);
-#ifdef WIREDTIGER_TEST_COMPRESS_RAW
static int
bzip2_compress_raw(WT_COMPRESSOR *, WT_SESSION *, size_t, int,
size_t, uint8_t *, uint32_t *, uint32_t, uint8_t *, size_t, int,
size_t *, uint32_t *);
-#endif
/* Local compressor structure. */
typedef struct {
@@ -70,18 +68,26 @@ typedef struct {
WT_SESSION *session;
} BZIP_OPAQUE;
-int
-wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+/*
+ * bzip2_add_compressor --
+ * Add a bzip2 compressor.
+ */
+static int
+bzip2_add_compressor(WT_CONNECTION *connection, int raw, const char *name)
{
BZIP_COMPRESSOR *bzip_compressor;
- (void)config; /* Unused parameters */
-
+ /*
+ * There are two almost identical bzip2 compressors: one supporting raw
+ * compression (used by test/format to test raw compression), the other
+ * without raw compression, that might be useful for real applications.
+ */
if ((bzip_compressor = calloc(1, sizeof(BZIP_COMPRESSOR))) == NULL)
return (errno);
bzip_compressor->compressor.compress = bzip2_compress;
- bzip_compressor->compressor.compress_raw = NULL;
+ bzip_compressor->
+ compressor.compress_raw = raw ? bzip2_compress_raw : NULL;
bzip_compressor->compressor.decompress = bzip2_decompress;
bzip_compressor->compressor.pre_size = NULL;
bzip_compressor->compressor.terminate = bzip2_terminate;
@@ -109,15 +115,22 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
*/
bzip_compressor->bz_small = 0;
- /* Load the compressor */
-#ifdef WIREDTIGER_TEST_COMPRESS_RAW
- bzip_compressor->compressor.compress_raw = bzip2_compress_raw;
- return (connection->add_compressor(
- connection, "raw", (WT_COMPRESSOR *)bzip_compressor, NULL));
-#else
- return (connection->add_compressor(
- connection, "bzip2", (WT_COMPRESSOR *)bzip_compressor, NULL));
-#endif
+ return (connection->add_compressor( /* Load the compressor */
+ connection, name, (WT_COMPRESSOR *)bzip_compressor, NULL));
+}
+
+/*
+ * wiredtiger_extension_init --
+ *	Register both bzip2 compressor variants with the connection.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+	int ret;
+
+	(void)config;				/* Unused parameters */
+
+	/*
+	 * The plain compressor goes first; the raw-compression test variant
+	 * is only attempted if that registration succeeded.
+	 */
+	ret = bzip2_add_compressor(connection, 0, "bzip2");
+	if (ret == 0)
+		ret = bzip2_add_compressor(connection, 1, "bzip2-raw-test");
+	return (ret);
}
/*
@@ -238,7 +251,6 @@ bzip2_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
return (0);
}
-#ifdef WIREDTIGER_TEST_COMPRESS_RAW
/*
* __bzip2_compress_raw_random --
* Return a 32-bit pseudo-random number.
@@ -328,9 +340,8 @@ bzip2_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session,
(uintmax_t)page_max, split_pct, (uintmax_t)extra,
slots, take, offsets[take], (uintmax_t)*result_lenp);
#endif
- return (0);
+ return (take == 0 ? EAGAIN : 0);
}
-#endif
static int
bzip2_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
diff --git a/ext/compressors/zlib/Makefile.am b/ext/compressors/zlib/Makefile.am
new file mode 100644
index 00000000000..373277c92c2
--- /dev/null
+++ b/ext/compressors/zlib/Makefile.am
@@ -0,0 +1,6 @@
+AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
+
+lib_LTLIBRARIES = libwiredtiger_zlib.la
+libwiredtiger_zlib_la_SOURCES = zlib_compress.c
+libwiredtiger_zlib_la_LDFLAGS = -avoid-version -module
+libwiredtiger_zlib_la_LIBADD = -lz
diff --git a/ext/compressors/zlib/zlib_compress.c b/ext/compressors/zlib/zlib_compress.c
new file mode 100644
index 00000000000..a48037c8526
--- /dev/null
+++ b/ext/compressors/zlib/zlib_compress.c
@@ -0,0 +1,367 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <zlib.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <wiredtiger.h>
+#include <wiredtiger_ext.h>
+
+/*
+ * ZLIB_COMPRESSOR --
+ * Local compressor structure: the WT_COMPRESSOR handle registered with
+ * WiredTiger followed by this extension's private state. The functions in
+ * this file cast the WT_COMPRESSOR pointer they are handed back to a
+ * ZLIB_COMPRESSOR pointer.
+ */
+typedef struct {
+ WT_COMPRESSOR compressor; /* Must come first */
+
+ WT_EXTENSION_API *wt_api; /* Extension API */
+
+ int zlib_level; /* Configuration */
+} ZLIB_COMPRESSOR;
+
+/*
+ * ZLIB_OPAQUE --
+ * zlib gives us a cookie to pass to the underlying allocation functions; we
+ * need two handles, package them up. A pointer to one of these is stored in
+ * z_stream.opaque and comes back as the first argument of zalloc/zfree.
+ */
+typedef struct {
+ WT_COMPRESSOR *compressor; /* Used to reach the extension API */
+ WT_SESSION *session; /* Session passed to scr_alloc/scr_free */
+} ZLIB_OPAQUE;
+
+/*
+ * zlib_error --
+ *	Report a failed zlib call through the extension API's error output and
+ * return WiredTiger's generic error code.
+ */
+static int
+zlib_error(
+    WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int zret)
+{
+	ZLIB_COMPRESSOR *zlib_compressor;
+
+	zlib_compressor = (ZLIB_COMPRESSOR *)compressor;
+
+	/* The message names the failing call, zlib's text and the raw code. */
+	(void)zlib_compressor->wt_api->err_printf(zlib_compressor->wt_api,
+	    session, "zlib error: %s: %s: %d", call, zError(zret), zret);
+
+	return (WT_ERROR);
+}
+
+/*
+ * zalloc --
+ *	Allocation callback installed in z_stream.zalloc: allocate room for
+ * "number" items of "size" bytes from the extension API's scratch allocator,
+ * using the compressor and session packaged in the cookie.
+ *
+ * Returns the scr_alloc result, or NULL if the requested byte count cannot
+ * be represented in a size_t.
+ */
+static void *
+zalloc(void *cookie, u_int number, u_int size)
+{
+	ZLIB_OPAQUE *opaque;
+	WT_EXTENSION_API *wt_api;
+
+	opaque = cookie;
+	wt_api = ((ZLIB_COMPRESSOR *)opaque->compressor)->wt_api;
+
+	/* Refuse requests whose byte count would not fit in a size_t. */
+	if (size != 0 && (size_t)number > SIZE_MAX / (size_t)size)
+		return (NULL);
+
+	/*
+	 * Widen before multiplying: the product of two u_int values is
+	 * computed in u_int width and could wrap before being converted,
+	 * yielding an allocation smaller than zlib asked for.
+	 */
+	return (wt_api->scr_alloc(
+	    wt_api, opaque->session, (size_t)number * (size_t)size));
+}
+
+/*
+ * zfree --
+ *	Release callback installed in z_stream.zfree: hand the memory back
+ * through the extension API's scr_free, using the compressor and session
+ * packaged in the cookie.
+ */
+static void
+zfree(void *cookie, void *p)
+{
+	ZLIB_OPAQUE *handles = cookie;
+	ZLIB_COMPRESSOR *zlib_compressor =
+	    (ZLIB_COMPRESSOR *)handles->compressor;
+
+	zlib_compressor->wt_api->scr_free(
+	    zlib_compressor->wt_api, handles->session, p);
+}
+
+/*
+ * zlib_compress --
+ *	Compress src_len bytes from src into dst using a single deflate
+ * stream at the configured level.
+ *
+ * On success returns 0 and sets *compression_failed: 0 when the whole input
+ * was compressed into at most dst_len - 1 bytes (the length is stored in
+ * *result_lenp), 1 when it did not fit (*result_lenp is left untouched).
+ * Returns WT_ERROR, after logging, if zlib itself fails.
+ */
+static int
+zlib_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+    uint8_t *src, size_t src_len,
+    uint8_t *dst, size_t dst_len,
+    size_t *result_lenp, int *compression_failed)
+{
+	ZLIB_COMPRESSOR *zlib_compressor;
+	ZLIB_OPAQUE opaque;
+	z_stream zs;
+	int ret;
+
+	zlib_compressor = (ZLIB_COMPRESSOR *)compressor;
+
+	/*
+	 * The output budget below is dst_len - 1; with an empty destination
+	 * that subtraction would wrap to a huge avail_out, so there is
+	 * nothing we can do except report that compression didn't fit.
+	 */
+	if (dst_len == 0) {
+		*compression_failed = 1;
+		return (0);
+	}
+
+	memset(&zs, 0, sizeof(zs));
+	zs.zalloc = zalloc;
+	zs.zfree = zfree;
+	opaque.compressor = compressor;
+	opaque.session = session;
+	zs.opaque = &opaque;
+
+	if ((ret = deflateInit(&zs, zlib_compressor->zlib_level)) != Z_OK)
+		return (zlib_error(
+		    compressor, session, "deflateInit", ret));
+
+	zs.next_in = src;
+	zs.avail_in = (uint32_t)src_len;
+	zs.next_out = dst;
+	zs.avail_out = (uint32_t)dst_len - 1;
+
+	/* Z_OK means "made progress, call again"; anything else ends it. */
+	while ((ret = deflate(&zs, Z_FINISH)) == Z_OK)
+		;
+	if (ret == Z_STREAM_END) {
+		*compression_failed = 0;
+		*result_lenp = zs.total_out;
+	} else
+		*compression_failed = 1;
+
+	/*
+	 * deflateEnd returns Z_DATA_ERROR when the stream is freed while
+	 * input or output is still pending, which is exactly the state left
+	 * behind by a compression that didn't fit. That case has already
+	 * been reported through compression_failed and is not a failure of
+	 * this call, matching the handling in zlib_compress_raw.
+	 */
+	if ((ret = deflateEnd(&zs)) != Z_OK && ret != Z_DATA_ERROR)
+		return (
+		    zlib_error(compressor, session, "deflateEnd", ret));
+
+	return (0);
+}
+
+/*
+ * zlib_find_slot --
+ * Find the slot containing the target offset (binary search).
+ *
+ * Reads offsets[1] through offsets[slots], so the array must hold at least
+ * slots + 1 entries. Returns slots when target is at or past offsets[slots];
+ * otherwise returns 1 when target is at or below offsets[1]; otherwise
+ * returns the last index probed by a binary search over indices 2 to
+ * slots - 1.
+ *
+ * NOTE(review): the last probed index can be a slot whose offset is greater
+ * than target (offsets {0,10,20,30,40}, slots 4, target 15 yields 2) --
+ * confirm the caller is expected to tolerate that overshoot.
+ */
+static inline uint32_t
+zlib_find_slot(uint32_t target, uint32_t *offsets, uint32_t slots)
+{
+ uint32_t base, indx, limit;
+
+ /* Default answer when target does not lie beyond the first slot. */
+ indx = 1;
+
+ /* Figure out which slot we got to: binary search */
+ if (target >= offsets[slots])
+ indx = slots;
+ else if (target > offsets[1])
+ for (base = 2, limit = slots - base; limit != 0; limit >>= 1) {
+ indx = base + (limit >> 1);
+ /* Target is below the probe: keep base, halve the range. */
+ if (target < offsets[indx])
+ continue;
+ /* Target is at or above the probe: search the upper half. */
+ base = indx + 1;
+ --limit;
+ }
+
+ return (indx);
+}
+
+/*
+ * zlib_compress_raw --
+ * Pack records into a specified on-disk page size.
+ *
+ * src holds the input; offsets[0..slots] are the byte offsets in src at
+ * which the caller is willing to end a page. The output budget is
+ * page_max - extra, less WT_ZLIB_RESERVED bytes held back for finishing the
+ * stream. split_pct, dst_len and final are not used.
+ *
+ * On a 0 return, *result_slotsp is the number of slots consumed and
+ * *result_lenp the compressed length; if nothing could be compressed they
+ * are set to 0 and 1 respectively. Returns WT_ERROR, after logging, if a
+ * zlib call fails.
+ */
+static int
+zlib_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ size_t page_max, int split_pct, size_t extra,
+ uint8_t *src, uint32_t *offsets, uint32_t slots,
+ uint8_t *dst, size_t dst_len, int final,
+ size_t *result_lenp, uint32_t *result_slotsp)
+{
+ ZLIB_COMPRESSOR *zlib_compressor;
+ ZLIB_OPAQUE opaque;
+ z_stream last_zs, zs;
+ uint32_t curr_slot, last_slot;
+ int ret;
+
+ curr_slot = last_slot = 0;
+ /* Unused parameters */
+ (void)split_pct;
+ (void)dst_len;
+ (void)final;
+
+ zlib_compressor = (ZLIB_COMPRESSOR *)compressor;
+
+ memset(&zs, 0, sizeof(zs));
+ zs.zalloc = zalloc;
+ zs.zfree = zfree;
+ opaque.compressor = compressor;
+ opaque.session = session;
+ zs.opaque = &opaque;
+
+ if ((ret = deflateInit(&zs,
+ zlib_compressor->zlib_level)) != Z_OK)
+ return (zlib_error(
+ compressor, session, "deflateInit", ret));
+
+ zs.next_in = src;
+ zs.next_out = dst;
+ /*
+ * Experimentally derived, reserve this many bytes for zlib to finish
+ * up a buffer. If this isn't sufficient, we don't fail but we will be
+ * inefficient.
+ *
+ * NOTE(review): assumes page_max is larger than extra plus the reserve;
+ * otherwise the subtraction below wraps -- confirm against the caller.
+ */
+#define WT_ZLIB_RESERVED 12
+ zs.avail_out = (uint32_t)(page_max - extra - WT_ZLIB_RESERVED);
+ last_zs = zs;
+
+ /*
+ * Strategy: take the available output size and compress that much
+ * input. Continue until there is no input small enough or the
+ * compression fails to fit.
+ */
+ while (zs.avail_out > 0) {
+ /* Find the slot we will try to compress up to. */
+ if ((curr_slot = zlib_find_slot(
+ zs.total_in + zs.avail_out, offsets, slots)) <= last_slot)
+ break;
+
+ zs.avail_in = offsets[curr_slot] - offsets[last_slot];
+ /*
+ * Save the stream state in case the chosen data doesn't fit.
+ *
+ * NOTE(review): this copies the z_stream structure by value; the
+ * compressor's internal state is reached through a pointer inside it
+ * and is shared by both copies, so restoring the structure may not
+ * rewind the compressor itself -- confirm (zlib offers deflateCopy for
+ * duplicating a stream).
+ */
+ last_zs = zs;
+
+ /*
+ * NOTE(review): the error return below leaves without calling
+ * deflateEnd -- confirm the stream's allocations are released
+ * elsewhere on this path.
+ */
+ while (zs.avail_in > 0 && zs.avail_out > 0)
+ if ((ret = deflate(&zs, Z_SYNC_FLUSH)) != Z_OK)
+ return (zlib_error(
+ compressor, session, "deflate", ret));
+
+ /* Roll back if the last deflate didn't complete. */
+ if (zs.avail_in > 0) {
+ zs = last_zs;
+ break;
+ } else
+ last_slot = curr_slot;
+ }
+
+ /* Give the reserved bytes back so the stream can be finished. */
+ zs.avail_out += WT_ZLIB_RESERVED;
+ while ((ret = deflate(&zs, Z_FINISH)) == Z_OK)
+ ;
+ /*
+ * If the end marker didn't fit, report that we got no work done. WT
+ * will compress the (possibly large) page image using ordinary
+ * compression instead.
+ */
+ if (ret == Z_BUF_ERROR)
+ last_slot = 0;
+ else if (ret != Z_STREAM_END)
+ return (
+ zlib_error(compressor, session, "deflate end block", ret));
+
+ /* Z_DATA_ERROR from deflateEnd is tolerated here, not reported. */
+ if ((ret = deflateEnd(&zs)) != Z_OK && ret != Z_DATA_ERROR)
+ return (
+ zlib_error(compressor, session, "deflateEnd", ret));
+
+ if (last_slot > 0) {
+ *result_slotsp = last_slot;
+ *result_lenp = zs.total_out;
+ } else {
+ /* We didn't manage to compress anything: don't retry. */
+ *result_slotsp = 0;
+ *result_lenp = 1;
+ }
+
+#if 0
+ fprintf(stderr,
+ "zlib_compress_raw (%s): page_max %" PRIuMAX ", slots %" PRIu32
+ ", take %" PRIu32 ": %" PRIu32 " -> %" PRIuMAX "\n",
+ final ? "final" : "not final", (uintmax_t)page_max,
+ slots, last_slot, offsets[last_slot], (uintmax_t)*result_lenp);
+#endif
+ return (0);
+}
+
+/*
+ * zlib_decompress --
+ *	Inflate src_len bytes from src into dst, which can hold dst_len bytes.
+ *
+ * Returns 0 and stores the decompressed length in *result_lenp when the
+ * stream ends cleanly; otherwise logs the zlib failure and returns WT_ERROR.
+ */
+static int
+zlib_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+    uint8_t *src, size_t src_len,
+    uint8_t *dst, size_t dst_len,
+    size_t *result_lenp)
+{
+	ZLIB_OPAQUE cookie;
+	z_stream stream;
+	int end_ret, zret;
+
+	/* Route zlib's allocations through the extension API. */
+	cookie.compressor = compressor;
+	cookie.session = session;
+	memset(&stream, 0, sizeof(stream));
+	stream.zalloc = zalloc;
+	stream.zfree = zfree;
+	stream.opaque = &cookie;
+
+	zret = inflateInit(&stream);
+	if (zret != Z_OK)
+		return (zlib_error(
+		    compressor, session, "inflateInit", zret));
+
+	stream.next_in = src;
+	stream.avail_in = (uint32_t)src_len;
+	stream.next_out = dst;
+	stream.avail_out = (uint32_t)dst_len;
+
+	/* Keep inflating for as long as zlib reports plain progress. */
+	do {
+		zret = inflate(&stream, Z_FINISH);
+	} while (zret == Z_OK);
+
+	/* A clean end of stream is the only successful outcome. */
+	if (zret == Z_STREAM_END) {
+		*result_lenp = stream.total_out;
+		zret = Z_OK;
+	}
+
+	/* Always release the stream; keep the first error seen. */
+	end_ret = inflateEnd(&stream);
+	if (zret == Z_OK && end_ret != Z_OK)
+		zret = end_ret;
+
+	if (zret != Z_OK)
+		return (zlib_error(compressor, session, "inflate", zret));
+	return (0);
+}
+
+/*
+ * zlib_terminate --
+ *	Release the compressor structure; the handle is the first member of
+ * the ZLIB_COMPRESSOR allocated in zlib_add_compressor. Always returns 0.
+ */
+static int
+zlib_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session)
+{
+	ZLIB_COMPRESSOR *zlib_compressor;
+
+	(void)session;				/* Unused parameters */
+
+	zlib_compressor = (ZLIB_COMPRESSOR *)compressor;
+	free(zlib_compressor);
+
+	return (0);
+}
+
+/*
+ * zlib_add_compressor --
+ *	Allocate a zlib compressor and register it with the connection under
+ * the given name. A non-zero "raw" also installs the raw-compression
+ * callback; otherwise that callback is left NULL.
+ *
+ * Returns errno if the allocation fails, otherwise whatever the
+ * connection's add_compressor method returns.
+ */
+static int
+zlib_add_compressor(WT_CONNECTION *connection, int raw, const char *name)
+{
+	WT_COMPRESSOR *handle;
+	ZLIB_COMPRESSOR *zlib_compressor;
+
+	/*
+	 * There are two almost identical zlib compressors: one supporting raw
+	 * compression, and one without.
+	 */
+	zlib_compressor = calloc(1, sizeof(ZLIB_COMPRESSOR));
+	if (zlib_compressor == NULL)
+		return (errno);
+
+	handle = &zlib_compressor->compressor;
+	handle->compress = zlib_compress;
+	if (raw)
+		handle->compress_raw = zlib_compress_raw;
+	else
+		handle->compress_raw = NULL;
+	handle->decompress = zlib_decompress;
+	handle->pre_size = NULL;
+	handle->terminate = zlib_terminate;
+
+	zlib_compressor->wt_api = connection->get_extension_api(connection);
+
+	/*
+	 * Compression level: Z_DEFAULT_COMPRESSION, or a value between 0 and
+	 * 9, see the zlib manual.
+	 */
+	zlib_compressor->zlib_level = Z_DEFAULT_COMPRESSION;
+
+	/*
+	 * Load the compressor.
+	 *
+	 * NOTE(review): if add_compressor fails the structure is not freed
+	 * here -- confirm whether the connection calls terminate in that
+	 * case.
+	 */
+	return (connection->add_compressor(connection, name, handle, NULL));
+}
+
+/*
+ * wiredtiger_extension_init --
+ *	Register both zlib compressor variants with the connection: "zlib"
+ * with raw compression and "zlib-noraw" without it.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+	int ret;
+
+	(void)config;				/* Unused parameters */
+
+	/* The second variant is only attempted if the first one loaded. */
+	ret = zlib_add_compressor(connection, 1, "zlib");
+	if (ret == 0)
+		ret = zlib_add_compressor(connection, 0, "zlib-noraw");
+	return (ret);
+}
diff --git a/ext/datasources/helium/Makefile.am b/ext/datasources/helium/Makefile.am
new file mode 100644
index 00000000000..b4e6e67e2cd
--- /dev/null
+++ b/ext/datasources/helium/Makefile.am
@@ -0,0 +1,11 @@
+AM_CPPFLAGS = -I$(top_builddir) \
+ -I$(top_srcdir)/src/include -I$(HELIUM_PATH)
+
+noinst_LTLIBRARIES = libwiredtiger_helium.la
+libwiredtiger_helium_la_SOURCES = helium.c
+libwiredtiger_helium_la_LIBADD = -L$(HELIUM_PATH) -lhe
+
+# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well
+# as installation, it will only build static libraries. As far as I can tell,
+# the "approved" libtool way to turn them back on is by adding -rpath.
+libwiredtiger_helium_la_LDFLAGS = -avoid-version -module -rpath /nowhere
diff --git a/ext/test/memrata/README b/ext/datasources/helium/README
index ee338474566..e78ba58c71d 100644
--- a/ext/test/memrata/README
+++ b/ext/datasources/helium/README
@@ -1,15 +1,15 @@
-Memrata README.
+Helium README.
-The data structures are "KVS sources" which map to one or more physical
-devices; each KVS source supports any number of "WiredTiger sources",
-where a WiredTiger source will be an object similar to a Btree "file:"
-object. Each WiredTiger source supports any number of WiredTiger cursors.
+The data structures are "Helium sources" which map to one or more physical
+volumes; each Helium source supports any number of "WiredTiger sources",
+where a WiredTiger source is an object similar to a Btree "file:" object.
+Each WiredTiger source supports any number of WiredTiger cursors.
-Each KVS source is given a logical name when the Memrata device is loaded,
-and that logical name is subsequently used when a WiredTiger source is
-created. For example, a KVS source might be named "dev1", and correspond
-to /dev/sd0 and /dev/sd1; subsequent WT_SESSION.create calls would specify
-a URI like "table:dev1/my_table".
+Each Helium source is given a logical name when first referenced, and that
+logical name is subsequently used when a WiredTiger source is created. For
+example, the logical name for a Helium source might be "dev1", and it would
+map to the Helium volumes /dev/sd0 and /dev/sd1; subsequent WT_SESSION.create
+calls specify a URI like "table:dev1/my_table".
For each WiredTiger source, we create two namespaces on the underlying device,
a "cache" and a "primary".
@@ -51,23 +51,23 @@ When a next/prev is done:
move to the next/prev visible item in the primary
return the one closest to the starting position
-Note locks are not acquired for read operations, and no flushes are done for
-any of these operations.
+Locks are not acquired for read operations, and no flushes are done for any of
+these operations.
-We also create one additional namespace, the "txn" name space, which serves
-all of the WiredTiger and KVS sources. Whenever a transaction commits, we
-insert a commit record into the txn name space and flush the device. When a
-transaction rolls back, we insert an abort record into the txn name space,
-but don't flush the device.
+We also create one additional object, the transaction name space, which serves
+all of the WiredTiger and Helium objects in a WiredTiger connection. Whenever
+a transaction involving a Helium source commits, we insert a commit record into
+the transaction name space and flush the device. When a transaction rolls back,
+we insert an abort record into the transaction name space, but don't flush the
+device.
The visibility check is slightly different than the rest of WiredTiger: we do
not reset anything when a transaction aborts, and so we have to check if the
transaction has been aborted as well as check the transaction ID for visibility.
-We create a "cleanup" thread for every KVS source. The job of this thread is
-to migrate rows from the cache into the primary. Any committed, globally
-visible change in the cache can be copied into the primary and removed from
-the cache:
+We create a "cleanup" thread for every underlying Helium source. The job of
+this thread is to migrate rows from the cache object into the primary. Any
+committed, globally visible change in the cache can be copied into the primary
+and removed from the cache:
set BaseTxnID to the oldest transaction ID
not yet visible to a running transaction
@@ -95,24 +95,26 @@ to the primary.
No lock is required when removing rows from the transaction store, once the
transaction ID is less than the BaseTxnID, it will never be read.
-Memrata recovery is almost identical to the cleanup thread, which migrates
-rows from the cache into the primary. For every cache/primary name space,
-we migrate every commit to the primary (by definition, at recovery time it
-must be globally visible), and discard everything (by defintion, at recovery
-time anything not committed has been aborted.
+Helium recovery is almost identical to the cleanup thread, which migrates rows
+from the cache into the primary. For every cache/primary pair, migrate every
+commit to the primary (by definition, at recovery time it must be globally
+visible), and discard everything else (by definition, at recovery time anything
+not committed has been aborted).
=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
Questions, problems, whatever:
-* This implementation is endian-specific, that is, a store created on a
-little-endian machine is not portable to a big-endian machine.
+* The implementation is endian-specific, that is, the WiredTiger metadata
+stored on the Helium device is not portable to a big-endian machine.
+Helium's metadata is portable between different endian machines, so this
+should probably be fixed.
* There's a problem with transactions in WiredTiger that span more than a
single data source. For example, consider a transaction that modifies
-both a KVS object and a Btree object. If we commit and push the KVS
+both a Helium object and a Btree object. If we commit and push the Helium
commit record to stable storage, and then crash before committing the Btree
change, the enclosing WiredTiger transaction will/should end up aborting,
-and there's no way for us to back out the change in KVS. I'm leaving
+and there's no way for us to back out the change in Helium. I'm leaving
this problem alone until WiredTiger fine-grained durability is complete,
we're going to need WiredTiger support for some kind of 2PC to solve this.
diff --git a/ext/test/memrata/memrata.c b/ext/datasources/helium/helium.c
index 442c078b5a2..1239c88befa 100644
--- a/ext/test/memrata/memrata.c
+++ b/ext/datasources/helium/helium.c
@@ -24,6 +24,7 @@
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
+#include <sys/select.h>
#include <ctype.h>
#include <errno.h>
@@ -33,56 +34,80 @@
#include <stdlib.h>
#include <string.h>
-#include <kvs.h>
+#include <he.h>
+
#include <wiredtiger.h>
#include <wiredtiger_ext.h>
+typedef struct he_env HE_ENV;
+typedef struct he_item HE_ITEM;
+typedef struct he_stats HE_STATS;
+
+static int verbose = 0; /* Verbose messages */
+
/*
- * Macros to output an error message and set or return an error.
- * Requires local variables:
- * int ret;
+ * Macros to output error and verbose messages, and set or return an error.
+ * Error macros require local "ret" variable.
*
* ESET: update an error value, handling more/less important errors.
- * ERET: output a message and return the error.
- * EMSG, EMSG_ERR:
- * output a message and set the local error value, optionally jump to the
- * err label.
+ * ERET: output a message, return the error.
+ * EMSG: output a message, set the local error value.
+ * EMSG_ERR:
+ * output a message, set the local error value, jump to the err label.
+ * VMSG: verbose message.
*/
#undef ESET
#define ESET(a) do { \
- int __ret; \
- if ((__ret = (a)) != 0 && \
- (__ret == WT_PANIC || \
- ret == 0 || ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND)) \
- ret = __ret; \
+ int __v; \
+ if ((__v = (a)) != 0) { \
+ /* \
+ * On error, check for a panic (it overrides all other \
+ * returns). Else, if there's no return value or the \
+ * return value is not strictly an error, override it \
+ * with the error. \
+ */ \
+ if (__v == WT_PANIC || \
+ ret == 0 || \
+ ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND) \
+ ret = __v; \
+ /* \
+ * If we're set to a Helium error at the end of the day,\
+ * switch to a generic WiredTiger error. The bound must \
+ * be written as the plain integer -31800: a grouping \
+ * comma would be parsed as the comma operator and make \
+ * the test always true. \
+ */ \
+ if (ret < 0 && ret > -31800) \
+ ret = WT_ERROR; \
+ } \
} while (0)
#undef ERET
#define ERET(wtext, session, v, ...) do { \
(void) \
- wtext->err_printf(wtext, session, "memrata: " __VA_ARGS__); \
- return (v); \
+ wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \
+ ESET(v); \
+ return (ret); \
} while (0)
#undef EMSG
#define EMSG(wtext, session, v, ...) do { \
(void) \
- wtext->err_printf(wtext, session, "memrata: " __VA_ARGS__); \
+ wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \
ESET(v); \
} while (0)
#undef EMSG_ERR
#define EMSG_ERR(wtext, session, v, ...) do { \
(void) \
- wtext->err_printf(wtext, session, "memrata: " __VA_ARGS__); \
+ wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \
ESET(v); \
goto err; \
} while (0)
-
-/*
- * STRING_MATCH --
- * Return if a string matches a bytestring of a specified length.
- */
-#undef STRING_MATCH
-#define STRING_MATCH(str, bytes, len) \
- (strncmp(str, bytes, len) == 0 && (str)[(len)] == '\0')
+#undef VERBOSE_L1
+#define VERBOSE_L1 1
+#undef VERBOSE_L2
+#define VERBOSE_L2 2
+/*
+ * VMSG: print a "helium: "-prefixed informational message when the global
+ * verbose setting is at least v. The level argument is parenthesized so an
+ * expression passed as v is compared as a whole.
+ */
+#undef VMSG
+#define VMSG(wtext, session, v, ...) do { \
+ if (verbose >= (v)) \
+ (void)wtext-> \
+ msg_printf(wtext, session, "helium: " __VA_ARGS__); \
+} while (0)
/*
* OVERWRITE_AND_FREE --
@@ -95,20 +120,29 @@
} while (0)
/*
- * Version each file, out of sheer raging paranoia.
+ * Version each object, out of sheer raging paranoia.
*/
-#define KVS_MAJOR 1 /* KVS major, minor version */
-#define KVS_MINOR 0
+#define WIREDTIGER_HELIUM_MAJOR 1 /* Major, minor version */
+#define WIREDTIGER_HELIUM_MINOR 0
/*
- * WiredTiger name space on the memrata device: all primary store objects are
- * named "WiredTiger.XXX", the cache store object is "WiredTiger.XXX.cache",
- * and the per-device transaction file is "WiredTiger.txn".
+ * WiredTiger name space on the Helium store: all objects are named with the
+ * WiredTiger prefix (we don't require the Helium store be exclusive to our
+ * files). Primary objects are named "WiredTiger.[name]", associated cache
+ * objects are "WiredTiger.[name].cache". The per-connection transaction
+ * object is "WiredTiger.WiredTigerTxn". When we first open a Helium volume,
+ * we open/close a file in order to apply flags for the first open of the
+ * volume, that's "WiredTiger.WiredTigerInit".
*/
#define WT_NAME_PREFIX "WiredTiger."
-#define WT_NAME_TXN "WiredTiger.txn"
+#define WT_NAME_INIT "WiredTiger.WiredTigerInit"
+#define WT_NAME_TXN "WiredTiger.WiredTigerTxn"
#define WT_NAME_CACHE ".cache"
+/*
+ * WT_SOURCE --
+ * A WiredTiger source, supporting one or more cursors.
+ */
typedef struct __wt_source {
char *uri; /* Unique name */
@@ -120,68 +154,79 @@ typedef struct __wt_source {
uint64_t append_recno; /* Allocation record number */
- int config_recno; /* config "key_format=r" */
int config_bitfield; /* config "value_format=#t" */
+ int config_compress; /* config "helium_o_compress" */
+ int config_recno; /* config "key_format=r" */
/*
- * Each WiredTiger object has a "primary" namespace in a KVS store plus
- * a "cache" namespace, which has not-yet-resolved updates. There's a
- * dirty flag so we can ignore the cache until it's used.
+ * Each WiredTiger object has a "primary" namespace in a Helium store
+ * plus a "cache" namespace, which has not-yet-resolved updates. There
+ * is a dirty flag so read-only data sets can ignore the cache.
*/
- kvs_t kvs; /* Underlying KVS object */
- kvs_t kvscache; /* Underlying KVS cache */
- int kvscache_inuse;
+ he_t he; /* Underlying Helium object */
+ he_t he_cache; /* Underlying Helium cache */
+ int he_cache_inuse;
- uint64_t cleaner_bytes; /* Bytes since clean */
- uint64_t cleaner_ops; /* Operations since clean */
-
- struct __kvs_source *ks; /* Underlying KVS source */
+ struct __he_source *hs; /* Underlying Helium source */
struct __wt_source *next; /* List of WiredTiger objects */
} WT_SOURCE;
-typedef struct __kvs_source {
+/*
+ * HELIUM_SOURCE --
+ * A Helium volume, supporting one or more WT_SOURCE objects.
+ */
+typedef struct __he_source {
/*
* XXX
* The transaction commit handler must appear first in the structure.
*/
WT_TXN_NOTIFY txn_notify; /* Transaction commit handler */
- char *name; /* Unique name */
+ WT_EXTENSION_API *wtext; /* Extension functions */
+
+ char *name; /* Unique WiredTiger name */
+ char *device; /* Unique Helium volume name */
- kvs_t kvs_device; /* Underlying KVS store */
+ /*
+ * Maintain a handle for each underlying Helium source so checkpoint is
+ * faster, we can "commit" a single handle per source, regardless of the
+ * number of objects.
+ */
+ he_t he_volume;
struct __wt_source *ws_head; /* List of WiredTiger sources */
/*
- * Each KVS source has a cleaner thread to migrate WiredTiger source
+ * Each Helium source has a cleaner thread to migrate WiredTiger source
* updates from the cache namespace to the primary namespace, based on
- * the number of bytes or the number of operations. We read these
- * fields without a lock, but serialize writes to minimize races (and
- * because it costs us nothing).
- *
- * There's a cleaner thread per KVS store because migration operations
- * can overlap.
+ * the number of bytes or the number of operations. (There's a cleaner
+ * thread per Helium store so migration operations can overlap.) We
+ * read these fields without a lock, but serialize writes to minimize
+ * races (and because it costs us nothing).
*/
- WT_EXTENSION_API *wtext; /* Extension functions */
pthread_t cleaner_id; /* Cleaner thread ID */
volatile int cleaner_stop; /* Cleaner thread quit flag */
/*
* Each WiredTiger connection has a transaction namespace which lists
* resolved transactions with their committed or aborted state as a
- * value. We create that namespace in the first KVS store created,
- * and then simply reference it from other, subsequently created KVS
- * stores.
+ * value. That namespace appears in a single Helium store (the first
+ * one created, if it doesn't already exist), and then it's referenced
+ * from other Helium stores.
*/
#define TXN_ABORTED 'A'
#define TXN_COMMITTED 'C'
#define TXN_UNRESOLVED 0
- kvs_t kvstxn; /* Underlying KVS txn store */
- int kvsowner; /* Owns transaction store */
+ he_t he_txn; /* Helium txn store */
+ int he_owner; /* Owns transaction store */
- struct __kvs_source *next; /* List of KVS sources */
-} KVS_SOURCE;
+ struct __he_source *next; /* List of Helium sources */
+} HELIUM_SOURCE;
+/*
+ * DATA_SOURCE --
+ * A WiredTiger data source, supporting one or more HELIUM_SOURCE objects.
+ */
typedef struct __data_source {
WT_DATA_SOURCE wtds; /* Must come first */
@@ -190,10 +235,13 @@ typedef struct __data_source {
pthread_rwlock_t global_lock; /* Global lock */
int lockinit; /* Lock created */
- KVS_SOURCE *kvs_head; /* List of KVS sources */
+ struct __he_source *hs_head; /* List of Helium sources */
} DATA_SOURCE;
/*
+ * CACHE_RECORD --
+ * An array of updates from the cache object.
+ *
* Values in the cache store are marshalled/unmarshalled to/from the store,
* using a simple encoding:
* {N records: 4B}
@@ -203,7 +251,7 @@ typedef struct __data_source {
* {record#1 data}
* ...
*
- * Each KVS cursor potentially has a single set of these values.
+ * Each cursor potentially has a single set of these values.
*/
typedef struct __cache_record {
uint8_t *v; /* Value */
@@ -213,15 +261,19 @@ typedef struct __cache_record {
int remove; /* 1/0 remove flag */
} CACHE_RECORD;
+/*
+ * CURSOR --
+ * A cursor, supporting a single WiredTiger cursor.
+ */
typedef struct __cursor {
WT_CURSOR wtcursor; /* Must come first */
WT_EXTENSION_API *wtext; /* Extension functions */
- WT_SOURCE *ws; /* WiredTiger source */
+ WT_SOURCE *ws; /* Underlying source */
- struct kvs_record record; /* Record */
- uint8_t __key[KVS_MAX_KEY_LEN]; /* Record.key, Record.value */
+ HE_ITEM record; /* Record */
+ uint8_t __key[HE_MAX_KEY_LEN]; /* Record.key, Record.value */
uint8_t *v;
size_t len;
size_t mem_len;
@@ -241,6 +293,26 @@ typedef struct __cursor {
} CURSOR;
/*
+ * prefix_match --
+ *	Return non-zero if the string starts with the prefix, zero otherwise.
+ * An empty prefix matches every string.
+ */
+static inline int
+prefix_match(const char *str, const char *pfx)
+{
+	size_t pfx_len;
+
+	pfx_len = strlen(pfx);
+	return (strncmp(str, pfx, pfx_len) == 0);
+}
+
+/*
+ * string_match --
+ *	Return if a string matches a byte string of len bytes: non-zero when
+ * the nul-terminated string is exactly len bytes long and those bytes equal
+ * the first len bytes of the byte string, zero otherwise.
+ *
+ * The length is checked first so the string is never read past its
+ * terminator, which could happen when both arguments held a nul byte at the
+ * same position before len.
+ */
+static inline int
+string_match(const char *str, const char *bytes, size_t len)
+{
+	return (strlen(str) == len && memcmp(str, bytes, len) == 0);
+}
+
+/*
* cursor_destroy --
* Free a cursor's memory, and optionally the cursor itself.
*/
@@ -259,7 +331,7 @@ cursor_destroy(CURSOR *cursor)
/*
* os_errno --
- * Limit our use of errno so it's easy to remove.
+ * Limit our use of errno so it's easy to find/remove.
*/
static int
os_errno(void)
@@ -272,8 +344,7 @@ os_errno(void)
* Initialize a lock.
*/
static int
-lock_init(
- WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
+lock_init(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
{
int ret = 0;
@@ -304,8 +375,7 @@ lock_destroy(
* Acquire a write lock.
*/
static inline int
-writelock(
- WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
+writelock(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
{
int ret = 0;
@@ -331,71 +401,90 @@ unlock(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
}
#if 0
-static int
-kvs_dump_print(uint8_t *p, size_t len, FILE *fp)
+static void
+helium_dump_kv(const char *pfx, uint8_t *p, size_t len, FILE *fp)
{
+ (void)fprintf(stderr, "%s %3zu: ", pfx, len);
for (; len > 0; --len, ++p)
if (!isspace(*p) && isprint(*p))
- putc(*p, fp);
+ (void)putc(*p, fp);
else if (len == 1 && *p == '\0') /* Skip string nuls. */
continue;
else
- fprintf(fp, "%#x", *p);
+ (void)fprintf(fp, "%#x", *p);
+ (void)putc('\n', fp);
}
/*
- * kvs_dump --
- * Dump the records in a KVS store.
+ * helium_dump --
+ * Dump the records in a Helium store.
*/
static int
-kvs_dump(kvs_t kvs, const char *tag)
+helium_dump(WT_EXTENSION_API *wtext, he_t he, const char *tag)
{
- FILE *fp;
- struct kvs_record *r, _r;
- size_t maxbuf = 4 * 1024;
+ HE_ITEM *r, _r;
+ uint8_t k[4 * 1024], v[4 * 1024];
int ret = 0;
r = &_r;
memset(r, 0, sizeof(*r));
- r->key = malloc(maxbuf);
- r->key_len = 0;
- r->val = malloc(maxbuf);
- r->val_len = maxbuf;
-
- (void)snprintf(r->val, maxbuf, "dump.%s", tag);
- fp = fopen(r->val, "w");
- fprintf(fp, "== %s\n", tag);
-
- while ((ret = kvs_next(kvs, r, 0UL, (unsigned long)maxbuf)) == 0) {
- kvs_dump_print(r->key, r->key_len, fp);
- putc('\t', fp);
- kvs_dump_print(r->val, r->val_len, fp);
- putc('\n', fp);
+ r->key = k;
+ r->val = v;
- r->val_len = maxbuf;
+ (void)fprintf(stderr, "== %s\n", tag);
+ while ((ret = he_next(he, r, (size_t)0, sizeof(v))) == 0) {
+#if 0
+ uint64_t recno;
+ if ((ret = wtext->struct_unpack(wtext,
+ NULL, r->key, r->key_len, "r", &recno)) != 0)
+ return (ret);
+ fprintf(stderr, "K: %" PRIu64, recno);
+#else
+ helium_dump_kv("K: ", r->key, r->key_len, stderr);
+#endif
+ helium_dump_kv("V: ", r->val, r->val_len, stderr);
}
- if (ret == KVS_E_KEY_NOT_FOUND)
- ret = 0;
- fprintf(fp, "========================== (%d)\n", ret);
- fclose(fp);
+ if (ret != HE_ERR_ITEM_NOT_FOUND) {
+ fprintf(stderr, "he_next: %s\n", he_strerror(ret));
+ ret = WT_ERROR;
+ }
+ return (ret);
+}
- free(r->key);
- free(r->val);
+/*
+ * helium_stats --
+ * Display Helium statistics for a datastore.
+ */
+static int
+helium_stats(
+ WT_EXTENSION_API *wtext, WT_SESSION *session, he_t he, const char *tag)
+{
+ HE_STATS stats;
+ int ret = 0;
- return (ret);
+ if ((ret = he_stats(he, &stats)) != 0)
+ ERET(wtext, session, ret, "he_stats: %s", he_strerror(ret));
+ fprintf(stderr, "== %s\n", tag);
+ fprintf(stderr, "name=%s\n", stats.name);
+ fprintf(stderr, "deleted_items=%" PRIu64 "\n", stats.deleted_items);
+ fprintf(stderr, "locked_items=%" PRIu64 "\n", stats.locked_items);
+ fprintf(stderr, "valid_items=%" PRIu64 "\n", stats.valid_items);
+ fprintf(stderr, "capacity=%" PRIu64 "B\n", stats.capacity);
+ fprintf(stderr, "size=%" PRIu64 "B\n", stats.size);
+ return (0);
}
#endif
/*
- * kvs_call --
- * Call a KVS key retrieval function, handling overflow.
+ * helium_call --
+ * Call a Helium key retrieval function, handling overflow.
*/
static inline int
-kvs_call(WT_CURSOR *wtcursor, const char *fname, kvs_t kvs,
- int (*f)(kvs_t, struct kvs_record *, unsigned long, unsigned long))
+helium_call(WT_CURSOR *wtcursor, const char *fname,
+ he_t he, int (*f)(he_t, HE_ITEM *, size_t, size_t))
{
- struct kvs_record *r;
CURSOR *cursor;
+ HE_ITEM *r;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
int ret = 0;
@@ -409,18 +498,17 @@ kvs_call(WT_CURSOR *wtcursor, const char *fname, kvs_t kvs,
r->val = cursor->v;
restart:
- if ((ret = f(kvs, r, 0UL, (unsigned long)cursor->mem_len)) != 0) {
- if (ret == KVS_E_KEY_NOT_FOUND)
+ if ((ret = f(he, r, (size_t)0, cursor->mem_len)) != 0) {
+ if (ret == HE_ERR_ITEM_NOT_FOUND)
return (WT_NOTFOUND);
- ERET(wtext,
- session, WT_ERROR, "%s: %s", fname, kvs_strerror(ret));
+ ERET(wtext, session, ret, "%s: %s", fname, he_strerror(ret));
}
/*
* If the returned length is larger than our passed-in length, we didn't
- * get the complete value. Grow the buffer and use kvs_get to complete
- * the retrieval (kvs_get because the call succeeded and the key was
- * copied out, so calling kvs_next/kvs_prev again would skip key/value
+ * get the complete value. Grow the buffer and use he_lookup to do the
+ * retrieval (he_lookup because the call succeeded and the key was
+ * copied out, so calling he_next/he_prev again would skip key/value
* pairs).
*
* We have to loop, another thread of control might change the length of
@@ -441,12 +529,11 @@ restart:
cursor->v = r->val = p;
cursor->mem_len = r->val_len + 32;
- if ((ret = kvs_get(
- kvs, r, 0UL, (unsigned long)cursor->mem_len)) != 0) {
- if (ret == KVS_E_KEY_NOT_FOUND)
+ if ((ret = he_lookup(he, r, (size_t)0, cursor->mem_len)) != 0) {
+ if (ret == HE_ERR_ITEM_NOT_FOUND)
goto restart;
- ERET(wtext, session,
- WT_ERROR, "kvs_get: %s", kvs_strerror(ret));
+ ERET(wtext,
+ session, ret, "he_lookup: %s", he_strerror(ret));
}
}
/* NOTREACHED */
@@ -458,47 +545,46 @@ restart:
*/
static int
txn_state_set(WT_EXTENSION_API *wtext,
- WT_SESSION *session, KVS_SOURCE *ks, uint64_t txnid, int commit)
+ WT_SESSION *session, HELIUM_SOURCE *hs, uint64_t txnid, int commit)
{
- struct kvs_record txn;
+ HE_ITEM txn;
uint8_t val;
int ret = 0;
- /* Update the store -- commits must be durable, flush the device. */
- memset(&txn, 0, sizeof(txn));
- txn.key = &txnid;
- txn.key_len = sizeof(txnid);
-
/*
+ * Update the store -- commits must be durable, flush the volume.
+ *
+ * XXX
* Not endian-portable, we're writing a native transaction ID to the
* store.
*/
+ memset(&txn, 0, sizeof(txn));
+ txn.key = &txnid;
+ txn.key_len = sizeof(txnid);
val = commit ? TXN_COMMITTED : TXN_ABORTED;
txn.val = &val;
- txn.val_len = 1;
+ txn.val_len = sizeof(val);
- if ((ret = kvs_set(ks->kvstxn, &txn)) != 0)
- ERET(wtext, session,
- WT_ERROR, "kvs_set: %s", kvs_strerror(ret));
+ if ((ret = he_update(hs->he_txn, &txn)) != 0)
+ ERET(wtext, session, ret, "he_update: %s", he_strerror(ret));
- if (commit && (ret = kvs_commit(ks->kvs_device)) != 0)
- ERET(wtext, session,
- WT_ERROR, "kvs_commit: %s", kvs_strerror(ret));
+ if (commit && (ret = he_commit(hs->he_txn)) != 0)
+ ERET(wtext, session, ret, "he_commit: %s", he_strerror(ret));
return (0);
}
/*
* txn_notify --
- * Resolve a transaction.
+ * Resolve a transaction; called from WiredTiger during commit/abort.
*/
static int
txn_notify(WT_TXN_NOTIFY *handler,
WT_SESSION *session, uint64_t txnid, int committed)
{
- KVS_SOURCE *ks;
+ HELIUM_SOURCE *hs;
- ks = (KVS_SOURCE *)handler;
- return (txn_state_set(ks->wtext, session, ks, txnid, committed));
+ hs = (HELIUM_SOURCE *)handler;
+ return (txn_state_set(hs->wtext, session, hs, txnid, committed));
}
/*
@@ -508,13 +594,13 @@ txn_notify(WT_TXN_NOTIFY *handler,
static int
txn_state(WT_CURSOR *wtcursor, uint64_t txnid)
{
- struct kvs_record txn;
CURSOR *cursor;
- KVS_SOURCE *ks;
+ HE_ITEM txn;
+ HELIUM_SOURCE *hs;
uint8_t val_buf[16];
cursor = (CURSOR *)wtcursor;
- ks = cursor->ws->ks;
+ hs = cursor->ws->hs;
memset(&txn, 0, sizeof(txn));
txn.key = &txnid;
@@ -522,7 +608,7 @@ txn_state(WT_CURSOR *wtcursor, uint64_t txnid)
txn.val = val_buf;
txn.val_len = sizeof(val_buf);
- if (kvs_get(ks->kvstxn, &txn, 0UL, (unsigned long)sizeof(val_buf)) == 0)
+ if (he_lookup(hs->he_txn, &txn, (size_t)0, sizeof(val_buf)) == 0)
return (val_buf[0]);
return (TXN_UNRESOLVED);
}
@@ -534,8 +620,8 @@ txn_state(WT_CURSOR *wtcursor, uint64_t txnid)
static int
cache_value_append(WT_CURSOR *wtcursor, int remove_op)
{
- struct kvs_record *r;
CURSOR *cursor;
+ HE_ITEM *r;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
uint64_t txnid;
@@ -587,6 +673,7 @@ cache_value_append(WT_CURSOR *wtcursor, int remove_op)
* Copy the WiredTiger cursor's data into place: txn ID, remove
* tombstone, data length, data.
*
+ * XXX
* Not endian-portable, we're writing a native transaction ID to the
* store.
*/
@@ -604,7 +691,7 @@ cache_value_append(WT_CURSOR *wtcursor, int remove_op)
}
cursor->len = (size_t)(p - cursor->v);
- /* Update the underlying KVS record. */
+ /* Update the underlying Helium record. */
r->val = cursor->v;
r->val_len = cursor->len;
@@ -764,10 +851,8 @@ cache_value_visible_all(WT_CURSOR *wtcursor, uint64_t oldest)
{
CACHE_RECORD *cp;
CURSOR *cursor;
- WT_SESSION *session;
u_int i;
- session = wtcursor->session;
cursor = (CURSOR *)wtcursor;
/*
@@ -882,21 +967,23 @@ cache_value_txnmin(WT_CURSOR *wtcursor, uint64_t *txnminp)
static int
key_max_err(WT_EXTENSION_API *wtext, WT_SESSION *session, size_t len)
{
+ int ret = 0;
+
ERET(wtext, session, EINVAL,
- "key length (%" PRIuMAX " bytes) larger than the maximum Memrata "
+ "key length (%zu bytes) larger than the maximum Helium "
"key length of %d bytes",
- (uintmax_t)len, KVS_MAX_KEY_LEN);
+ len, HE_MAX_KEY_LEN);
}
/*
* copyin_key --
- * Copy a WT_CURSOR key to a struct kvs_record key.
+ * Copy a WT_CURSOR key to a HE_ITEM key.
*/
static inline int
copyin_key(WT_CURSOR *wtcursor, int allocate_key)
{
- struct kvs_record *r;
CURSOR *cursor;
+ HE_ITEM *r;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
WT_SOURCE *ws;
@@ -944,14 +1031,14 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
if ((ret = wtext->struct_size(wtext, session,
&size, "r", wtcursor->recno)) != 0 ||
(ret = wtext->struct_pack(wtext, session,
- r->key, KVS_MAX_KEY_LEN, "r", wtcursor->recno)) != 0)
+ r->key, HE_MAX_KEY_LEN, "r", wtcursor->recno)) != 0)
return (ret);
r->key_len = size;
} else {
/* I'm not sure this test is necessary, but it's cheap. */
- if (wtcursor->key.size > KVS_MAX_KEY_LEN)
- return (key_max_err(
- wtext, session, (size_t)wtcursor->key.size));
+ if (wtcursor->key.size > HE_MAX_KEY_LEN)
+ return (
+ key_max_err(wtext, session, wtcursor->key.size));
/*
* A set cursor key might reference application memory, which
@@ -969,13 +1056,13 @@ copyin_key(WT_CURSOR *wtcursor, int allocate_key)
/*
* copyout_key --
- * Copy a struct kvs_record key to a WT_CURSOR key.
+ * Copy a HE_ITEM key to a WT_CURSOR key.
*/
static inline int
copyout_key(WT_CURSOR *wtcursor)
{
- struct kvs_record *r;
CURSOR *cursor;
+ HE_ITEM *r;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
WT_SOURCE *ws;
@@ -993,14 +1080,14 @@ copyout_key(WT_CURSOR *wtcursor)
return (ret);
} else {
wtcursor->key.data = r->key;
- wtcursor->key.size = (uint32_t)r->key_len;
+ wtcursor->key.size = (size_t)r->key_len;
}
return (0);
}
/*
* copyout_val --
- * Copy a kvs store's struct kvs_record value to a WT_CURSOR value.
+ * Copy a Helium store's HE_ITEM value to a WT_CURSOR value.
*/
static inline int
copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
@@ -1011,7 +1098,7 @@ copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
if (cp == NULL) {
wtcursor->value.data = cursor->v;
- wtcursor->value.size = (uint32_t)cursor->len;
+ wtcursor->value.size = cursor->len;
} else {
wtcursor->value.data = cp->v;
wtcursor->value.size = cp->len;
@@ -1025,11 +1112,11 @@ copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
*/
static int
nextprev(WT_CURSOR *wtcursor, const char *fname,
- int (*f)(kvs_t, struct kvs_record *, unsigned long, unsigned long))
+ int (*f)(he_t, HE_ITEM *, size_t, size_t))
{
- struct kvs_record *r;
CACHE_RECORD *cp;
CURSOR *cursor;
+ HE_ITEM *r;
WT_EXTENSION_API *wtext;
WT_ITEM a, b;
WT_SESSION *session;
@@ -1050,7 +1137,7 @@ nextprev(WT_CURSOR *wtcursor, const char *fname,
* the store. We don't care if we race, we're not guaranteeing any
* special behavior with respect to phantoms.
*/
- if (ws->kvscache_inuse == 0) {
+ if (ws->he_cache_inuse == 0) {
cache_ret = WT_NOTFOUND;
goto cache_clean;
}
@@ -1079,7 +1166,7 @@ skip_deleted:
* entry, or we reach the end/beginning.
*/
for (cache_rm = 0;;) {
- if ((ret = kvs_call(wtcursor, fname, ws->kvscache, f)) != 0)
+ if ((ret = helium_call(wtcursor, fname, ws->he_cache, f)) != 0)
break;
if ((ret = cache_value_unmarshall(wtcursor)) != 0)
return (ret);
@@ -1134,7 +1221,7 @@ skip_deleted:
cache_clean:
/* Get the next/prev entry from the store. */
- ret = kvs_call(wtcursor, fname, ws->kvs, f);
+ ret = helium_call(wtcursor, fname, ws->he, f);
if (ret != 0 && ret != WT_NOTFOUND)
return (ret);
@@ -1154,7 +1241,7 @@ cache_clean:
if ((ret = wtext->collate(wtext, session, &a, &b, &cmp)) != 0)
return (ret);
- if (f == kvs_next) {
+ if (f == he_next) {
if (cmp >= 0)
ret = WT_NOTFOUND;
else
@@ -1196,34 +1283,34 @@ cache_clean:
}
/*
- * kvs_cursor_next --
+ * helium_cursor_next --
* WT_CURSOR.next method.
*/
static int
-kvs_cursor_next(WT_CURSOR *wtcursor)
+helium_cursor_next(WT_CURSOR *wtcursor)
{
- return (nextprev(wtcursor, "kvs_next", kvs_next));
+ return (nextprev(wtcursor, "he_next", he_next));
}
/*
- * kvs_cursor_prev --
+ * helium_cursor_prev --
* WT_CURSOR.prev method.
*/
static int
-kvs_cursor_prev(WT_CURSOR *wtcursor)
+helium_cursor_prev(WT_CURSOR *wtcursor)
{
- return (nextprev(wtcursor, "kvs_prev", kvs_prev));
+ return (nextprev(wtcursor, "he_prev", he_prev));
}
/*
- * kvs_cursor_reset --
+ * helium_cursor_reset --
* WT_CURSOR.reset method.
*/
static int
-kvs_cursor_reset(WT_CURSOR *wtcursor)
+helium_cursor_reset(WT_CURSOR *wtcursor)
{
- struct kvs_record *r;
CURSOR *cursor;
+ HE_ITEM *r;
cursor = (CURSOR *)wtcursor;
r = &cursor->record;
@@ -1237,11 +1324,11 @@ kvs_cursor_reset(WT_CURSOR *wtcursor)
}
/*
- * kvs_cursor_search --
+ * helium_cursor_search --
* WT_CURSOR.search method.
*/
static int
-kvs_cursor_search(WT_CURSOR *wtcursor)
+helium_cursor_search(WT_CURSOR *wtcursor)
{
CACHE_RECORD *cp;
CURSOR *cursor;
@@ -1259,7 +1346,8 @@ kvs_cursor_search(WT_CURSOR *wtcursor)
* Check for an entry in the cache. If we find one, unmarshall it
* and check for a visible entry we can return.
*/
- if ((ret = kvs_call(wtcursor, "kvs_get", ws->kvscache, kvs_get)) == 0) {
+ if ((ret =
+ helium_call(wtcursor, "he_lookup", ws->he_cache, he_lookup)) == 0) {
if ((ret = cache_value_unmarshall(wtcursor)) != 0)
return (ret);
if (cache_value_visible(wtcursor, &cp))
@@ -1269,18 +1357,18 @@ kvs_cursor_search(WT_CURSOR *wtcursor)
return (ret);
/* Check for an entry in the primary store. */
- if ((ret = kvs_call(wtcursor, "kvs_get", ws->kvs, kvs_get)) != 0)
+ if ((ret = helium_call(wtcursor, "he_lookup", ws->he, he_lookup)) != 0)
return (ret);
return (copyout_val(wtcursor, NULL));
}
/*
- * kvs_cursor_search_near --
+ * helium_cursor_search_near --
* WT_CURSOR.search_near method.
*/
static int
-kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
+helium_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
{
int ret = 0;
@@ -1294,7 +1382,7 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
*/
/* Search for an exact match. */
- if ((ret = kvs_cursor_search(wtcursor)) == 0) {
+ if ((ret = helium_cursor_search(wtcursor)) == 0) {
*exact = 0;
return (0);
}
@@ -1302,7 +1390,7 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
return (ret);
/* Search for a key that's larger. */
- if ((ret = kvs_cursor_next(wtcursor)) == 0) {
+ if ((ret = helium_cursor_next(wtcursor)) == 0) {
*exact = 1;
return (0);
}
@@ -1310,7 +1398,7 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
return (ret);
/* Search for a key that's smaller. */
- if ((ret = kvs_cursor_prev(wtcursor)) == 0) {
+ if ((ret = helium_cursor_prev(wtcursor)) == 0) {
*exact = -1;
return (0);
}
@@ -1319,16 +1407,16 @@ kvs_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
}
/*
- * kvs_cursor_insert --
+ * helium_cursor_insert --
* WT_CURSOR.insert method.
*/
static int
-kvs_cursor_insert(WT_CURSOR *wtcursor)
+helium_cursor_insert(WT_CURSOR *wtcursor)
{
- struct kvs_record *r;
CACHE_RECORD *cp;
CURSOR *cursor;
- KVS_SOURCE *ks;
+ HE_ITEM *r;
+ HELIUM_SOURCE *hs;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
WT_SOURCE *ws;
@@ -1338,13 +1426,16 @@ kvs_cursor_insert(WT_CURSOR *wtcursor)
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
ws = cursor->ws;
- ks = ws->ks;
+ hs = ws->hs;
r = &cursor->record;
/* Get the WiredTiger cursor's key. */
if ((ret = copyin_key(wtcursor, 1)) != 0)
return (ret);
+ VMSG(wtext, session, VERBOSE_L2,
+ "I %.*s.%.*s", (int)r->key_len, r->key, (int)r->val_len, r->val);
+
/* Clear the value, assume we're adding the first cache entry. */
cursor->len = 0;
@@ -1353,7 +1444,8 @@ kvs_cursor_insert(WT_CURSOR *wtcursor)
return (ret);
/* Read the record from the cache store. */
- switch (ret = kvs_call(wtcursor, "kvs_get", ws->kvscache, kvs_get)) {
+ switch (ret = helium_call(
+ wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
case 0:
/* Crack the record. */
if ((ret = cache_value_unmarshall(wtcursor)) != 0)
@@ -1385,8 +1477,8 @@ kvs_cursor_insert(WT_CURSOR *wtcursor)
break;
/* If overwrite is false, an entry is an error. */
- if ((ret = kvs_call(
- wtcursor, "kvs_get", ws->kvs, kvs_get)) != WT_NOTFOUND) {
+ if ((ret = helium_call(
+ wtcursor, "he_lookup", ws->he, he_lookup)) != WT_NOTFOUND) {
if (ret == 0)
ret = WT_DUPLICATE_KEY;
goto err;
@@ -1398,21 +1490,17 @@ kvs_cursor_insert(WT_CURSOR *wtcursor)
}
/*
- * Create a new cache value based on the current cache record plus the
- * WiredTiger cursor's value.
+ * Create a new value using the current cache record plus the WiredTiger
+ * cursor's value, and update the cache.
*/
if ((ret = cache_value_append(wtcursor, 0)) != 0)
goto err;
-
- /* Push the record into the cache. */
- if ((ret = kvs_set(ws->kvscache, r)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_set: %s", kvs_strerror(ret));
+ if ((ret = he_update(ws->he_cache, r)) != 0)
+ EMSG(wtext, session, ret, "he_update: %s", he_strerror(ret));
/* Update the state while still holding the lock. */
- ws->kvscache_inuse = 1;
- ws->cleaner_bytes += wtcursor->value.size;
- ++ws->cleaner_ops;
+ if (ws->he_cache_inuse == 0)
+ ws->he_cache_inuse = 1;
/* Discard the lock. */
err: ESET(unlock(wtext, session, &ws->lock));
@@ -1420,7 +1508,7 @@ err: ESET(unlock(wtext, session, &ws->lock));
/* If successful, request notification at transaction resolution. */
if (ret == 0)
ESET(
- wtext->transaction_notify(wtext, session, &ks->txn_notify));
+ wtext->transaction_notify(wtext, session, &hs->txn_notify));
return (ret);
}
@@ -1432,10 +1520,10 @@ err: ESET(unlock(wtext, session, &ws->lock));
static int
update(WT_CURSOR *wtcursor, int remove_op)
{
- struct kvs_record *r;
CACHE_RECORD *cp;
CURSOR *cursor;
- KVS_SOURCE *ks;
+ HE_ITEM *r;
+ HELIUM_SOURCE *hs;
WT_EXTENSION_API *wtext;
WT_SESSION *session;
WT_SOURCE *ws;
@@ -1445,13 +1533,18 @@ update(WT_CURSOR *wtcursor, int remove_op)
cursor = (CURSOR *)wtcursor;
wtext = cursor->wtext;
ws = cursor->ws;
- ks = ws->ks;
+ hs = ws->hs;
r = &cursor->record;
/* Get the WiredTiger cursor's key. */
if ((ret = copyin_key(wtcursor, 0)) != 0)
return (ret);
+ VMSG(wtext, session, VERBOSE_L2,
+ "%c %.*s.%.*s",
+ remove_op ? 'R' : 'U',
+ (int)r->key_len, r->key, (int)r->val_len, r->val);
+
/* Clear the value, assume we're adding the first cache entry. */
cursor->len = 0;
@@ -1460,7 +1553,8 @@ update(WT_CURSOR *wtcursor, int remove_op)
return (ret);
/* Read the record from the cache store. */
- switch (ret = kvs_call(wtcursor, "kvs_get", ws->kvscache, kvs_get)) {
+ switch (ret = helium_call(
+ wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
case 0:
/* Crack the record. */
if ((ret = cache_value_unmarshall(wtcursor)) != 0)
@@ -1491,8 +1585,8 @@ update(WT_CURSOR *wtcursor, int remove_op)
break;
/* If overwrite is false, no entry is an error. */
- if ((ret = kvs_call(
- wtcursor, "kvs_get", ws->kvs, kvs_get)) != 0)
+ if ((ret =
+ helium_call(wtcursor, "he_lookup", ws->he, he_lookup)) != 0)
goto err;
/*
@@ -1513,10 +1607,12 @@ update(WT_CURSOR *wtcursor, int remove_op)
goto err;
/* Push the record into the cache. */
- if ((ret = kvs_set(ws->kvscache, r)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_set: %s", kvs_strerror(ret));
- ws->kvscache_inuse = 1;
+ if ((ret = he_update(ws->he_cache, r)) != 0)
+ EMSG(wtext, session, ret, "he_update: %s", he_strerror(ret));
+
+ /* Update the state while still holding the lock. */
+ if (ws->he_cache_inuse == 0)
+ ws->he_cache_inuse = 1;
/* Discard the lock. */
err: ESET(unlock(wtext, session, &ws->lock));
@@ -1524,27 +1620,27 @@ err: ESET(unlock(wtext, session, &ws->lock));
/* If successful, request notification at transaction resolution. */
if (ret == 0)
ESET(
- wtext->transaction_notify(wtext, session, &ks->txn_notify));
+ wtext->transaction_notify(wtext, session, &hs->txn_notify));
return (ret);
}
/*
- * kvs_cursor_update --
+ * helium_cursor_update --
* WT_CURSOR.update method.
*/
static int
-kvs_cursor_update(WT_CURSOR *wtcursor)
+helium_cursor_update(WT_CURSOR *wtcursor)
{
return (update(wtcursor, 0));
}
/*
- * kvs_cursor_remove --
+ * helium_cursor_remove --
* WT_CURSOR.remove method.
*/
static int
-kvs_cursor_remove(WT_CURSOR *wtcursor)
+helium_cursor_remove(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
WT_SOURCE *ws;
@@ -1558,18 +1654,18 @@ kvs_cursor_remove(WT_CURSOR *wtcursor)
*/
if (ws->config_bitfield) {
wtcursor->value.size = 1;
- wtcursor->value.data = "\0";
+ wtcursor->value.data = "";
return (update(wtcursor, 0));
}
return (update(wtcursor, 1));
}
/*
- * kvs_cursor_close --
+ * helium_cursor_close --
* WT_CURSOR.close method.
*/
static int
-kvs_cursor_close(WT_CURSOR *wtcursor)
+helium_cursor_close(WT_CURSOR *wtcursor)
{
CURSOR *cursor;
WT_EXTENSION_API *wtext;
@@ -1595,27 +1691,27 @@ kvs_cursor_close(WT_CURSOR *wtcursor)
* ws_source_name --
* Build a namespace name.
*/
-static inline int
+static int
ws_source_name(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, const char *suffix, char **pp)
{
DATA_SOURCE *ds;
WT_EXTENSION_API *wtext;
size_t len;
+ int ret = 0;
const char *p;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
/*
- * Create the store's name. Application URIs are "memrata:device/XXX";
- * we want the names on the memrata device to be obviously WiredTiger's,
- * and the device name isn't interesting. Convert to "WiredTiger:XXX",
+ * Create the store's name. Application URIs are "helium:device/name";
+ * we want the names on the Helium device to be obviously WiredTiger's,
+ * and the device name isn't interesting. Convert to "WiredTiger:name",
* and add an optional suffix.
*/
- if (strncmp(uri, "memrata:", sizeof("memrata:") - 1) != 0 ||
- (p = strchr(uri, '/')) == NULL)
- ERET(wtext, session, EINVAL, "%s: illegal memrata URI", uri);
+ if (!prefix_match(uri, "helium:") || (p = strchr(uri, '/')) == NULL)
+ ERET(wtext, session, EINVAL, "%s: illegal Helium URI", uri);
++p;
len = strlen(WT_NAME_PREFIX) +
@@ -1628,85 +1724,39 @@ ws_source_name(WT_DATA_SOURCE *wtds,
}
/*
- * ws_source_drop_namespace --
- * Drop a namespace.
- */
-static int
-ws_source_drop_namespace(WT_DATA_SOURCE *wtds, WT_SESSION *session,
- const char *uri, const char *suffix, kvs_t kvs_device)
-{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- int ret = 0;
- char *p;
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
- p = NULL;
-
- /* Drop the underlying KVS namespace. */
- if ((ret = ws_source_name(wtds, session, uri, suffix, &p)) != 0)
- return (ret);
- if ((ret = kvs_delete_namespace(kvs_device, p)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_delete_namespace: %s: %s", p, kvs_strerror(ret));
-
- free(p);
- return (ret);
-}
-
-/*
- * ws_source_rename_namespace --
- * Rename a namespace.
- */
-static int
-ws_source_rename_namespace(WT_DATA_SOURCE *wtds, WT_SESSION *session,
- const char *uri, const char *newuri, const char *suffix, kvs_t kvs_device)
-{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
- int ret = 0;
- char *p, *pnew;
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
- p = pnew = NULL;
-
- /* Rename the underlying KVS namespace. */
- ret = ws_source_name(wtds, session, uri, suffix, &p);
- if (ret == 0)
- ret = ws_source_name(wtds, session, newuri, suffix, &pnew);
- if (ret == 0 && (ret = kvs_rename_namespace(kvs_device, p, pnew)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_rename_namespace: %s: %s", p, kvs_strerror(ret));
-
- free(p);
- free(pnew);
- return (ret);
-}
-
-/*
* ws_source_close --
- * Kill a WT_SOURCE structure.
+ * Close a WT_SOURCE reference.
*/
static int
ws_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, WT_SOURCE *ws)
{
- int ret = 0;
+ int ret = 0, tret;
+ /*
+ * Warn if cursors are still open: that shouldn't happen because the upper
+ * layers of WiredTiger prevent it, so we don't do anything more than warn.
+ */
if (ws->ref != 0)
EMSG(wtext, session, WT_ERROR,
"%s: open object with %u open cursors being closed",
ws->uri, ws->ref);
- if (ws->kvs != NULL && (ret = kvs_close(ws->kvs)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_close: %s: %s", ws->uri, kvs_strerror(ret));
- ws->kvs = NULL;
- if (ws->kvscache != NULL && (ret = kvs_close(ws->kvscache)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_close: %s(cache): %s", ws->uri, kvs_strerror(ret));
- ws->kvscache = NULL;
+ if (ws->he != NULL) {
+ if ((tret = he_commit(ws->he)) != 0)
+ EMSG(wtext, session, tret,
+ "he_commit: %s: %s", ws->uri, he_strerror(tret));
+ if ((tret = he_close(ws->he)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s: %s", ws->uri, he_strerror(tret));
+ ws->he = NULL;
+ }
+ if (ws->he_cache != NULL) {
+ if ((tret = he_close(ws->he_cache)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s(cache): %s",
+ ws->uri, he_strerror(tret));
+ ws->he_cache = NULL;
+ }
if (ws->lockinit)
ESET(lock_destroy(wtext, session, &ws->lock));
@@ -1718,33 +1768,36 @@ ws_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, WT_SOURCE *ws)
}
/*
- * ws_source_open_namespace --
- * Open a namespace.
+ * ws_source_open_object --
+ * Open an object in the Helium store.
*/
static int
-ws_source_open_namespace(WT_DATA_SOURCE *wtds, WT_SESSION *session,
- const char *uri, const char *suffix, kvs_t kvs_device, int flags,
- kvs_t *kvsp)
+ws_source_open_object(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+ HELIUM_SOURCE *hs,
+ const char *uri, const char *suffix, int flags, he_t *hep)
{
DATA_SOURCE *ds;
WT_EXTENSION_API *wtext;
- kvs_t kvs;
+ he_t he;
char *p;
int ret = 0;
- *kvsp = NULL;
+ *hep = NULL;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
p = NULL;
- /* Open the underlying KVS namespace. */
+ /* Open the underlying Helium object. */
if ((ret = ws_source_name(wtds, session, uri, suffix, &p)) != 0)
return (ret);
- if ((kvs = kvs_open_namespace(kvs_device, p, flags)) == NULL)
- EMSG(wtext, session, WT_ERROR,
- "kvs_open_namespace: %s: %s", p, kvs_strerror(os_errno()));
- *kvsp = kvs;
+ VMSG(wtext, session, VERBOSE_L1, "open %s/%s", hs->name, p);
+ if ((he = he_open(hs->device, p, flags, NULL)) == NULL) {
+ ret = os_errno();
+ EMSG(wtext, session, ret,
+ "he_open: %s/%s: %s", hs->name, p, he_strerror(ret));
+ }
+ *hep = he;
free(p);
return (ret);
@@ -1763,7 +1816,7 @@ ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
const char *uri, WT_CONFIG_ARG *config, u_int flags, WT_SOURCE **refp)
{
DATA_SOURCE *ds;
- KVS_SOURCE *ks;
+ HELIUM_SOURCE *hs;
WT_CONFIG_ITEM a;
WT_EXTENSION_API *wtext;
WT_SOURCE *ws;
@@ -1778,26 +1831,26 @@ ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
ws = NULL;
/*
- * The URI will be "memrata:" followed by a KVS name and object name
- * pair separated by a slash, for example, "memrata:dev/object".
+ * The URI will be "helium:" followed by a Helium name and object name
+ * pair separated by a slash, for example, "helium:volume/object".
*/
- if (strncmp(uri, "memrata:", strlen("memrata:")) != 0)
+ if (!prefix_match(uri, "helium:"))
goto bad_name;
- p = uri + strlen("memrata:");
+ p = uri + strlen("helium:");
if (p[0] == '/' || (t = strchr(p, '/')) == NULL || t[1] == '\0')
bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri);
len = (size_t)(t - p);
- /* Find a matching KVS device. */
- for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- if (STRING_MATCH(ks->name, p, len))
+ /* Find a matching Helium device. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if (string_match(hs->name, p, len))
break;
- if (ks == NULL)
+ if (hs == NULL)
ERET(wtext, NULL,
- EINVAL, "%s: no matching Memrata store found", uri);
+ EINVAL, "%s: no matching Helium store found", uri);
/*
- * We're about to walk the KVS device's list of files, acquire the
+ * We're about to walk the Helium device's list of files, acquire the
* global lock.
*/
if ((ret = writelock(wtext, session, &ds->global_lock)) != 0)
@@ -1808,7 +1861,7 @@ bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri);
* for the object's lock, optionally check if the object is busy, and
* return.
*/
- for (ws = ks->ws_head; ws != NULL; ws = ws->next)
+ for (ws = hs->ws_head; ws != NULL; ws = ws->next)
if (strcmp(ws->uri, uri) == 0) {
/* Check to see if the object is busy. */
if (ws->ref != 0 && (flags & WS_SOURCE_OPEN_BUSY)) {
@@ -1836,45 +1889,35 @@ bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri);
if ((ret = lock_init(wtext, session, &ws->lock)) != 0)
goto err;
ws->lockinit = 1;
- ws->ks = ks;
+ ws->hs = hs;
/*
- * Open the underlying KVS namespaces, then push the change.
+ * Open the underlying Helium objects, then push the change.
*
* The naming scheme is simple: the URI names the primary store, and the
* URI with a trailing suffix names the associated caching store.
*
- * We can set debug and truncate flags, we always set the create flag,
- * our caller handles attempts to create existing objects.
+ * We can set the truncate flag, we always set the create flag, our caller
+ * handles attempts to create existing objects.
*/
- oflags = KVS_O_CREATE;
+ oflags = HE_O_CREATE;
if ((ret = wtext->config_get(wtext,
- session, config, "kvs_open_o_debug", &a)) == 0 && a.val != 0)
- oflags |= KVS_O_DEBUG;
- if (ret != 0 && ret != WT_NOTFOUND) {
- EMSG(wtext, session, ret,
- "kvs_open_o_debug configuration: %s", wtext->strerror(ret));
- goto err;
- }
- if ((ret = wtext->config_get(wtext,
- session, config, "kvs_open_o_truncate", &a)) == 0 && a.val != 0)
- oflags |= KVS_O_TRUNCATE;
- if (ret != 0 && ret != WT_NOTFOUND) {
- EMSG(wtext, session, ret,
- "kvs_open_o_truncate configuration: %s",
+ session, config, "helium_o_truncate", &a)) == 0 && a.val != 0)
+ oflags |= HE_O_TRUNCATE;
+ if (ret != 0 && ret != WT_NOTFOUND)
+ EMSG_ERR(wtext, session, ret,
+ "helium_o_truncate configuration: %s",
wtext->strerror(ret));
- goto err;
- }
- if ((ret = ws_source_open_namespace(wtds,
- session, uri, NULL, ks->kvs_device, oflags, &ws->kvs)) != 0)
+ if ((ret = ws_source_open_object(
+ wtds, session, hs, uri, NULL, oflags, &ws->he)) != 0)
goto err;
- if ((ret = ws_source_open_namespace(wtds, session,
- uri, WT_NAME_CACHE, ks->kvs_device, oflags, &ws->kvscache)) != 0)
+ if ((ret = ws_source_open_object(
+ wtds, session, hs, uri, WT_NAME_CACHE, oflags, &ws->he_cache)) != 0)
goto err;
- if ((ret = kvs_commit(ws->kvs)) != 0)
- EMSG_ERR(wtext, session, WT_ERROR,
- "kvs_commit: %s", kvs_strerror(ret));
+ if ((ret = he_commit(ws->he)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "he_commit: %s", he_strerror(ret));
/* Optionally trade the global lock for the object lock. */
if (!(flags & WS_SOURCE_OPEN_GLOBAL) &&
@@ -1882,8 +1925,8 @@ bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri);
goto err;
/* Insert the new entry at the head of the list. */
- ws->next = ks->ws_head;
- ks->ws_head = ws;
+ ws->next = hs->ws_head;
+ hs->ws_head = ws;
*refp = ws;
ws = NULL;
@@ -1905,7 +1948,7 @@ err: if (ws != NULL)
/*
* master_uri_get --
- * Get the KVS master record for a URI.
+ * Get the Helium master record for a URI.
*/
static int
master_uri_get(WT_DATA_SOURCE *wtds,
@@ -1922,7 +1965,7 @@ master_uri_get(WT_DATA_SOURCE *wtds,
/*
* master_uri_drop --
- * Drop the KVS master record for a URI.
+ * Drop the Helium master record for a URI.
*/
static int
master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri)
@@ -1938,7 +1981,7 @@ master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri)
/*
* master_uri_rename --
- * Rename the KVS master record for a URI.
+ * Rename the Helium master record for a URI.
*/
static int
master_uri_rename(WT_DATA_SOURCE *wtds,
@@ -1971,14 +2014,14 @@ err: free((void *)value);
/*
* master_uri_set --
- * Set the KVS master record for a URI.
+ * Set the Helium master record for a URI.
*/
static int
master_uri_set(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- WT_CONFIG_ITEM a, b;
+ WT_CONFIG_ITEM a, b, c;
WT_EXTENSION_API *wtext;
int exclusive, ret = 0;
char value[1024];
@@ -2016,14 +2059,27 @@ master_uri_set(WT_DATA_SOURCE *wtds,
wtext->strerror(ret));
}
+ /* Get the compression configuration. */
+ if ((ret = wtext->config_get(
+ wtext, session, config, "helium_o_compress", &c)) != 0) {
+ if (ret == WT_NOTFOUND)
+ c.val = 0;
+ else
+ ERET(wtext, session, ret,
+ "helium_o_compress configuration: %s",
+ wtext->strerror(ret));
+ }
+
/*
* Create a new reference using insert (which fails if the record
- * already exists). If that succeeds, we just used up a unique ID,
- * update the master ID record.
+ * already exists).
*/
(void)snprintf(value, sizeof(value),
- "version=(major=%d,minor=%d),key_format=%.*s,value_format=%.*s",
- KVS_MAJOR, KVS_MINOR, (int)a.len, a.str, (int)b.len, b.str);
+ "wiredtiger_helium_version=(major=%d,minor=%d),"
+ "key_format=%.*s,value_format=%.*s,"
+ "helium_o_compress=%d",
+ WIREDTIGER_HELIUM_MAJOR, WIREDTIGER_HELIUM_MINOR,
+ (int)a.len, a.str, (int)b.len, b.str, c.val ? 1 : 0);
if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0)
return (0);
if (ret == WT_DUPLICATE_KEY)
@@ -2032,11 +2088,11 @@ master_uri_set(WT_DATA_SOURCE *wtds,
}
/*
- * kvs_session_open_cursor --
+ * helium_session_open_cursor --
* WT_SESSION.open_cursor method.
*/
static int
-kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+helium_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
const char *uri, WT_CONFIG_ARG *config, WT_CURSOR **new_cursor)
{
CURSOR *cursor;
@@ -2078,15 +2134,15 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
"collator configuration: %s", wtext->strerror(ret));
/* Finish initializing the cursor. */
- cursor->wtcursor.close = kvs_cursor_close;
- cursor->wtcursor.insert = kvs_cursor_insert;
- cursor->wtcursor.next = kvs_cursor_next;
- cursor->wtcursor.prev = kvs_cursor_prev;
- cursor->wtcursor.remove = kvs_cursor_remove;
- cursor->wtcursor.reset = kvs_cursor_reset;
- cursor->wtcursor.search = kvs_cursor_search;
- cursor->wtcursor.search_near = kvs_cursor_search_near;
- cursor->wtcursor.update = kvs_cursor_update;
+ cursor->wtcursor.close = helium_cursor_close;
+ cursor->wtcursor.insert = helium_cursor_insert;
+ cursor->wtcursor.next = helium_cursor_next;
+ cursor->wtcursor.prev = helium_cursor_prev;
+ cursor->wtcursor.remove = helium_cursor_remove;
+ cursor->wtcursor.reset = helium_cursor_reset;
+ cursor->wtcursor.search = helium_cursor_search;
+ cursor->wtcursor.search_near = helium_cursor_search_near;
+ cursor->wtcursor.update = helium_cursor_update;
cursor->wtext = wtext;
cursor->record.key = cursor->__key;
@@ -2123,21 +2179,28 @@ kvs_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
ws->config_bitfield =
v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't';
+ if ((ret = wtext->config_strget(
+ wtext, session, value, "helium_o_compress", &v)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "helium_o_compress configuration: %s",
+ wtext->strerror(ret));
+ ws->config_compress = v.val ? 1 : 0;
+
/*
* If it's a record-number key, read the last record from the
* object and set the allocation record value.
*/
if (ws->config_recno) {
wtcursor = (WT_CURSOR *)cursor;
- if ((ret = kvs_cursor_reset(wtcursor)) != 0)
+ if ((ret = helium_cursor_reset(wtcursor)) != 0)
goto err;
- if ((ret = kvs_cursor_prev(wtcursor)) == 0)
+ if ((ret = helium_cursor_prev(wtcursor)) == 0)
ws->append_recno = wtcursor->recno;
else if (ret != WT_NOTFOUND)
goto err;
- if ((ret = kvs_cursor_reset(wtcursor)) != 0)
+ if ((ret = helium_cursor_reset(wtcursor)) != 0)
goto err;
}
@@ -2161,11 +2224,11 @@ err: if (ws != NULL && locked)
}
/*
- * kvs_session_create --
+ * helium_session_create --
* WT_SESSION.create method.
*/
static int
-kvs_session_create(WT_DATA_SOURCE *wtds,
+helium_session_create(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
@@ -2191,7 +2254,7 @@ kvs_session_create(WT_DATA_SOURCE *wtds,
* We've discarded the lock, but that's OK, creates are single-threaded
* at the WiredTiger level, it's not our problem to solve.
*
- * If unable to enter a WiredTiger record, leave the KVS store alone.
+ * If unable to enter a WiredTiger record, leave the Helium store alone.
* A subsequent create should do the right thing, we aren't leaving
* anything in an inconsistent state.
*/
@@ -2199,15 +2262,15 @@ kvs_session_create(WT_DATA_SOURCE *wtds,
}
/*
- * kvs_session_drop --
+ * helium_session_drop --
* WT_SESSION.drop method.
*/
static int
-kvs_session_drop(WT_DATA_SOURCE *wtds,
+helium_session_drop(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- KVS_SOURCE *ks;
+ HELIUM_SOURCE *hs;
WT_EXTENSION_API *wtext;
WT_SOURCE **p, *ws;
int ret = 0;
@@ -2217,7 +2280,7 @@ kvs_session_drop(WT_DATA_SOURCE *wtds,
/*
* Get a locked reference to the data source: hold the global lock,
- * we are going to change the list of objects for a KVS store.
+ * we're changing the HELIUM_SOURCE's list of WT_SOURCE objects.
*
* Remove the entry from the WT_SOURCE list -- it's a singly-linked
* list, find the reference to it.
@@ -2225,28 +2288,23 @@ kvs_session_drop(WT_DATA_SOURCE *wtds,
if ((ret = ws_source_open(wtds, session, uri, config,
WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)) != 0)
return (ret);
- ks = ws->ks;
- for (p = &ks->ws_head; *p != NULL; p = &(*p)->next)
+ hs = ws->hs;
+ for (p = &hs->ws_head; *p != NULL; p = &(*p)->next)
if (*p == ws) {
*p = (*p)->next;
break;
}
- /* Close the source, discarding the handles and structure. */
+ /* Drop the underlying Helium objects. */
+ ESET(he_remove(ws->he));
+ ws->he = NULL; /* The handle is dead. */
+ ESET(he_remove(ws->he_cache));
+ ws->he_cache = NULL; /* The handle is dead. */
+
+ /* Close the source, discarding the structure. */
ESET(ws_source_close(wtext, session, ws));
ws = NULL;
- /* Drop the underlying namespaces. */
- ESET(ws_source_drop_namespace(
- wtds, session, uri, NULL, ks->kvs_device));
- ESET(ws_source_drop_namespace(
- wtds, session, uri, WT_NAME_CACHE, ks->kvs_device));
-
- /* Push the change. */
- if ((ret = kvs_commit(ks->kvs_device)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_commit: %s", kvs_strerror(ret));
-
/* Discard the metadata entry. */
ESET(master_uri_drop(wtds, session, uri));
@@ -2262,19 +2320,18 @@ kvs_session_drop(WT_DATA_SOURCE *wtds,
}
/*
- * kvs_session_rename --
+ * helium_session_rename --
* WT_SESSION.rename method.
*/
static int
-kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+helium_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
const char *uri, const char *newuri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- KVS_SOURCE *ks;
WT_EXTENSION_API *wtext;
WT_SOURCE *ws;
int ret = 0;
- char *copy;
+ char *p;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
@@ -2287,27 +2344,26 @@ kvs_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
if ((ret = ws_source_open(wtds, session, uri, config,
WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)) != 0)
return (ret);
- ks = ws->ks;
- /* Get a copy of the new name. */
- if ((copy = strdup(newuri)) == NULL) {
+ /* Get a copy of the new name for the WT_SOURCE structure. */
+ if ((p = strdup(newuri)) == NULL) {
ret = os_errno();
goto err;
}
free(ws->uri);
- ws->uri = copy;
- copy = NULL;
+ ws->uri = p;
- /* Rename the underlying namespaces. */
- ESET(ws_source_rename_namespace(
- wtds, session, uri, newuri, NULL, ks->kvs_device));
- ESET(ws_source_rename_namespace(
- wtds, session, uri, newuri, WT_NAME_CACHE, ks->kvs_device));
-
- /* Push the change. */
- if ((ret = kvs_commit(ws->kvs)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_commit: %s", kvs_strerror(ret));
+ /* Rename the underlying Helium objects. */
+ ESET(ws_source_name(wtds, session, newuri, NULL, &p));
+ if (ret == 0) {
+ ESET(he_rename(ws->he, p));
+ free(p);
+ }
+ ESET(ws_source_name(wtds, session, newuri, WT_NAME_CACHE, &p));
+ if (ret == 0) {
+ ESET(he_rename(ws->he_cache, p));
+ free(p);
+ }
/* Update the metadata record. */
ESET(master_uri_rename(wtds, session, uri, newuri));
@@ -2325,17 +2381,17 @@ err: ESET(unlock(wtext, session, &ds->global_lock));
}
/*
- * kvs_session_truncate --
+ * helium_session_truncate --
* WT_SESSION.truncate method.
*/
static int
-kvs_session_truncate(WT_DATA_SOURCE *wtds,
+helium_session_truncate(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
WT_EXTENSION_API *wtext;
WT_SOURCE *ws;
- int ret = 0;
+ int ret = 0, tret;
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
@@ -2346,47 +2402,42 @@ kvs_session_truncate(WT_DATA_SOURCE *wtds,
return (ret);
/* Truncate the underlying namespaces. */
- if ((ret = kvs_truncate(ws->kvs)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret));
- if ((ret = kvs_truncate(ws->kvscache)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_truncate: %s: %s", ws->uri, kvs_strerror(ret));
+ if ((tret = he_truncate(ws->he)) != 0)
+ EMSG(wtext, session, tret,
+ "he_truncate: %s: %s", ws->uri, he_strerror(tret));
+ if ((tret = he_truncate(ws->he_cache)) != 0)
+ EMSG(wtext, session, tret,
+ "he_truncate: %s: %s", ws->uri, he_strerror(tret));
ESET(unlock(wtext, session, &ws->lock));
return (ret);
}
/*
- * kvs_session_verify --
+ * helium_session_verify --
* WT_SESSION.verify method.
*/
static int
-kvs_session_verify(WT_DATA_SOURCE *wtds,
+helium_session_verify(WT_DATA_SOURCE *wtds,
WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
{
- DATA_SOURCE *ds;
- WT_EXTENSION_API *wtext;
-
+ (void)wtds;
+ (void)session;
(void)uri;
(void)config;
-
- ds = (DATA_SOURCE *)wtds;
- wtext = ds->wtext;
-
- ERET(wtext, session, ENOTSUP, "verify: %s", strerror(ENOTSUP));
+ return (0);
}
/*
- * kvs_session_checkpoint --
+ * helium_session_checkpoint --
* WT_SESSION.checkpoint method.
*/
static int
-kvs_session_checkpoint(
+helium_session_checkpoint(
WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_CONFIG_ARG *config)
{
DATA_SOURCE *ds;
- KVS_SOURCE *ks;
+ HELIUM_SOURCE *hs;
WT_EXTENSION_API *wtext;
int ret = 0;
@@ -2395,208 +2446,68 @@ kvs_session_checkpoint(
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
- /*
- * Flush the device.
- *
- * XXX
- * This is a placeholder until we figure out what recovery is going
- * to look like.
- */
- if ((ks = ds->kvs_head) != NULL &&
- (ret = kvs_commit(ks->kvs_device)) != 0)
- ERET(wtext, session, WT_ERROR,
- "kvs_commit: %s", kvs_strerror(ret));
-
- return (0);
-}
-
-/*
- * kvs_config_devices --
- * Convert the device list into an argv[] array.
- */
-static int
-kvs_config_devices(
- WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *orig, char ***devices)
-{
- WT_CONFIG_ITEM k, v;
- WT_CONFIG_SCAN *scan;
- size_t len;
- u_int cnt, slots;
- int ret = 0;
- char **argv, **p;
-
- argv = NULL;
-
- /* Set up the scan of the device list. */
- if ((ret = wtext->config_scan_begin(
- wtext, NULL, orig->str, orig->len, &scan)) != 0)
- EMSG_ERR(wtext, NULL, ret,
- "WT_EXTENSION_API.config_scan_begin: %s",
- wtext->strerror(ret));
-
- for (cnt = slots = 0; (ret = wtext->
- config_scan_next(wtext, scan, &k, &v)) == 0; ++cnt) {
- if (cnt + 1 >= slots) { /* NULL-terminate the array */
- len = slots + 20 * sizeof(*argv);
- if ((p = realloc(argv, len)) == NULL) {
- ret = os_errno();
- goto err;
- }
- argv = p;
- slots += 20;
- }
- len = k.len + 1;
- if ((argv[cnt] = calloc(len, sizeof(**argv))) == NULL) {
- ret = os_errno();
- goto err;
- }
- argv[cnt + 1] = NULL;
- memcpy(argv[cnt], k.str, k.len);
- }
- if (ret != WT_NOTFOUND)
- EMSG_ERR(wtext, NULL, ret,
- "WT_EXTENSION_API.config_scan_next: %s",
- wtext->strerror(ret));
- if ((ret = wtext->config_scan_end(wtext, scan)) != 0)
- EMSG_ERR(wtext, NULL, ret,
- "WT_EXTENSION_API.config_scan_end: %s",
- wtext->strerror(ret));
+ /* Flush all volumes. NOTE(review): only ds->hs_head is committed here. */
+ if ((hs = ds->hs_head) != NULL &&
+ (ret = he_commit(hs->he_volume)) != 0)
+ ERET(wtext, session, ret,
+ "he_commit: %s: %s", hs->device, he_strerror(ret));
- *devices = argv;
return (0);
-
-err: if (argv != NULL) {
- for (p = argv; *p != NULL; ++p)
- free(*p);
- free(argv);
- }
- return (ret);
-}
-
-/*
- * kvs_config_read --
- * Read KVS configuration.
- */
-static int
-kvs_config_read(WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *config,
- char ***devices, struct kvs_config *kvs_config, int *flagsp)
-{
- WT_CONFIG_ITEM k, v;
- WT_CONFIG_SCAN *scan;
- int ret = 0, tret;
-
- *flagsp = 0; /* Return default values. */
- if ((ret = kvs_default_config(kvs_config)) != 0)
- ERET(wtext, NULL,
- EINVAL, "kvs_default_config: %s", kvs_strerror(os_errno()));
-
- /* Set up the scan of the configuration arguments list. */
- if ((ret = wtext->config_scan_begin(
- wtext, NULL, config->str, config->len, &scan)) != 0)
- ERET(wtext, NULL, ret,
- "WT_EXTENSION_API.config_scan_begin: %s",
- wtext->strerror(ret));
- while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) {
- if (STRING_MATCH("kvs_devices", k.str, k.len)) {
- if ((ret = kvs_config_devices(wtext, &v, devices)) != 0)
- return (ret);
- continue;
- }
-
-#define KVS_CONFIG_SET(s, f) \
- if (STRING_MATCH(s, k.str, k.len)) { \
- kvs_config->f = (unsigned long)v.val; \
- continue; \
- }
- KVS_CONFIG_SET("kvs_parallelism", parallelism);
- KVS_CONFIG_SET("kvs_granularity", granularity);
- KVS_CONFIG_SET("kvs_avg_key_len", avg_key_len);
- KVS_CONFIG_SET("kvs_avg_val_len", avg_val_len);
- KVS_CONFIG_SET("kvs_write_bufs", write_bufs);
- KVS_CONFIG_SET("kvs_read_bufs", read_bufs);
- KVS_CONFIG_SET("kvs_commit_timeout", commit_timeout);
- KVS_CONFIG_SET("kvs_reclaim_threshold", reclaim_threshold);
- KVS_CONFIG_SET("kvs_reclaim_period", reclaim_period);
-
-#define KVS_FLAG_SET(s, f) \
- if (STRING_MATCH(s, k.str, k.len)) { \
- if (v.val != 0) \
- *flagsp |= f; \
- continue; \
- }
- /*
- * We don't export KVS_O_CREATE: WT_SESSION.create
- * always adds it in.
- */
- KVS_FLAG_SET("kvs_open_o_debug", KVS_O_DEBUG);
- KVS_FLAG_SET("kvs_open_o_truncate", KVS_O_TRUNCATE);
-
- EMSG_ERR(wtext, NULL, EINVAL,
- "unknown configuration key value pair %.*s/%.*s",
- (int)k.len, k.str, (int)v.len, v.str);
- }
-
- if (ret == WT_NOTFOUND)
- ret = 0;
- if (ret != 0)
- EMSG_ERR(wtext, NULL, ret,
- "WT_EXTENSION_API.config_scan_next: %s",
- wtext->strerror(ret));
-
-err: if ((tret = wtext->config_scan_end(wtext, scan)) != 0)
- EMSG(wtext, NULL, tret,
- "WT_EXTENSION_API.config_scan_end: %s",
- wtext->strerror(ret));
-
- return (ret);
}
/*
- * kvs_source_close --
- * Kill a KVS_SOURCE structure.
+ * helium_source_close --
+ * Discard a HELIUM_SOURCE.
*/
static int
-kvs_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, KVS_SOURCE *ks)
+helium_source_close(
+ WT_EXTENSION_API *wtext, WT_SESSION *session, HELIUM_SOURCE *hs)
{
WT_SOURCE *ws;
int ret = 0, tret;
/* Resolve the cache into the primary one last time and quit. */
- if (ks->cleaner_id != 0) {
- ks->cleaner_stop = 1;
+ if (hs->cleaner_id != 0) {
+ hs->cleaner_stop = 1;
- if ((tret = pthread_join(ks->cleaner_id, NULL)) != 0)
+ if ((tret = pthread_join(hs->cleaner_id, NULL)) != 0)
EMSG(wtext, session, tret,
"pthread_join: %s", strerror(tret));
- ks->cleaner_id = 0;
+ hs->cleaner_id = 0;
}
/* Close the underlying WiredTiger sources. */
- while ((ws = ks->ws_head) != NULL) {
- ks->ws_head = ws->next;
+ while ((ws = hs->ws_head) != NULL) {
+ hs->ws_head = ws->next;
ESET(ws_source_close(wtext, session, ws));
}
- /* Flush and close the KVS source. */
- if (ks->kvs_device != NULL) {
- if ((tret = kvs_commit(ks->kvs_device)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_commit: %s: %s", ks->name, kvs_strerror(tret));
+ /* If the owner, close the database transaction store. */
+ if (hs->he_txn != NULL && hs->he_owner) {
+ if ((tret = he_close(hs->he_txn)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s: %s: %s",
+ hs->name, WT_NAME_TXN, he_strerror(tret));
+ hs->he_txn = NULL;
+ }
- /* If the owner, close the database transaction store. */
- if (ks->kvsowner && (tret = kvs_close(ks->kvstxn)) != 0)
+ /* Flush and close the Helium source. */
+ if (hs->he_volume != NULL) {
+ if ((tret = he_commit(hs->he_volume)) != 0)
EMSG(wtext, session, tret,
- "kvs_close: %s: %s",
- WT_NAME_TXN, kvs_strerror(tret));
+ "he_commit: %s: %s",
+ hs->device, he_strerror(tret));
- if ((tret = kvs_close(ks->kvs_device)) != 0)
- EMSG(wtext, session, WT_ERROR,
- "kvs_close: %s: %s", ks->name, kvs_strerror(tret));
- ks->kvs_device = NULL;
+ if ((tret = he_close(hs->he_volume)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s: %s: %s",
+ hs->name, WT_NAME_INIT, he_strerror(tret));
+ hs->he_volume = NULL;
}
- free(ks->name);
- OVERWRITE_AND_FREE(ks);
+ free(hs->name);
+ free(hs->device);
+ OVERWRITE_AND_FREE(hs);
return (ret);
}
@@ -2609,12 +2520,12 @@ static int
cache_cleaner(WT_EXTENSION_API *wtext,
WT_CURSOR *wtcursor, uint64_t oldest, uint64_t *txnminp)
{
- struct kvs_record *r;
CACHE_RECORD *cp;
CURSOR *cursor;
+ HE_ITEM *r;
WT_SOURCE *ws;
uint64_t txnid;
- int locked, recovery, ret = 0;
+ int locked, pushed, recovery, ret = 0;
/*
* Called in two ways: in normal processing mode where we're supplied a
@@ -2633,14 +2544,14 @@ cache_cleaner(WT_EXTENSION_API *wtext,
cursor = (CURSOR *)wtcursor;
ws = cursor->ws;
r = &cursor->record;
- locked = 0;
+ locked = pushed = 0;
/*
* For every cache key where all updates are globally visible:
* Migrate the most recent update value to the primary store.
*/
for (r->key_len = 0; (ret =
- kvs_call(wtcursor, "kvs_next", ws->kvscache, kvs_next)) == 0;) {
+ helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
/*
* Unmarshall the value, and if all of the updates are globally
* visible, update the primary with the last committed update.
@@ -2660,8 +2571,10 @@ cache_cleaner(WT_EXTENSION_API *wtext,
cache_value_last_not_aborted(wtcursor, &cp);
if (cp == NULL)
continue;
+
+ pushed = 1;
if (cp->remove) {
- if ((ret = kvs_del(ws->kvs, r)) == 0)
+ if ((ret = he_delete(ws->he, r)) == 0)
continue;
/*
@@ -2669,35 +2582,50 @@ cache_cleaner(WT_EXTENSION_API *wtext,
* primary at all, that is, an insert and remove pair
* may be confined to the cache.
*/
- if (ret == KVS_E_KEY_NOT_FOUND) {
+ if (ret == HE_ERR_ITEM_NOT_FOUND) {
ret = 0;
continue;
}
- ERET(wtext, NULL, WT_ERROR,
- "kvs_del: %s", kvs_strerror(ret));
+ ERET(wtext, NULL, ret,
+ "he_delete: %s", he_strerror(ret));
} else {
r->val = cp->v;
r->val_len = cp->len;
- if ((ret = kvs_set(ws->kvs, r)) == 0)
+ /*
+ * If compression is configured for this datastore, set
+ * the compression flag, we're updating the "real" store.
+ */
+ if (ws->config_compress)
+ r->flags |= HE_I_COMPRESS;
+ ret = he_update(ws->he, r);
+ r->flags = 0;
+ if (ret == 0)
continue;
- ERET(wtext, NULL, WT_ERROR,
- "kvs_set: %s", kvs_strerror(ret));
+
+ ERET(wtext, NULL, ret,
+ "he_update: %s", he_strerror(ret));
}
}
if (ret == WT_NOTFOUND)
ret = 0;
if (ret != 0)
- ERET(wtext, NULL, WT_ERROR,
- "kvs_next: %s", kvs_strerror(ret));
+ ERET(wtext, NULL, ret, "he_next: %s", he_strerror(ret));
+
+ /*
+ * If we didn't move any keys from the cache to the primary, quit. It's
+ * possible we could still remove values from the cache, but not likely,
+ * and another pass would probably be wasted effort (especially locked).
+ */
+ if (!pushed)
+ return (0);
/*
* Push the store to stable storage for correctness. (It doesn't matter
- * what Memrata handle we push, so we just push one of them.)
+ * what Helium handle we commit, so we just commit one of them.)
*/
- if ((ret = kvs_commit(ws->kvs)) != 0)
- ERET(wtext, NULL, WT_ERROR,
- "kvs_commit: %s", kvs_strerror(ret));
+ if ((ret = he_commit(ws->he)) != 0)
+ ERET(wtext, NULL, ret, "he_commit: %s", he_strerror(ret));
/*
* If we're performing recovery, that's all we need to do, we're going
@@ -2719,7 +2647,7 @@ cache_cleaner(WT_EXTENSION_API *wtext,
locked = 1;
for (r->key_len = 0; (ret =
- kvs_call(wtcursor, "kvs_next", ws->kvscache, kvs_next)) == 0;) {
+ helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
/*
* Unmarshall the value, and if all of the updates are globally
* visible, remove the cache entry.
@@ -2727,9 +2655,9 @@ cache_cleaner(WT_EXTENSION_API *wtext,
if ((ret = cache_value_unmarshall(wtcursor)) != 0)
goto err;
if (cache_value_visible_all(wtcursor, oldest)) {
- if ((ret = kvs_del(ws->kvscache, r)) != 0)
- EMSG_ERR(wtext, NULL, WT_ERROR,
- "kvs_del: %s", kvs_strerror(ret));
+ if ((ret = he_delete(ws->he_cache, r)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "he_delete: %s", he_strerror(ret));
continue;
}
@@ -2752,8 +2680,7 @@ cache_cleaner(WT_EXTENSION_API *wtext,
if (ret == WT_NOTFOUND)
ret = 0;
if (ret != 0)
- EMSG_ERR(wtext, NULL, WT_ERROR,
- "kvs_next: %s", kvs_strerror(ret));
+ EMSG_ERR(wtext, NULL, ret, "he_next: %s", he_strerror(ret));
err: if (locked)
ESET(unlock(wtext, NULL, &ws->lock));
@@ -2766,11 +2693,11 @@ err: if (locked)
* Discard no longer needed entries from the transaction store.
*/
static int
-txn_cleaner(WT_CURSOR *wtcursor, kvs_t kvstxn, uint64_t txnmin)
+txn_cleaner(WT_CURSOR *wtcursor, he_t he_txn, uint64_t txnmin)
{
CURSOR *cursor;
+ HE_ITEM *r;
WT_EXTENSION_API *wtext;
- struct kvs_record *r;
uint64_t txnid;
int ret = 0;
@@ -2783,23 +2710,23 @@ txn_cleaner(WT_CURSOR *wtcursor, kvs_t kvstxn, uint64_t txnmin)
* oldest transaction ID that appears anywhere in any cache.
*/
for (r->key_len = 0;
- (ret = kvs_call(wtcursor, "kvs_next", kvstxn, kvs_next)) == 0;) {
+ (ret = helium_call(wtcursor, "he_next", he_txn, he_next)) == 0;) {
memcpy(&txnid, r->key, sizeof(txnid));
- if (txnid < txnmin && (ret = kvs_del(kvstxn, r)) != 0)
- ERET(wtext, NULL, WT_ERROR,
- "kvs_del: %s", kvs_strerror(ret));
+ if (txnid < txnmin && (ret = he_delete(he_txn, r)) != 0)
+ ERET(wtext, NULL, ret,
+ "he_delete: %s", he_strerror(ret));
}
if (ret == WT_NOTFOUND)
ret = 0;
if (ret != 0)
- ERET(wtext, NULL, WT_ERROR, "kvs_next: %s", kvs_strerror(ret));
+ ERET(wtext, NULL, ret, "he_next: %s", he_strerror(ret));
return (0);
}
/*
* fake_cursor --
- * Fake up enough of a cursor to do KVS operations.
+ * Fake up enough of a cursor to do Helium operations.
*/
static int
fake_cursor(WT_EXTENSION_API *wtext, WT_CURSOR **wtcursorp)
@@ -2832,49 +2759,32 @@ fake_cursor(WT_EXTENSION_API *wtext, WT_CURSOR **wtcursorp)
}
/*
- * kvs_cleaner --
+ * cache_cleaner_worker --
* Thread to migrate data from the cache to the primary.
*/
static void *
-kvs_cleaner(void *arg)
+cache_cleaner_worker(void *arg)
{
struct timeval t;
CURSOR *cursor;
- KVS_SOURCE *ks;
+ HELIUM_SOURCE *hs;
+ HE_STATS stats;
WT_CURSOR *wtcursor;
WT_EXTENSION_API *wtext;
WT_SOURCE *ws;
uint64_t oldest, txnmin, txntmp;
int cleaner_stop, delay, ret = 0;
- ks = (KVS_SOURCE *)arg;
+ hs = (HELIUM_SOURCE *)arg;
cursor = NULL;
- wtext = ks->wtext;
+ wtext = hs->wtext;
if ((ret = fake_cursor(wtext, &wtcursor)) != 0)
- EMSG_ERR(wtext, NULL, ret, "kvs_cleaner: %s", strerror(ret));
+ EMSG_ERR(wtext, NULL, ret, "cleaner: %s", strerror(ret));
cursor = (CURSOR *)wtcursor;
- for (delay = 1;;) {
- /*
- * Check the underlying caches for either a number of operations
- * or a number of bytes. It's more expensive to return values
- * from the cache (because we have to marshall/unmarshall them),
- * but there's no information yet on how to tune the values.
- *
- * For now, use 10MB as the limit, and a corresponding number of
- * operations, assuming roughly 40B per key/value pair.
- */
-#undef BYTELIMIT
-#define BYTELIMIT (10 * 1048576)
-#undef OPLIMIT
-#define OPLIMIT (BYTELIMIT / (2 * 20))
- for (ws = ks->ws_head; ws != NULL; ws = ws->next)
- if (ws->cleaner_ops > OPLIMIT ||
- ws->cleaner_bytes > BYTELIMIT)
- break;
-
+ for (cleaner_stop = delay = 0; !cleaner_stop;) {
/*
* Check if this will be the final run; cleaner_stop is declared
* volatile, and so the read will happen. We don't much care if
@@ -2882,16 +2792,45 @@ kvs_cleaner(void *arg)
* and finds the variable set. Store the read locally, reading
* the variable twice might race.
*/
- cleaner_stop = ks->cleaner_stop;
- if (ws == NULL && !cleaner_stop) {
- if (delay < 5) /* At least every 5 seconds. */
- ++delay;
+ cleaner_stop = hs->cleaner_stop;
+
+ /*
+ * Delay if this isn't the final run and the last pass didn't
+ * find any work to do.
+ */
+ if (!cleaner_stop && delay != 0) {
t.tv_sec = delay;
t.tv_usec = 0;
(void)select(0, NULL, NULL, NULL, &t);
- continue;
}
+ /* Run at least every 5 seconds. */
+ if (delay < 5)
+ ++delay;
+
+ /*
+ * Clean the datastore caches, depending on their size. It's
+ * both more and less expensive to return values from the cache:
+ * more because we have to marshall/unmarshall the values, less
+ * because there's only a single call, to the cache store rather
+ * than one to the cache and one to the primary. I have no tuning
+ * information, for now simply set the limit at 50MB.
+ */
+#undef CACHE_SIZE_TRIGGER
+#define CACHE_SIZE_TRIGGER (50 * 1048576)
+ for (ws = hs->ws_head; ws != NULL; ws = ws->next) {
+ if ((ret = he_stats(ws->he_cache, &stats)) != 0)
+ EMSG_ERR(wtext, NULL,
+ ret, "he_stats: %s", he_strerror(ret));
+ if (stats.size > CACHE_SIZE_TRIGGER)
+ break;
+ }
+ if (!cleaner_stop && ws == NULL)
+ continue;
+
+ /* There was work to do, don't delay before checking again. */
+ delay = 0;
+
/*
* Get the oldest transaction ID not yet visible to a running
* transaction. Do this before doing anything else, avoiding
@@ -2900,11 +2839,14 @@ kvs_cleaner(void *arg)
oldest = wtext->transaction_oldest(wtext);
/*
+ * If any cache needs cleaning, clean them all, because we have
+ * to know the minimum transaction ID referenced by any cache.
+ *
* For each cache/primary pair, migrate whatever records we can,
* tracking the lowest transaction ID of any entry in any cache.
*/
txnmin = UINT64_MAX;
- for (ws = ks->ws_head; ws != NULL; ws = ws->next) {
+ for (ws = hs->ws_head; ws != NULL; ws = ws->next) {
cursor->ws = ws;
if ((ret = cache_cleaner(
wtext, wtcursor, oldest, &txntmp)) != 0)
@@ -2923,11 +2865,8 @@ kvs_cleaner(void *arg)
* problem here.
*/
cursor->ws = NULL;
- if ((ret = txn_cleaner(wtcursor, ks->kvstxn, txnmin)) != 0)
+ if ((ret = txn_cleaner(wtcursor, hs->he_txn, txnmin)) != 0)
goto err;
-
- if (cleaner_stop)
- break;
}
err: cursor_destroy(cursor);
@@ -2935,152 +2874,219 @@ err: cursor_destroy(cursor);
}
/*
- * kvs_source_open --
- * Allocate and open a KVS source.
+ * helium_config_read --
+ * Parse the Helium configuration.
*/
static int
-kvs_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v)
+helium_config_read(WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *config,
+ char **devicep, HE_ENV *envp, int *env_setp, int *flagsp)
{
- struct kvs_config kvs_config;
- KVS_SOURCE *ks;
+ WT_CONFIG_ITEM k, v;
+ WT_CONFIG_SCAN *scan;
+ int ret = 0, tret;
+
+ *env_setp = 0;
+ *flagsp = 0;
+
+ /* Set up the scan of the configuration arguments list. */
+ if ((ret = wtext->config_scan_begin(
+ wtext, NULL, config->str, config->len, &scan)) != 0)
+ ERET(wtext, NULL, ret,
+ "WT_EXTENSION_API.config_scan_begin: %s",
+ wtext->strerror(ret));
+ while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) {
+ if (string_match("helium_devices", k.str, k.len)) {
+ if ((*devicep = calloc(1, v.len + 1)) == NULL)
+ return (os_errno());
+ memcpy(*devicep, v.str, v.len);
+ continue;
+ }
+ if (string_match("helium_env_read_cache_size", k.str, k.len)) {
+ envp->read_cache_size = (uint64_t)v.val;
+ *env_setp = 1;
+ continue;
+ }
+ if (string_match("helium_env_write_cache_size", k.str, k.len)) {
+ envp->write_cache_size = (uint64_t)v.val;
+ *env_setp = 1;
+ continue;
+ }
+ if (string_match("helium_o_volume_truncate", k.str, k.len)) {
+ if (v.val != 0)
+ *flagsp |= HE_O_VOLUME_TRUNCATE;
+ continue;
+ }
+ EMSG_ERR(wtext, NULL, EINVAL,
+ "unknown configuration key value pair %.*s=%.*s",
+ (int)k.len, k.str, (int)v.len, v.str);
+ }
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ if (ret != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_EXTENSION_API.config_scan_next: %s",
+ wtext->strerror(ret));
+
+err: if ((tret = wtext->config_scan_end(wtext, scan)) != 0)
+ EMSG(wtext, NULL, tret,
+ "WT_EXTENSION_API.config_scan_end: %s",
+ wtext->strerror(tret));
+
+ return (ret);
+}
+
+/*
+ * helium_source_open --
+ * Allocate and open a Helium source.
+ */
+static int
+helium_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v)
+{
+ struct he_env env;
+ HELIUM_SOURCE *hs;
WT_EXTENSION_API *wtext;
- int flags, ret = 0;
- char **device_list, **p;
+ int env_set, flags, ret = 0;
wtext = ds->wtext;
+ hs = NULL;
- ks = NULL;
- device_list = NULL;
+ VMSG(wtext, NULL, VERBOSE_L1, "volume %.*s=%.*s",
+ (int)k->len, k->str, (int)v->len, v->str);
- /* Check for a KVS source we've already opened. */
- for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- if (STRING_MATCH(ks->name, k->str, k->len))
+ /*
+ * Check for a Helium source we've already opened: we don't check the
+ * value (which implies you can open the same underlying stores using
+ * more than one name, but I don't know of any problems that causes),
+ * we only check the key, that is, the top-level WiredTiger name.
+ */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if (string_match(hs->name, k->str, k->len))
ERET(wtext, NULL,
- EINVAL, "%s: device already open", ks->name);
+ EINVAL, "%s: device already open", hs->name);
- /* Allocate and initialize a new underlying KVS source object. */
- if ((ks = calloc(1, sizeof(*ks))) == NULL ||
- (ks->name = calloc(1, k->len + 1)) == NULL) {
- free(ks);
+ /* Allocate and initialize a new underlying Helium source object. */
+ if ((hs = calloc(1, sizeof(*hs))) == NULL ||
+ (hs->name = calloc(1, k->len + 1)) == NULL) {
+ free(hs);
return (os_errno());
}
- memcpy(ks->name, k->str, k->len);
-
- ks->txn_notify.notify = txn_notify;
- ks->wtext = wtext;
-
- /*
- * Read the configuration. We require a list of devices underlying the
- * KVS source, parse the device list found in the configuration string
- * into an array of paths.
- */
- if ((ret =
- kvs_config_read(wtext, v, &device_list, &kvs_config, &flags)) != 0)
+ memcpy(hs->name, k->str, k->len);
+ hs->txn_notify.notify = txn_notify;
+ hs->wtext = wtext;
+
+ /* Read the configuration, require a device naming the Helium store. */
+ memset(&env, 0, sizeof(env));
+ if ((ret = helium_config_read(
+ wtext, v, &hs->device, &env, &env_set, &flags)) != 0)
goto err;
- if (device_list == NULL || device_list[0] == NULL)
+ if (hs->device == NULL)
EMSG_ERR(wtext, NULL,
- EINVAL, "%s: no devices specified", ks->name);
+ EINVAL, "%s: no Helium volumes specified", hs->name);
- /* Open the underlying KVS store (creating it if necessary). */
- ks->kvs_device =
- kvs_open(device_list, &kvs_config, flags | KVS_O_CREATE);
- if (ks->kvs_device == NULL)
- EMSG_ERR(wtext, NULL, WT_ERROR,
- "kvs_open: %s: %s", ks->name, kvs_strerror(os_errno()));
+ /*
+ * Open the Helium volume, creating it if necessary. We have to open
+ * an object at the same time, that's why we have object flags as well
+ * as volume flags.
+ */
+ flags |= HE_O_CREATE |
+ HE_O_TRUNCATE | HE_O_VOLUME_CLEAN | HE_O_VOLUME_CREATE;
+ if ((hs->he_volume = he_open(
+ hs->device, WT_NAME_INIT, flags, env_set ? &env : NULL)) == NULL) {
+ ret = os_errno();
+ EMSG_ERR(wtext, NULL, ret,
+ "he_open: %s: %s: %s",
+ hs->name, WT_NAME_INIT, he_strerror(ret));
+ }
/* Insert the new entry at the head of the list. */
- ks->next = ds->kvs_head;
- ds->kvs_head = ks;
+ hs->next = ds->hs_head;
+ ds->hs_head = hs;
if (0) {
-err: if (ks != NULL)
- ESET(kvs_source_close(wtext, NULL, ks));
+err: if (hs != NULL)
+ ESET(helium_source_close(wtext, NULL, hs));
}
-
- if (device_list != NULL) {
- for (p = device_list; *p != NULL; ++p)
- free(*p);
- free(device_list);
- }
-
return (ret);
}
/*
- * kvs_source_open_txn --
+ * helium_source_open_txn --
* Open the database-wide transaction store.
*/
static int
-kvs_source_open_txn(DATA_SOURCE *ds)
+helium_source_open_txn(DATA_SOURCE *ds)
{
- KVS_SOURCE *ks, *kstxn;
+ HELIUM_SOURCE *hs, *hs_txn;
WT_EXTENSION_API *wtext;
- kvs_t kvstxn, t;
+ he_t he_txn, t;
int ret = 0;
wtext = ds->wtext;
/*
- * The global txn namespace is per connection, it spans multiple KVS
+ * The global txn namespace is per connection, it spans multiple Helium
* sources.
*
- * We've opened the KVS sources: check to see if any of them already
+ * We've opened the Helium sources: check to see if any of them already
* have a transaction store, and make sure we only find one.
*/
- kstxn = NULL;
- kvstxn = NULL;
- for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- if ((t = kvs_open_namespace(
- ks->kvs_device, WT_NAME_TXN, 0)) != NULL) {
- if (kstxn != NULL) {
- (void)kvs_close(t);
- (void)kvs_close(kvstxn);
- ERET(wtext, NULL, WT_ERROR,
+ hs_txn = NULL;
+ he_txn = NULL;
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if ((t = he_open(hs->device, WT_NAME_TXN, 0, NULL)) != NULL) {
+ if (hs_txn != NULL) {
+ (void)he_close(t);
+ (void)he_close(he_txn);
+ ERET(wtext, NULL, WT_PANIC,
"found multiple transaction stores, "
"unable to proceed");
}
- kvstxn = t;
- kstxn = ks;
+ he_txn = t;
+ hs_txn = hs;
}
/*
* If we didn't find a transaction store, open a transaction store in
- * the first KVS source we loaded. (It could just as easily be the
- * last one we loaded, we're just picking one, but picking the first
+ * the first Helium source we loaded. (It could just as easily be
+ * the last one we loaded, we're just picking one, but picking the first
* seems slightly less likely to make people wonder.)
*/
- if ((ks = kstxn) == NULL) {
- for (ks = ds->kvs_head; ks->next != NULL; ks = ks->next)
+ if ((hs = hs_txn) == NULL) {
+ for (hs = ds->hs_head; hs->next != NULL; hs = hs->next)
;
- if ((kvstxn = kvs_open_namespace(
- ks->kvs_device, WT_NAME_TXN, KVS_O_CREATE)) == NULL)
- ERET(wtext, NULL, WT_ERROR,
- "kvs_open_namespace: %s: %s",
- WT_NAME_TXN, kvs_strerror(os_errno()));
+ if ((he_txn = he_open(
+ hs->device, WT_NAME_TXN, HE_O_CREATE, NULL)) == NULL) {
+ ret = os_errno();
+ ERET(wtext, NULL, ret,
+ "he_open: %s: %s: %s",
+ hs->name, WT_NAME_TXN, he_strerror(ret));
+ }
/* Push the change. */
- if ((ret = kvs_commit(ks->kvs_device)) != 0)
- ERET(wtext, NULL, WT_ERROR,
- "kvs_commit: %s", kvs_strerror(ret));
+ if ((ret = he_commit(he_txn)) != 0)
+ ERET(wtext, NULL, ret,
+ "he_commit: %s", he_strerror(ret));
}
+ VMSG(wtext, NULL, VERBOSE_L1, "%s" "transactional store on %s",
+ hs_txn == NULL ? "creating " : "", hs->name);
- /* Set the owner field, this KVS source has to be closed last. */
- ks->kvsowner = 1;
+ /* Set the owner field, this Helium source has to be closed last. */
+ hs->he_owner = 1;
- /* Add a reference to the open transaction store in each KVS source. */
- for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- ks->kvstxn = kvstxn;
+ /* Add a reference to the transaction store in each Helium source. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ hs->he_txn = he_txn;
return (0);
}
/*
- * kvs_source_recover_namespace --
- * Recover a single cache/primary pair in a KVS namespace.
+ * helium_source_recover_namespace --
+ * Recover a single cache/primary pair in a Helium namespace.
*/
static int
-kvs_source_recover_namespace(WT_DATA_SOURCE *wtds,
- KVS_SOURCE *ks, const char *name, WT_CONFIG_ARG *config)
+helium_source_recover_namespace(WT_DATA_SOURCE *wtds,
+ HELIUM_SOURCE *hs, const char *name, WT_CONFIG_ARG *config)
{
CURSOR *cursor;
DATA_SOURCE *ds;
@@ -3099,17 +3105,17 @@ kvs_source_recover_namespace(WT_DATA_SOURCE *wtds,
uri = NULL;
/*
- * The name we store on the Memrata device is a translation of the
+ * The name we store on the Helium device is a translation of the
* WiredTiger name: do the reverse process here so we can use the
* standard source-open function.
*/
- p = name + (sizeof(WT_NAME_PREFIX) - 1);
- len = strlen("memrata:") + strlen(ks->name) + strlen(p) + 10;
+ p = name + strlen(WT_NAME_PREFIX);
+ len = strlen("helium:") + strlen(hs->name) + strlen(p) + 10;
if ((uri = malloc(len)) == NULL) {
ret = os_errno();
goto err;
}
- (void)snprintf(uri, len, "memrata:%s/%s", ks->name, p);
+ (void)snprintf(uri, len, "helium:%s/%s", hs->name, p);
/*
* Open the cache/primary pair by going through the full open process,
@@ -3122,21 +3128,20 @@ kvs_source_recover_namespace(WT_DATA_SOURCE *wtds,
/* Fake up a cursor. */
if ((ret = fake_cursor(wtext, &wtcursor)) != 0)
- EMSG_ERR(wtext, NULL, ret,
- "kvs_source_recover_namespace: %s", strerror(ret));
+ EMSG_ERR(wtext, NULL, ret, "recovery: %s", strerror(ret));
cursor = (CURSOR *)wtcursor;
cursor->ws = ws;
/* Process, then clear, the cache. */
if ((ret = cache_cleaner(wtext, wtcursor, 0, NULL)) != 0)
goto err;
- if ((ret = kvs_truncate(ws->kvscache)) != 0)
- EMSG_ERR(wtext, NULL, WT_ERROR,
- "kvs_truncate: %s(cache): %s", ws->uri, kvs_strerror(ret));
+ if ((ret = he_truncate(ws->he_cache)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "he_truncate: %s(cache): %s", ws->uri, he_strerror(ret));
/* Close the underlying WiredTiger sources. */
-err: while ((ws = ks->ws_head) != NULL) {
- ks->ws_head = ws->next;
+err: while ((ws = hs->ws_head) != NULL) {
+ hs->ws_head = ws->next;
ESET(ws_source_close(wtext, NULL, ws));
}
@@ -3146,36 +3151,36 @@ err: while ((ws = ks->ws_head) != NULL) {
return (ret);
}
-struct kvs_namespace_cookie {
+struct helium_namespace_cookie {
char **list;
u_int list_cnt;
u_int list_max;
};
/*
- * kvs_namespace_list --
+ * helium_namespace_list --
* Get a list of the objects we're going to recover.
*/
static int
-kvs_namespace_list(void *cookie, const char *name)
+helium_namespace_list(void *cookie, const char *name)
{
- struct kvs_namespace_cookie *names;
- const char *p;
+ struct helium_namespace_cookie *names;
void *allocp;
names = cookie;
- /* Ignore any files without a WiredTiger prefix. */
- if (strncmp(name, WT_NAME_PREFIX, sizeof(WT_NAME_PREFIX) - 1) != 0)
+ /*
+ * Ignore any files without a WiredTiger prefix.
+ * Ignore the metadata and cache files.
+ */
+ if (!prefix_match(name, WT_NAME_PREFIX))
+ return (0);
+ if (strcmp(name, WT_NAME_INIT) == 0)
return (0);
-
- /* Ignore the transaction store. */
if (strcmp(name, WT_NAME_TXN) == 0)
return (0);
-
- /* Ignore the "cache" files. */
- p = name + (sizeof(WT_NAME_PREFIX) - 1);
- if ((p = strchr(p, '.')) != NULL && strcmp(p, WT_NAME_CACHE) == 0)
+ if (string_match(
+ strrchr(name, '.'), WT_NAME_CACHE, strlen(WT_NAME_CACHE)))
return (0);
if (names->list_cnt + 1 >= names->list_max) {
@@ -3193,13 +3198,14 @@ kvs_namespace_list(void *cookie, const char *name)
}
/*
- * kvs_source_recover --
- * Recover the KVS source.
+ * helium_source_recover --
+ * Recover the HELIUM_SOURCE.
*/
static int
-kvs_source_recover(WT_DATA_SOURCE *wtds, KVS_SOURCE *ks, WT_CONFIG_ARG *config)
+helium_source_recover(
+ WT_DATA_SOURCE *wtds, HELIUM_SOURCE *hs, WT_CONFIG_ARG *config)
{
- struct kvs_namespace_cookie names;
+ struct helium_namespace_cookie names;
DATA_SOURCE *ds;
WT_EXTENSION_API *wtext;
u_int i;
@@ -3207,25 +3213,27 @@ kvs_source_recover(WT_DATA_SOURCE *wtds, KVS_SOURCE *ks, WT_CONFIG_ARG *config)
ds = (DATA_SOURCE *)wtds;
wtext = ds->wtext;
-
memset(&names, 0, sizeof(names));
- /* Get a list of the cache/primary object pairs in the KVS source. */
- if ((ret = kvs_namespaces(
- ks->kvs_device, kvs_namespace_list, &names)) != 0)
- ERET(wtext, NULL, WT_ERROR,
- "kvs_namespaces: %s: %s", ks->name, kvs_strerror(ret));
+ VMSG(wtext, NULL, VERBOSE_L1, "recover %s", hs->name);
+
+ /* Get a list of the cache/primary object pairs in the Helium source. */
+ if ((ret = he_enumerate(
+ hs->device, helium_namespace_list, &names)) != 0)
+ ERET(wtext, NULL, ret,
+ "he_enumerate: %s: %s", hs->name, he_strerror(ret));
/* Recover the objects. */
for (i = 0; i < names.list_cnt; ++i)
- if ((ret = kvs_source_recover_namespace(
- wtds, ks, names.list[i], config)) != 0)
+ if ((ret = helium_source_recover_namespace(
+ wtds, hs, names.list[i], config)) != 0)
goto err;
/* Clear the transaction store. */
- if ((ret = kvs_truncate(ks->kvstxn)) != 0)
- EMSG_ERR(wtext, NULL, WT_ERROR,
- "kvs_truncate: %s: %s", WT_NAME_TXN, kvs_strerror(ret));
+ if ((ret = he_truncate(hs->he_txn)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "he_truncate: %s: %s: %s",
+ hs->name, WT_NAME_TXN, he_strerror(ret));
err: for (i = 0; i < names.list_cnt; ++i)
free(names.list[i]);
@@ -3235,14 +3243,14 @@ err: for (i = 0; i < names.list_cnt; ++i)
}
/*
- * kvs_terminate --
+ * helium_terminate --
* Unload the data-source.
*/
static int
-kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
+helium_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
{
DATA_SOURCE *ds;
- KVS_SOURCE *ks, *last;
+ HELIUM_SOURCE *hs, *last;
WT_EXTENSION_API *wtext;
int ret = 0;
@@ -3254,20 +3262,20 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
ret = writelock(wtext, session, &ds->global_lock);
/*
- * Close the KVS sources, close the KVS source that "owns" the
+ * Close the Helium sources, close the Helium source that "owns" the
* database transaction store last.
*/
last = NULL;
- while ((ks = ds->kvs_head) != NULL) {
- ds->kvs_head = ks->next;
- if (ks->kvsowner) {
- last = ks;
+ while ((hs = ds->hs_head) != NULL) {
+ ds->hs_head = hs->next;
+ if (hs->he_owner) {
+ last = hs;
continue;
}
- ESET(kvs_source_close(wtext, session, ks));
+ ESET(helium_source_close(wtext, session, hs));
}
if (last != NULL)
- ESET(kvs_source_close(wtext, session, last));
+ ESET(helium_source_close(wtext, session, last));
/* Unlock and destroy the system. */
if (ds->lockinit) {
@@ -3282,7 +3290,7 @@ kvs_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
/*
* wiredtiger_extension_init --
- * Initialize the KVS connector code.
+ * Initialize the Helium connector code.
*/
int
wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
@@ -3291,44 +3299,47 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
* List of the WT_DATA_SOURCE methods -- it's static so it breaks at
* compile-time should the structure change underneath us.
*/
- static WT_DATA_SOURCE wtds = {
- kvs_session_create, /* session.create */
+ static const WT_DATA_SOURCE wtds = {
+ helium_session_create, /* session.create */
NULL, /* No session.compaction */
- kvs_session_drop, /* session.drop */
- kvs_session_open_cursor, /* session.open_cursor */
- kvs_session_rename, /* session.rename */
+ helium_session_drop, /* session.drop */
+ helium_session_open_cursor, /* session.open_cursor */
+ helium_session_rename, /* session.rename */
NULL, /* No session.salvage */
- kvs_session_truncate, /* session.truncate */
+ helium_session_truncate, /* session.truncate */
NULL, /* No session.range_truncate */
- kvs_session_verify, /* session.verify */
- kvs_session_checkpoint, /* session.checkpoint */
- kvs_terminate /* termination */
+ helium_session_verify, /* session.verify */
+ helium_session_checkpoint, /* session.checkpoint */
+ helium_terminate /* termination */
};
static const char *session_create_opts[] = {
- "kvs_open_o_truncate=0",
- "kvs_open_o_debug=0",
+ "helium_o_compress=0", /* HE_I_COMPRESS */
+ "helium_o_truncate=0", /* HE_O_TRUNCATE */
NULL
};
DATA_SOURCE *ds;
- KVS_SOURCE *ks;
+ HELIUM_SOURCE *hs;
WT_CONFIG_ITEM k, v;
WT_CONFIG_SCAN *scan;
WT_EXTENSION_API *wtext;
- int ret = 0;
+ int vmajor, vminor, ret = 0;
const char **p;
- (void)config; /* Unused parameters */
-
ds = NULL;
- /* Acquire the extension API */
+
wtext = connection->get_extension_api(connection);
/* Check the library version */
-#if KVS_VERSION_MAJOR != 4 || KVS_VERSION_MINOR != 13
+#if HE_VERSION_MAJOR != 2 || HE_VERSION_MINOR != 2
ERET(wtext, NULL, EINVAL,
- "unsupported KVS library version %d.%d, expected version 4.13",
- KVS_VERSION_MAJOR, KVS_VERSION_MINOR);
+ "unsupported Levyx/Helium header file %d.%d, expected version 2.2",
+ HE_VERSION_MAJOR, HE_VERSION_MINOR);
#endif
+ he_version(&vmajor, &vminor);
+ if (vmajor != 2 || vminor != 2)
+ ERET(wtext, NULL, EINVAL,
+ "unsupported Levyx/Helium library version %d.%d, expected "
+ "version 2.2", vmajor, vminor);
/* Allocate and initialize the local data-source structure. */
if ((ds = calloc(1, sizeof(DATA_SOURCE))) == NULL)
@@ -3345,15 +3356,20 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
"WT_EXTENSION_API.config_get: config: %s",
wtext->strerror(ret));
- /* Step through the list of KVS sources, opening each one. */
+ /* Step through the list of Helium sources, opening each one. */
if ((ret =
wtext->config_scan_begin(wtext, NULL, v.str, v.len, &scan)) != 0)
EMSG_ERR(wtext, NULL, ret,
"WT_EXTENSION_API.config_scan_begin: config: %s",
wtext->strerror(ret));
- while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0)
- if ((ret = kvs_source_open(ds, &k, &v)) != 0)
+ while ((ret = wtext->config_scan_next(wtext, scan, &k, &v)) == 0) {
+ if (string_match("helium_verbose", k.str, k.len)) {
+ verbose = v.val == 0 ? 0 : 1;
+ continue;
+ }
+ if ((ret = helium_source_open(ds, &k, &v)) != 0)
goto err;
+ }
if (ret != WT_NOTFOUND)
EMSG_ERR(wtext, NULL, ret,
"WT_EXTENSION_API.config_scan_next: config: %s",
@@ -3364,26 +3380,26 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
wtext->strerror(ret));
/* Find and open the database transaction store. */
- if ((ret = kvs_source_open_txn(ds)) != 0)
- return (ret);
+ if ((ret = helium_source_open_txn(ds)) != 0)
+ goto err;
- /* Recover each KVS source. */
- for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
- if ((ret = kvs_source_recover(&ds->wtds, ks, config)) != 0)
+ /* Recover each Helium source. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if ((ret = helium_source_recover(&ds->wtds, hs, config)) != 0)
goto err;
- /* Start each KVS source cleaner thread. */
- for (ks = ds->kvs_head; ks != NULL; ks = ks->next)
+ /* Start each Helium source cleaner thread. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
if ((ret = pthread_create(
- &ks->cleaner_id, NULL, kvs_cleaner, ks)) != 0)
+ &hs->cleaner_id, NULL, cache_cleaner_worker, hs)) != 0)
EMSG_ERR(wtext, NULL, ret,
"%s: pthread_create: cleaner thread: %s",
- ks->name, strerror(ret));
+ hs->name, strerror(ret));
- /* Add KVS-specific configuration options. */
+ /* Add Helium-specific WT_SESSION.create configuration options. */
for (p = session_create_opts; *p != NULL; ++p)
if ((ret = connection->configure_method(connection,
- "session.create", "memrata:", *p, "boolean", NULL)) != 0)
+ "session.create", "helium:", *p, "boolean", NULL)) != 0)
EMSG_ERR(wtext, NULL, ret,
"WT_CONNECTION.configure_method: session.create: "
"%s: %s",
@@ -3391,19 +3407,19 @@ wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
/* Add the data source */
if ((ret = connection->add_data_source(
- connection, "memrata:", (WT_DATA_SOURCE *)ds, NULL)) != 0)
+ connection, "helium:", (WT_DATA_SOURCE *)ds, NULL)) != 0)
EMSG_ERR(wtext, NULL, ret,
"WT_CONNECTION.add_data_source: %s", wtext->strerror(ret));
return (0);
err: if (ds != NULL)
- ESET(kvs_terminate((WT_DATA_SOURCE *)ds, NULL));
+ ESET(helium_terminate((WT_DATA_SOURCE *)ds, NULL));
return (ret);
}
/*
* wiredtiger_extension_terminate --
- * Shutdown the KVS connector code.
+ * Shutdown the Helium connector code.
*/
int
wiredtiger_extension_terminate(WT_CONNECTION *connection)
diff --git a/ext/test/memrata/Makefile.am b/ext/test/memrata/Makefile.am
deleted file mode 100644
index 1962680d3fe..00000000000
--- a/ext/test/memrata/Makefile.am
+++ /dev/null
@@ -1,12 +0,0 @@
-AM_CPPFLAGS = -I$(top_builddir) \
- -I$(top_srcdir)/src/include -I$(MEMRATA_PATH)
-
-noinst_LTLIBRARIES = libwiredtiger_memrata.la
-libwiredtiger_memrata_la_SOURCES = memrata.c
-libwiredtiger_memrata_la_LIBADD = \
- -L$(MEMRATA_PATH) -lkvs -L$(BERKELEY_DB_PATH)/lib -ldb
-
-# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well
-# as installation, it will only build static libraries. As far as I can tell,
-# the "approved" libtool way to turn them back on is by adding -rpath.
-libwiredtiger_memrata_la_LDFLAGS = -avoid-version -module -rpath /nowhere