summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/ext')
-rw-r--r--src/third_party/wiredtiger/ext/collators/reverse/Makefile.am10
-rw-r--r--src/third_party/wiredtiger/ext/collators/reverse/reverse_collator.c74
-rw-r--r--src/third_party/wiredtiger/ext/compressors/bzip2/Makefile.am6
-rw-r--r--src/third_party/wiredtiger/ext/compressors/bzip2/bzip2_compress.c407
-rw-r--r--src/third_party/wiredtiger/ext/compressors/nop/Makefile.am9
-rw-r--r--src/third_party/wiredtiger/ext/compressors/nop/nop_compress.c187
-rw-r--r--src/third_party/wiredtiger/ext/compressors/snappy/Makefile.am10
-rw-r--r--src/third_party/wiredtiger/ext/compressors/snappy/snappy_compress.c244
-rw-r--r--src/third_party/wiredtiger/ext/compressors/zlib/Makefile.am10
-rw-r--r--src/third_party/wiredtiger/ext/compressors/zlib/zlib_compress.c426
-rw-r--r--src/third_party/wiredtiger/ext/datasources/helium/Makefile.am11
-rw-r--r--src/third_party/wiredtiger/ext/datasources/helium/README125
-rw-r--r--src/third_party/wiredtiger/ext/datasources/helium/helium.c3449
13 files changed, 4968 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/ext/collators/reverse/Makefile.am b/src/third_party/wiredtiger/ext/collators/reverse/Makefile.am
new file mode 100644
index 00000000000..5cfde94d847
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/collators/reverse/Makefile.am
@@ -0,0 +1,10 @@
+AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
+
+noinst_LTLIBRARIES = libwiredtiger_reverse_collator.la
+libwiredtiger_reverse_collator_la_SOURCES = reverse_collator.c
+
+# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well
+# as installation, it will only build static libraries. As far as I can tell,
+# the "approved" libtool way to turn them back on is by adding -rpath.
+libwiredtiger_reverse_collator_la_LDFLAGS = \
+ -avoid-version -module -rpath /nowhere
diff --git a/src/third_party/wiredtiger/ext/collators/reverse/reverse_collator.c b/src/third_party/wiredtiger/ext/collators/reverse/reverse_collator.c
new file mode 100644
index 00000000000..0ccebba7919
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/collators/reverse/reverse_collator.c
@@ -0,0 +1,74 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <string.h>
+
+#include <wiredtiger_ext.h>
+
+/*
+ * collate_reverse --
+ * WiredTiger reverse collation.
+ */
+static int
+collate_reverse(WT_COLLATOR *collator,
+ WT_SESSION *session, const WT_ITEM *k1, const WT_ITEM *k2, int *ret)
+{
+ size_t len;
+ int cmp;
+
+ (void)collator; /* Unused */
+ (void)session;
+
+ len = (k1->size < k2->size) ? k1->size : k2->size;
+ cmp = memcmp(k1->data, k2->data, len);
+ if (cmp < 0)
+ *ret = 1;
+ else if (cmp > 0)
+ *ret = -1;
+ else if (k1->size < k2->size)
+ *ret = 1;
+ else if (k1->size > k2->size)
+ *ret = -1;
+ else
+ *ret = 0;
+ return (0);
+}
+
+static WT_COLLATOR reverse_collator = { collate_reverse, NULL, NULL };
+
+/*
+ * wiredtiger_extension_init --
+ * WiredTiger reverse collation extension.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ (void)config; /* Unused parameters */
+
+ return (connection->add_collator(
+ connection, "reverse", &reverse_collator, NULL));
+}
diff --git a/src/third_party/wiredtiger/ext/compressors/bzip2/Makefile.am b/src/third_party/wiredtiger/ext/compressors/bzip2/Makefile.am
new file mode 100644
index 00000000000..0aedc2efd80
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/bzip2/Makefile.am
@@ -0,0 +1,6 @@
+AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
+
+lib_LTLIBRARIES = libwiredtiger_bzip2.la
+libwiredtiger_bzip2_la_SOURCES = bzip2_compress.c
+libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module
+libwiredtiger_bzip2_la_LIBADD = -lbz2
diff --git a/src/third_party/wiredtiger/ext/compressors/bzip2/bzip2_compress.c b/src/third_party/wiredtiger/ext/compressors/bzip2/bzip2_compress.c
new file mode 100644
index 00000000000..cd73b237387
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/bzip2/bzip2_compress.c
@@ -0,0 +1,407 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <bzlib.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <wiredtiger.h>
+#include <wiredtiger_ext.h>
+
+/* Local compressor structure. */
+typedef struct {
+ WT_COMPRESSOR compressor; /* Must come first */
+
+ WT_EXTENSION_API *wt_api; /* Extension API */
+
+ int bz_verbosity; /* Configuration */
+ int bz_blocksize100k;
+ int bz_workfactor;
+ int bz_small;
+} BZIP_COMPRESSOR;
+
+/*
+ * Bzip gives us a cookie to pass to the underlying allocation functions; we
+ * we need two handles, package them up.
+ */
+typedef struct {
+ WT_COMPRESSOR *compressor;
+ WT_SESSION *session;
+} BZIP_OPAQUE;
+
+/*
+ * bzip2_error --
+ * Output an error message, and return a standard error code.
+ */
+static int
+bzip2_error(
+ WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int bzret)
+{
+ WT_EXTENSION_API *wt_api;
+ const char *msg;
+
+ wt_api = ((BZIP_COMPRESSOR *)compressor)->wt_api;
+
+ switch (bzret) {
+ case BZ_MEM_ERROR:
+ msg = "BZ_MEM_ERROR";
+ break;
+ case BZ_OUTBUFF_FULL:
+ msg = "BZ_OUTBUFF_FULL";
+ break;
+ case BZ_SEQUENCE_ERROR:
+ msg = "BZ_SEQUENCE_ERROR";
+ break;
+ case BZ_PARAM_ERROR:
+ msg = "BZ_PARAM_ERROR";
+ break;
+ case BZ_DATA_ERROR:
+ msg = "BZ_DATA_ERROR";
+ break;
+ case BZ_DATA_ERROR_MAGIC:
+ msg = "BZ_DATA_ERROR_MAGIC";
+ break;
+ case BZ_IO_ERROR:
+ msg = "BZ_IO_ERROR";
+ break;
+ case BZ_UNEXPECTED_EOF:
+ msg = "BZ_UNEXPECTED_EOF";
+ break;
+ case BZ_CONFIG_ERROR:
+ msg = "BZ_CONFIG_ERROR";
+ break;
+ default:
+ msg = "unknown error";
+ break;
+ }
+
+ (void)wt_api->err_printf(wt_api, session,
+ "bzip2 error: %s: %s: %d", call, msg, bzret);
+ return (WT_ERROR);
+}
+
+/*
+ * bzalloc --
+ * Allocate scratch buffers.
+ */
+static void *
+bzalloc(void *cookie, int number, int size)
+{
+ BZIP_OPAQUE *opaque;
+ WT_EXTENSION_API *wt_api;
+
+ opaque = cookie;
+ wt_api = ((BZIP_COMPRESSOR *)opaque->compressor)->wt_api;
+ return (wt_api->scr_alloc(
+ wt_api, opaque->session, (size_t)(number * size)));
+}
+
+/*
+ * bzfree --
+ * Free scratch buffers.
+ */
+static void
+bzfree(void *cookie, void *p)
+{
+ BZIP_OPAQUE *opaque;
+ WT_EXTENSION_API *wt_api;
+
+ opaque = cookie;
+ wt_api = ((BZIP_COMPRESSOR *)opaque->compressor)->wt_api;
+ wt_api->scr_free(wt_api, opaque->session, p);
+}
+
+/*
+ * bzip2_compress --
+ * WiredTiger bzip2 compression.
+ */
+static int
+bzip2_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp, int *compression_failed)
+{
+ BZIP_COMPRESSOR *bzip_compressor;
+ BZIP_OPAQUE opaque;
+ bz_stream bz;
+ int ret;
+
+ bzip_compressor = (BZIP_COMPRESSOR *)compressor;
+
+ memset(&bz, 0, sizeof(bz));
+ bz.bzalloc = bzalloc;
+ bz.bzfree = bzfree;
+ opaque.compressor = compressor;
+ opaque.session = session;
+ bz.opaque = &opaque;
+
+ if ((ret = BZ2_bzCompressInit(&bz,
+ bzip_compressor->bz_blocksize100k,
+ bzip_compressor->bz_verbosity,
+ bzip_compressor->bz_workfactor)) != BZ_OK)
+ return (bzip2_error(
+ compressor, session, "BZ2_bzCompressInit", ret));
+
+ bz.next_in = (char *)src;
+ bz.avail_in = (uint32_t)src_len;
+ bz.next_out = (char *)dst;
+ bz.avail_out = (uint32_t)dst_len;
+ if ((ret = BZ2_bzCompress(&bz, BZ_FINISH)) == BZ_STREAM_END) {
+ *compression_failed = 0;
+ *result_lenp = dst_len - bz.avail_out;
+ } else
+ *compression_failed = 1;
+
+ if ((ret = BZ2_bzCompressEnd(&bz)) != BZ_OK)
+ return (
+ bzip2_error(compressor, session, "BZ2_bzCompressEnd", ret));
+
+ return (0);
+}
+
+/*
+ * __bzip2_compress_raw_random --
+ * Return a 32-bit pseudo-random number.
+ *
+ * This is an implementation of George Marsaglia's multiply-with-carry pseudo-
+ * random number generator. Computationally fast, with reasonable randomness
+ * properties.
+ */
+static uint32_t
+__bzip2_compress_raw_random(void)
+{
+ static uint32_t m_w = 521288629;
+ static uint32_t m_z = 362436069;
+
+ m_z = 36969 * (m_z & 65535) + (m_z >> 16);
+ m_w = 18000 * (m_w & 65535) + (m_w >> 16);
+ return (m_z << 16) + (m_w & 65535);
+}
+
+/*
+ * bzip2_compress_raw --
+ * Test function for the test/format utility.
+ */
+static int
+bzip2_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ size_t page_max, int split_pct, size_t extra,
+ uint8_t *src, uint32_t *offsets, uint32_t slots,
+ uint8_t *dst, size_t dst_len, int final,
+ size_t *result_lenp, uint32_t *result_slotsp)
+{
+ uint32_t take, twenty_pct;
+ int compression_failed, ret;
+
+ (void)page_max; /* Unused parameters */
+ (void)split_pct;
+ (void)extra;
+ (void)final;
+
+ /*
+ * This function is used by the test/format utility to test the
+ * WT_COMPRESSOR::compress_raw functionality.
+ *
+ * I'm trying to mimic how a real application is likely to behave: if
+ * it's a small number of slots, we're not going to take them because
+ * they aren't worth compressing. In all likelihood, that's going to
+ * be because the btree is wrapping up a page, but that's OK, that is
+ * going to happen a lot. In addition, add a 2% chance of not taking
+ * anything at all just because we don't want to take it. Otherwise,
+ * select between 80 and 100% of the slots and compress them, stepping
+ * down by 5 slots at a time until something works.
+ */
+ take = slots;
+ if (take < 10 || __bzip2_compress_raw_random() % 100 < 2)
+ take = 0;
+ else {
+ twenty_pct = (slots / 10) * 2;
+ if (twenty_pct < slots)
+ take -= __bzip2_compress_raw_random() % twenty_pct;
+
+ for (;;) {
+ if ((ret = bzip2_compress(compressor, session,
+ src, offsets[take],
+ dst, dst_len,
+ result_lenp, &compression_failed)) != 0)
+ return (ret);
+ if (!compression_failed)
+ break;
+ if (take < 10) {
+ take = 0;
+ break;
+ }
+ take -= 5;
+ }
+ }
+
+ *result_slotsp = take;
+ if (take == 0)
+ *result_lenp = 0;
+
+#if 0
+ fprintf(stderr,
+ "bzip2_compress_raw (%s): page_max %" PRIuMAX
+ ", split_pct %u, extra %" PRIuMAX
+ ", slots %" PRIu32 ", take %" PRIu32 ": %" PRIu32 " -> %"
+ PRIuMAX "\n",
+ final ? "final" : "not final",
+ (uintmax_t)page_max, split_pct, (uintmax_t)extra,
+ slots, take, offsets[take], (uintmax_t)*result_lenp);
+#endif
+ return (take == 0 ? EAGAIN : 0);
+}
+
+/*
+ * bzip2_decompress --
+ * WiredTiger bzip2 decompression.
+ */
+static int
+bzip2_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp)
+{
+ BZIP_COMPRESSOR *bzip_compressor;
+ BZIP_OPAQUE opaque;
+ bz_stream bz;
+ int ret, tret;
+
+ bzip_compressor = (BZIP_COMPRESSOR *)compressor;
+
+ memset(&bz, 0, sizeof(bz));
+ bz.bzalloc = bzalloc;
+ bz.bzfree = bzfree;
+ opaque.compressor = compressor;
+ opaque.session = session;
+ bz.opaque = &opaque;
+
+ if ((ret = BZ2_bzDecompressInit(&bz,
+ bzip_compressor->bz_small, bzip_compressor->bz_verbosity)) != BZ_OK)
+ return (bzip2_error(
+ compressor, session, "BZ2_bzDecompressInit", ret));
+
+ bz.next_in = (char *)src;
+ bz.avail_in = (uint32_t)src_len;
+ bz.next_out = (char *)dst;
+ bz.avail_out = (uint32_t)dst_len;
+ if ((ret = BZ2_bzDecompress(&bz)) == BZ_STREAM_END) {
+ *result_lenp = dst_len - bz.avail_out;
+ ret = 0;
+ } else
+ (void)bzip2_error(compressor, session, "BZ2_bzDecompress", ret);
+
+ if ((tret = BZ2_bzDecompressEnd(&bz)) != BZ_OK)
+ return (bzip2_error(
+ compressor, session, "BZ2_bzDecompressEnd", tret));
+
+ return (ret == 0 ?
+ 0 : bzip2_error(compressor, session, "BZ2_bzDecompressEnd", ret));
+}
+
+/*
+ * bzip2_terminate --
+ * WiredTiger bzip2 compression termination.
+ */
+static int
+bzip2_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session)
+{
+ (void)session; /* Unused parameters */
+
+ free(compressor);
+ return (0);
+}
+
+/*
+ * bzip2_add_compressor --
+ * Add a bzip2 compressor.
+ */
+static int
+bzip2_add_compressor(WT_CONNECTION *connection, int raw, const char *name)
+{
+ BZIP_COMPRESSOR *bzip_compressor;
+
+ /*
+ * There are two almost identical bzip2 compressors: one supporting raw
+ * compression (used by test/format to test raw compression), the other
+ * without raw compression, that might be useful for real applications.
+ */
+ if ((bzip_compressor = calloc(1, sizeof(BZIP_COMPRESSOR))) == NULL)
+ return (errno);
+
+ bzip_compressor->compressor.compress = bzip2_compress;
+ bzip_compressor->
+ compressor.compress_raw = raw ? bzip2_compress_raw : NULL;
+ bzip_compressor->compressor.decompress = bzip2_decompress;
+ bzip_compressor->compressor.pre_size = NULL;
+ bzip_compressor->compressor.terminate = bzip2_terminate;
+
+ bzip_compressor->wt_api = connection->get_extension_api(connection);
+
+ /* between 0-4: set the amount of verbosity to stderr */
+ bzip_compressor->bz_verbosity = 0;
+
+ /*
+ * between 1-9: set the block size to 100k x this number (compression
+ * only)
+ */
+ bzip_compressor->bz_blocksize100k = 1;
+
+ /*
+ * between 0-250: workFactor: see bzip2 manual. 0 is a reasonable
+ * default (compression only)
+ */
+ bzip_compressor->bz_workfactor = 0;
+
+ /*
+ * if nonzero, decompress using less memory, but slower (decompression
+ * only)
+ */
+ bzip_compressor->bz_small = 0;
+
+ return (connection->add_compressor( /* Load the compressor */
+ connection, name, (WT_COMPRESSOR *)bzip_compressor, NULL));
+}
+
+/*
+ * wiredtiger_extension_init --
+ * WiredTiger bzip2 compression extension.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ int ret;
+
+ (void)config; /* Unused parameters */
+
+ if ((ret = bzip2_add_compressor(connection, 0, "bzip2")) != 0)
+ return (ret);
+ if ((ret = bzip2_add_compressor(connection, 1, "bzip2-raw-test")) != 0)
+ return (ret);
+ return (0);
+}
diff --git a/src/third_party/wiredtiger/ext/compressors/nop/Makefile.am b/src/third_party/wiredtiger/ext/compressors/nop/Makefile.am
new file mode 100644
index 00000000000..87dbf18cb22
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/nop/Makefile.am
@@ -0,0 +1,9 @@
+AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
+
+noinst_LTLIBRARIES = libwiredtiger_nop.la
+libwiredtiger_nop_la_SOURCES = nop_compress.c
+
+# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well
+# as installation, it will only build static libraries. As far as I can tell,
+# the "approved" libtool way to turn them back on is by adding -rpath.
+libwiredtiger_nop_la_LDFLAGS = -avoid-version -module -rpath /nowhere
diff --git a/src/third_party/wiredtiger/ext/compressors/nop/nop_compress.c b/src/third_party/wiredtiger/ext/compressors/nop/nop_compress.c
new file mode 100644
index 00000000000..e536c8fefd8
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/nop/nop_compress.c
@@ -0,0 +1,187 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <wiredtiger.h>
+#include <wiredtiger_ext.h>
+
+/*! [WT_COMPRESSOR initialization structure] */
+/* Local compressor structure. */
+typedef struct {
+ WT_COMPRESSOR compressor; /* Must come first */
+
+ WT_EXTENSION_API *wt_api; /* Extension API */
+
+ unsigned long nop_calls; /* Count of calls */
+
+} NOP_COMPRESSOR;
+/*! [WT_COMPRESSOR initialization structure] */
+
+/*! [WT_COMPRESSOR compress] */
+/*
+ * nop_compress --
+ * A simple compression example that passes data through unchanged.
+ */
+static int
+nop_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp, int *compression_failed)
+{
+ NOP_COMPRESSOR *nop_compressor = (NOP_COMPRESSOR *)compressor;
+
+ (void)session; /* Unused parameters */
+
+ ++nop_compressor->nop_calls; /* Call count */
+
+ *compression_failed = 0;
+ if (dst_len < src_len) {
+ *compression_failed = 1;
+ return (0);
+ }
+
+ memcpy(dst, src, src_len);
+ *result_lenp = src_len;
+
+ return (0);
+}
+/*! [WT_COMPRESSOR compress] */
+
+/*! [WT_COMPRESSOR decompress] */
+/*
+ * nop_decompress --
+ * A simple decompression example that passes data through unchanged.
+ */
+static int
+nop_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp)
+{
+ NOP_COMPRESSOR *nop_compressor = (NOP_COMPRESSOR *)compressor;
+
+ (void)session; /* Unused parameters */
+ (void)src_len;
+
+ ++nop_compressor->nop_calls; /* Call count */
+
+ /*
+ * The destination length is the number of uncompressed bytes we're
+ * expected to return.
+ */
+ memcpy(dst, src, dst_len);
+ *result_lenp = dst_len;
+ return (0);
+}
+/*! [WT_COMPRESSOR decompress] */
+
+/*! [WT_COMPRESSOR presize] */
+/*
+ * nop_pre_size --
+ * A simple pre-size example that returns the source length.
+ */
+static int
+nop_pre_size(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ size_t *result_lenp)
+{
+ NOP_COMPRESSOR *nop_compressor = (NOP_COMPRESSOR *)compressor;
+
+ (void)session; /* Unused parameters */
+ (void)src;
+
+ ++nop_compressor->nop_calls; /* Call count */
+
+ *result_lenp = src_len;
+ return (0);
+}
+/*! [WT_COMPRESSOR presize] */
+
+/*! [WT_COMPRESSOR terminate] */
+/*
+ * nop_terminate --
+ * WiredTiger no-op compression termination.
+ */
+static int
+nop_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session)
+{
+ NOP_COMPRESSOR *nop_compressor = (NOP_COMPRESSOR *)compressor;
+
+ (void)session; /* Unused parameters */
+
+ ++nop_compressor->nop_calls; /* Call count */
+
+ /* Free the allocated memory. */
+ free(compressor);
+
+ return (0);
+}
+/*! [WT_COMPRESSOR terminate] */
+
+/*! [WT_COMPRESSOR initialization function] */
+/*
+ * wiredtiger_extension_init --
+ * A simple shared library compression example.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ NOP_COMPRESSOR *nop_compressor;
+
+ (void)config; /* Unused parameters */
+
+ if ((nop_compressor = calloc(1, sizeof(NOP_COMPRESSOR))) == NULL)
+ return (errno);
+
+ /*
+ * Allocate a local compressor structure, with a WT_COMPRESSOR structure
+ * as the first field, allowing us to treat references to either type of
+ * structure as a reference to the other type.
+ *
+ * This could be simplified if only a single database is opened in the
+ * application, we could use a static WT_COMPRESSOR structure, and a
+ * static reference to the WT_EXTENSION_API methods, then we don't need
+ * to allocate memory when the compressor is initialized or free it when
+ * the compressor is terminated. However, this approach is more general
+ * purpose and supports multiple databases per application.
+ */
+ nop_compressor->compressor.compress = nop_compress;
+ nop_compressor->compressor.compress_raw = NULL;
+ nop_compressor->compressor.decompress = nop_decompress;
+ nop_compressor->compressor.pre_size = nop_pre_size;
+ nop_compressor->compressor.terminate = nop_terminate;
+
+ nop_compressor->wt_api = connection->get_extension_api(connection);
+
+ /* Load the compressor */
+ return (connection->add_compressor(
+ connection, "nop", (WT_COMPRESSOR *)nop_compressor, NULL));
+}
+/*! [WT_COMPRESSOR initialization function] */
diff --git a/src/third_party/wiredtiger/ext/compressors/snappy/Makefile.am b/src/third_party/wiredtiger/ext/compressors/snappy/Makefile.am
new file mode 100644
index 00000000000..78317234ba0
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/snappy/Makefile.am
@@ -0,0 +1,10 @@
+AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
+
+if HAVE_BUILTIN_EXTENSION_SNAPPY
+noinst_LTLIBRARIES = libwiredtiger_snappy.la
+else
+lib_LTLIBRARIES = libwiredtiger_snappy.la
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+endif
+libwiredtiger_snappy_la_SOURCES = snappy_compress.c
+libwiredtiger_snappy_la_LIBADD = -lsnappy
diff --git a/src/third_party/wiredtiger/ext/compressors/snappy/snappy_compress.c b/src/third_party/wiredtiger/ext/compressors/snappy/snappy_compress.c
new file mode 100644
index 00000000000..7ed759e6807
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/snappy/snappy_compress.c
@@ -0,0 +1,244 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <snappy-c.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <wiredtiger.h>
+#include <wiredtiger_ext.h>
+
+/*
+ * We need to include the configuration file to detect whether this extension
+ * is being built into the WiredTiger library.
+ */
+#include "wiredtiger_config.h"
+#ifdef _MSC_VER
+#define inline __inline
+#endif
+
+/* Local compressor structure. */
+typedef struct {
+ WT_COMPRESSOR compressor; /* Must come first */
+
+ WT_EXTENSION_API *wt_api; /* Extension API */
+} SNAPPY_COMPRESSOR;
+
+/*
+ * wt_snappy_error --
+ * Output an error message, and return a standard error code.
+ */
+static int
+wt_snappy_error(WT_COMPRESSOR *compressor,
+ WT_SESSION *session, const char *call, snappy_status snret)
+{
+ WT_EXTENSION_API *wt_api;
+ const char *msg;
+
+ wt_api = ((SNAPPY_COMPRESSOR *)compressor)->wt_api;
+
+ switch (snret) {
+ case SNAPPY_BUFFER_TOO_SMALL:
+ msg = "SNAPPY_BUFFER_TOO_SMALL";
+ break;
+ case SNAPPY_INVALID_INPUT:
+ msg = "SNAPPY_INVALID_INPUT";
+ break;
+ default:
+ msg = "unknown error";
+ break;
+ }
+
+ (void)wt_api->err_printf(wt_api,
+ session, "snappy error: %s: %s: %d", call, msg, snret);
+ return (WT_ERROR);
+}
+
+/*
+ * wt_snappy_compress --
+ * WiredTiger snappy compression.
+ */
+static int
+wt_snappy_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp, int *compression_failed)
+{
+ snappy_status snret;
+ size_t snaplen;
+ char *snapbuf;
+
+ /*
+ * dst_len was computed in wt_snappy_pre_size, so we know it's big
+ * enough. Skip past the space we'll use to store the final count
+ * of compressed bytes.
+ */
+ snaplen = dst_len - sizeof(size_t);
+ snapbuf = (char *)dst + sizeof(size_t);
+
+ /* snaplen is an input and an output arg. */
+ snret = snappy_compress((char *)src, src_len, snapbuf, &snaplen);
+
+ if (snret == SNAPPY_OK) {
+ /*
+ * On decompression, snappy requires the exact compressed byte
+ * count (the current value of snaplen). WiredTiger does not
+ * preserve that value, so save snaplen at the beginning of the
+ * destination buffer.
+ */
+ if (snaplen + sizeof(size_t) < src_len) {
+ *(size_t *)dst = snaplen;
+ *result_lenp = snaplen + sizeof(size_t);
+ *compression_failed = 0;
+ } else
+ /* The compressor failed to produce a smaller result. */
+ *compression_failed = 1;
+ return (0);
+ }
+ return (wt_snappy_error(compressor, session, "snappy_compress", snret));
+}
+
+/*
+ * wt_snappy_decompress --
+ * WiredTiger snappy decompression.
+ */
+static int
+wt_snappy_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp)
+{
+ WT_EXTENSION_API *wt_api;
+ snappy_status snret;
+ size_t snaplen;
+
+ wt_api = ((SNAPPY_COMPRESSOR *)compressor)->wt_api;
+
+ /* retrieve the saved length */
+ snaplen = *(size_t *)src;
+ if (snaplen + sizeof(size_t) > src_len) {
+ (void)wt_api->err_printf(wt_api,
+ session,
+ "wt_snappy_decompress: stored size exceeds buffer size");
+ return (WT_ERROR);
+ }
+
+ /* dst_len is an input and an output arg. */
+ snret = snappy_uncompress(
+ (char *)src + sizeof(size_t), snaplen, (char *)dst, &dst_len);
+
+ if (snret == SNAPPY_OK) {
+ *result_lenp = dst_len;
+ return (0);
+ }
+
+ return (
+ wt_snappy_error(compressor, session, "snappy_decompress", snret));
+}
+
+/*
+ * wt_snappy_pre_size --
+ * WiredTiger snappy destination buffer sizing.
+ */
+static int
+wt_snappy_pre_size(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ size_t *result_lenp)
+{
+ (void)compressor; /* Unused parameters */
+ (void)session;
+ (void)src;
+
+ /*
+ * Snappy requires the dest buffer be somewhat larger than the source.
+ * Fortunately, this is fast to compute, and will give us a dest buffer
+ * in wt_snappy_compress that we can compress to directly. We add space
+ * in the dest buffer to store the accurate compressed size.
+ */
+ *result_lenp = snappy_max_compressed_length(src_len) + sizeof(size_t);
+ return (0);
+}
+
+/*
+ * wt_snappy_terminate --
+ * WiredTiger snappy compression termination.
+ */
+static int
+wt_snappy_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session)
+{
+ (void)session; /* Unused parameters */
+
+ free(compressor);
+ return (0);
+}
+
+int snappy_extension_init(WT_CONNECTION *, WT_CONFIG_ARG *);
+
+/*
+ * snappy_extension_init --
+ * WiredTiger snappy compression extension - called directly when
+ * Snappy support is built in, or via wiredtiger_extension_init when
+ * snappy support is included via extension loading.
+ */
+int
+snappy_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ SNAPPY_COMPRESSOR *snappy_compressor;
+
+ (void)config; /* Unused parameters */
+
+ if ((snappy_compressor = calloc(1, sizeof(SNAPPY_COMPRESSOR))) == NULL)
+ return (errno);
+
+ snappy_compressor->compressor.compress = wt_snappy_compress;
+ snappy_compressor->compressor.compress_raw = NULL;
+ snappy_compressor->compressor.decompress = wt_snappy_decompress;
+ snappy_compressor->compressor.pre_size = wt_snappy_pre_size;
+ snappy_compressor->compressor.terminate = wt_snappy_terminate;
+
+ snappy_compressor->wt_api = connection->get_extension_api(connection);
+
+ return (connection->add_compressor(
+ connection, "snappy", (WT_COMPRESSOR *)snappy_compressor, NULL));
+}
+
+/*
+ * We have to remove this symbol when building as a builtin extension otherwise
+ * it will conflict with other builtin libraries.
+ */
+#ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
+/*
+ * wiredtiger_extension_init --
+ * WiredTiger snappy compression extension.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ return snappy_extension_init(connection, config);
+}
+#endif
diff --git a/src/third_party/wiredtiger/ext/compressors/zlib/Makefile.am b/src/third_party/wiredtiger/ext/compressors/zlib/Makefile.am
new file mode 100644
index 00000000000..fb0ec306562
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/zlib/Makefile.am
@@ -0,0 +1,10 @@
+AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
+
+if HAVE_BUILTIN_EXTENSION_ZLIB
+noinst_LTLIBRARIES = libwiredtiger_zlib.la
+else
+lib_LTLIBRARIES = libwiredtiger_zlib.la
+libwiredtiger_zlib_la_LDFLAGS = -avoid-version -module
+endif
+libwiredtiger_zlib_la_SOURCES = zlib_compress.c
+libwiredtiger_zlib_la_LIBADD = -lz
diff --git a/src/third_party/wiredtiger/ext/compressors/zlib/zlib_compress.c b/src/third_party/wiredtiger/ext/compressors/zlib/zlib_compress.c
new file mode 100644
index 00000000000..8dd619d695c
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/compressors/zlib/zlib_compress.c
@@ -0,0 +1,426 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <zlib.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <wiredtiger.h>
+#include <wiredtiger_ext.h>
+
+/*
+ * We need to include the configuration file to detect whether this extension
+ * is being built into the WiredTiger library.
+ */
+#include "wiredtiger_config.h"
+#ifdef _MSC_VER
+#define inline __inline
+#endif
+
+/* Local compressor structure. */
+typedef struct {
+ WT_COMPRESSOR compressor; /* Must come first */
+
+ WT_EXTENSION_API *wt_api; /* Extension API */
+
+ int zlib_level; /* Configuration */
+} ZLIB_COMPRESSOR;
+
+/*
+ * zlib gives us a cookie to pass to the underlying allocation functions; we
+ * need two handles, package them up.
+ */
+typedef struct {
+ WT_COMPRESSOR *compressor;
+ WT_SESSION *session;
+} ZLIB_OPAQUE;
+
+/*
+ * zlib_error --
+ * Output an error message, and return a standard error code.
+ */
+static int
+zlib_error(
+ WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int zret)
+{
+ WT_EXTENSION_API *wt_api;
+
+ wt_api = ((ZLIB_COMPRESSOR *)compressor)->wt_api;
+
+ (void)wt_api->err_printf(wt_api, session,
+ "zlib error: %s: %s: %d", call, zError(zret), zret);
+ return (WT_ERROR);
+}
+
+/*
+ * zalloc --
+ * Allocate a scratch buffer.
+ */
+static void *
+zalloc(void *cookie, uint32_t number, uint32_t size)
+{
+ ZLIB_OPAQUE *opaque;
+ WT_EXTENSION_API *wt_api;
+
+ opaque = cookie;
+ wt_api = ((ZLIB_COMPRESSOR *)opaque->compressor)->wt_api;
+ return (wt_api->scr_alloc(
+ wt_api, opaque->session, (size_t)(number * size)));
+}
+
+/*
+ * zfree --
+ * Free a scratch buffer.
+ */
+static void
+zfree(void *cookie, void *p)
+{
+ ZLIB_OPAQUE *opaque;
+ WT_EXTENSION_API *wt_api;
+
+ opaque = cookie;
+ wt_api = ((ZLIB_COMPRESSOR *)opaque->compressor)->wt_api;
+ wt_api->scr_free(wt_api, opaque->session, p);
+}
+
+/*
+ * zlib_compress --
+ * WiredTiger zlib compression.
+ */
+static int
+zlib_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp, int *compression_failed)
+{
+ ZLIB_COMPRESSOR *zlib_compressor;
+ ZLIB_OPAQUE opaque;
+ z_stream zs;
+ int ret;
+
+ zlib_compressor = (ZLIB_COMPRESSOR *)compressor;
+
+ memset(&zs, 0, sizeof(zs));
+ zs.zalloc = zalloc;
+ zs.zfree = zfree;
+ opaque.compressor = compressor;
+ opaque.session = session;
+ zs.opaque = &opaque;
+
+ if ((ret = deflateInit(&zs, zlib_compressor->zlib_level)) != Z_OK)
+ return (zlib_error(compressor, session, "deflateInit", ret));
+
+ zs.next_in = src;
+ zs.avail_in = (uint32_t)src_len;
+ zs.next_out = dst;
+ zs.avail_out = (uint32_t)dst_len;
+ if (deflate(&zs, Z_FINISH) == Z_STREAM_END) {
+ *compression_failed = 0;
+ *result_lenp = zs.total_out;
+ } else
+ *compression_failed = 1;
+
+ if ((ret = deflateEnd(&zs)) != Z_OK && ret != Z_DATA_ERROR)
+ return (zlib_error(compressor, session, "deflateEnd", ret));
+
+ return (0);
+}
+
+/*
+ * zlib_find_slot --
+ * Find the slot containing the target offset (binary search).
+ */
+static inline uint32_t
+zlib_find_slot(uint32_t target, uint32_t *offsets, uint32_t slots)
+{
+ uint32_t base, indx, limit;
+
+ indx = 1;
+
+ /* Figure out which slot we got to: binary search */
+ if (target >= offsets[slots])
+ indx = slots;
+ else if (target > offsets[1])
+ for (base = 2, limit = slots - base; limit != 0; limit >>= 1) {
+ indx = base + (limit >> 1);
+ if (target < offsets[indx])
+ continue;
+ base = indx + 1;
+ --limit;
+ }
+
+ return (indx);
+}
+
+/*
+ * zlib_compress_raw --
+ * Pack records into a specified on-disk page size.
+ */
+static int
+zlib_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ size_t page_max, int split_pct, size_t extra,
+ uint8_t *src, uint32_t *offsets, uint32_t slots,
+ uint8_t *dst, size_t dst_len, int final,
+ size_t *result_lenp, uint32_t *result_slotsp)
+{
+ ZLIB_COMPRESSOR *zlib_compressor;
+ ZLIB_OPAQUE opaque;
+ z_stream last_zs, zs;
+ uint32_t curr_slot, last_slot;
+ int ret;
+
+ curr_slot = last_slot = 0;
+ (void)split_pct;
+ (void)dst_len;
+ (void)final;
+
+ zlib_compressor = (ZLIB_COMPRESSOR *)compressor;
+
+ memset(&zs, 0, sizeof(zs));
+ zs.zalloc = zalloc;
+ zs.zfree = zfree;
+ opaque.compressor = compressor;
+ opaque.session = session;
+ zs.opaque = &opaque;
+
+ if ((ret = deflateInit(&zs,
+ zlib_compressor->zlib_level)) != Z_OK)
+ return (zlib_error(compressor, session, "deflateInit", ret));
+
+ zs.next_in = src;
+ zs.next_out = dst;
+ /*
+ * Experimentally derived, reserve this many bytes for zlib to finish
+ * up a buffer. If this isn't sufficient, we don't fail but we will be
+ * inefficient.
+ */
+#define WT_ZLIB_RESERVED 24
+ zs.avail_out = (uint32_t)(page_max - extra - WT_ZLIB_RESERVED);
+ last_zs = zs;
+
+ /*
+ * Strategy: take the available output size and compress that much
+ * input. Continue until there is no input small enough or the
+ * compression fails to fit.
+ *
+ * Don't let the compression ratio become insanely good (which can
+ * happen with synthetic workloads). Once we hit a limit, stop so that
+ * the in-memory size of pages isn't totally different to the on-disk
+ * size. Otherwise we can get into trouble where every update to a
+ * page results in forced eviction based on in-memory size, even though
+ * the data fits into a single on-disk block.
+ */
+ while (zs.avail_out > 0 && zs.total_in <= zs.total_out * 20) {
+ /* Find the slot we will try to compress up to. */
+ if ((curr_slot = zlib_find_slot(
+ zs.total_in + zs.avail_out, offsets, slots)) <= last_slot)
+ break;
+
+ zs.avail_in = offsets[curr_slot] - offsets[last_slot];
+ /* Save the stream state in case the chosen data doesn't fit. */
+ last_zs = zs;
+
+ while (zs.avail_in > 0 && zs.avail_out > 0)
+ if ((ret = deflate(&zs, Z_SYNC_FLUSH)) != Z_OK)
+ return (zlib_error(
+ compressor, session, "deflate", ret));
+
+ /* Roll back if the last deflate didn't complete. */
+ if (zs.avail_in > 0) {
+ zs = last_zs;
+ break;
+ } else
+ last_slot = curr_slot;
+ }
+
+ zs.avail_out += WT_ZLIB_RESERVED;
+ ret = deflate(&zs, Z_FINISH);
+
+ /*
+ * If the end marker didn't fit, report that we got no work done. WT
+ * will compress the (possibly large) page image using ordinary
+ * compression instead.
+ */
+ if (ret == Z_OK || ret == Z_BUF_ERROR)
+ last_slot = 0;
+ else if (ret != Z_STREAM_END)
+ return (
+ zlib_error(compressor, session, "deflate end block", ret));
+
+ if ((ret = deflateEnd(&zs)) != Z_OK && ret != Z_DATA_ERROR)
+ return (zlib_error(compressor, session, "deflateEnd", ret));
+
+ if (last_slot > 0) {
+ *result_slotsp = last_slot;
+ *result_lenp = zs.total_out;
+ } else {
+ /* We didn't manage to compress anything: don't retry. */
+ *result_slotsp = 0;
+ *result_lenp = 1;
+ }
+
+#if 0
+ fprintf(stderr,
+ "zlib_compress_raw (%s): page_max %" PRIuMAX ", slots %" PRIu32
+ ", take %" PRIu32 ": %" PRIu32 " -> %" PRIuMAX "\n",
+ final ? "final" : "not final", (uintmax_t)page_max,
+ slots, last_slot, offsets[last_slot], (uintmax_t)*result_lenp);
+#endif
+ return (0);
+}
+
+/*
+ * zlib_decompress --
+ * WiredTiger zlib decompression.
+ */
+static int
+zlib_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ uint8_t *src, size_t src_len,
+ uint8_t *dst, size_t dst_len,
+ size_t *result_lenp)
+{
+ ZLIB_OPAQUE opaque;
+ z_stream zs;
+ int ret, tret;
+
+ memset(&zs, 0, sizeof(zs));
+ zs.zalloc = zalloc;
+ zs.zfree = zfree;
+ opaque.compressor = compressor;
+ opaque.session = session;
+ zs.opaque = &opaque;
+
+ if ((ret = inflateInit(&zs)) != Z_OK)
+ return (zlib_error(compressor, session, "inflateInit", ret));
+
+ zs.next_in = src;
+ zs.avail_in = (uint32_t)src_len;
+ zs.next_out = dst;
+ zs.avail_out = (uint32_t)dst_len;
+ while ((ret = inflate(&zs, Z_FINISH)) == Z_OK)
+ ;
+ if (ret == Z_STREAM_END) {
+ *result_lenp = zs.total_out;
+ ret = Z_OK;
+ }
+
+ if ((tret = inflateEnd(&zs)) != Z_OK && ret == Z_OK)
+ ret = tret;
+
+ return (ret == Z_OK ?
+ 0 : zlib_error(compressor, session, "inflate", ret));
+}
+
+/*
+ * zlib_terminate --
+ * WiredTiger zlib compression termination.
+ */
+static int
+zlib_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session)
+{
+ (void)session; /* Unused parameters */
+
+ free(compressor);
+ return (0);
+}
+
+/*
+ * zlib_add_compressor --
+ * Add a zlib compressor.
+ */
+static int
+zlib_add_compressor(WT_CONNECTION *connection, int raw, const char *name)
+{
+ ZLIB_COMPRESSOR *zlib_compressor;
+
+ /*
+ * There are two almost identical zlib compressors: one supporting raw
+ * compression, and one without.
+ */
+ if ((zlib_compressor = calloc(1, sizeof(ZLIB_COMPRESSOR))) == NULL)
+ return (errno);
+
+ zlib_compressor->compressor.compress = zlib_compress;
+ zlib_compressor->compressor.compress_raw = raw ?
+ zlib_compress_raw : NULL;
+ zlib_compressor->compressor.decompress = zlib_decompress;
+ zlib_compressor->compressor.pre_size = NULL;
+ zlib_compressor->compressor.terminate = zlib_terminate;
+
+ zlib_compressor->wt_api = connection->get_extension_api(connection);
+
+ /*
+ * between 0-10: level: see zlib manual.
+ */
+ zlib_compressor->zlib_level = Z_DEFAULT_COMPRESSION;
+
+ /* Load the standard compressor. */
+ return (connection->add_compressor(
+ connection, name, &zlib_compressor->compressor, NULL));
+}
+
+int zlib_extension_init(WT_CONNECTION *, WT_CONFIG_ARG *);
+
+/*
+ * zlib_extension_init --
+ * WiredTiger zlib compression extension - called directly when zlib
+ * support is built in, or via wiredtiger_extension_init when zlib
+ * support is included via extension loading.
+ */
+int
+zlib_extension_init(
+ WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ int ret;
+
+ (void)config; /* Unused parameters */
+
+ if ((ret = zlib_add_compressor(connection, 1, "zlib")) != 0)
+ return (ret);
+ if ((ret = zlib_add_compressor(connection, 0, "zlib-noraw")) != 0)
+ return (ret);
+ return (0);
+}
+
+/*
+ * We have to remove this symbol when building as a builtin extension otherwise
+ * it will conflict with other builtin libraries.
+ */
+#ifndef HAVE_BUILTIN_EXTENSION_SNAPPY
+/*
+ * wiredtiger_extension_init --
+ * WiredTiger zlib compression extension.
+ */
+int
+wiredtiger_extension_init(
+ WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ return (zlib_extension_init(connection, config));
+}
+#endif
diff --git a/src/third_party/wiredtiger/ext/datasources/helium/Makefile.am b/src/third_party/wiredtiger/ext/datasources/helium/Makefile.am
new file mode 100644
index 00000000000..b4e6e67e2cd
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/datasources/helium/Makefile.am
@@ -0,0 +1,11 @@
+AM_CPPFLAGS = -I$(top_builddir) \
+ -I$(top_srcdir)/src/include -I$(HELIUM_PATH)
+
+noinst_LTLIBRARIES = libwiredtiger_helium.la
+libwiredtiger_helium_la_SOURCES = helium.c
+libwiredtiger_helium_la_LIBADD = -L$(HELIUM_PATH) -lhe
+
+# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well
+# as installation, it will only build static libraries. As far as I can tell,
+# the "approved" libtool way to turn them back on is by adding -rpath.
+libwiredtiger_helium_la_LDFLAGS = -avoid-version -module -rpath /nowhere
diff --git a/src/third_party/wiredtiger/ext/datasources/helium/README b/src/third_party/wiredtiger/ext/datasources/helium/README
new file mode 100644
index 00000000000..e78ba58c71d
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/datasources/helium/README
@@ -0,0 +1,125 @@
+Helium README.
+
+The data structures are "Helium sources" which map to one or more physical
+volumes; each Helium source supports any number of "WiredTiger sources",
+where a WiredTiger source is an object similar to a Btree "file:" object.
+Each WiredTiger source supports any number of WiredTiger cursors.
+
+Each Helium source is given a logical name when first referenced, and that
+logical name is subsequently used when a WiredTiger source is created. For
+example, the logical name for a Helium source might be "dev1", and it would
+map to the Helium volumes /dev/sd0 and /dev/sd1; subsequent WT_SESSION.create
+calls specify a URI like "table:dev1/my_table".
+
+For each WiredTiger source, we create two namespaces on the underlying device,
+a "cache" and a "primary".
+
+The cache contains key/value pairs based on updates or changes that have been
+made, and includes transactional information. So, for example, if transaction
+3 modifies key/value pair "foo/aaa", and then transaction 4 removes key "foo",
+then transaction 5 inserts key/value pair "foo/bbb", the entry in the cache
+will look something like:
+
+ Key: foo
+ Value: [transaction ID 3] [aaa]
+ [transaction ID 4] [remove]
+ [transaction ID 5] [bbb]
+
+Obviously, we have to marshall/unmarshall these values to/from the cache.
+
+In contrast, the primary contains only key/value pairs known to be committed
+and visible to any reader.
+
+When an insert, update or remove is done:
+ acquire a lock
+ read any matching key from the cache
+ check to see if the update can proceed
+ append a new value for this transaction
+ release the lock
+
+When a search is done:
+ if there's a matching key/value pair in the cache {
+ if there's an item visible to the reading transaction
+ return it
+ }
+ if there's a matching key/value pair in the primary {
+ return it
+ }
+
+When a next/prev is done:
+ move to the next/prev visible item in the cache
+ move to the next/prev visible item in the primary
+ return the one closest to the starting position
+
+Locks are not acquired for read operations, and no flushes are done for any of
+these operations.
+
+We also create one additional object, the transaction name space, which serves
+all of the WiredTiger and Helium objects in a WiredTiger connection. Whenever
+a transaction involving a Helium source commits, we insert a commit record into
+the transaction name space and flush the device. When a transaction rolls back,
+we insert an abort record into the txn name space, but don't flush the device.
+
+The visibility check is slightly different than the rest of WiredTiger: we do
+not reset anything when a transaction aborts, and so we have to check if the
+transaction has been aborted as well as check the transaction ID for visibility.
+
+We create a "cleanup" thread for every underlying Helium source. The job of
+this thread is to migrate rows from the cache object into the primary. Any
+committed, globally visible change in the cache can be copied into the primary
+and removed from the cache:
+
+ set BaseTxnID to the oldest transaction ID
+ not yet visible to a running transaction
+
+ for each row in the cache:
+ if all of the updates are greater than BaseTxnID
+ copy the last update to the primary
+
+ flush the primary to stable storage
+
+ lock the cache
+ for each row in the cache:
+ if all of the updates are greater than BaseTxnID
+ remove the row from the cache
+ unlock the cache
+
+ for each row in the transaction store:
+ if the transaction ID is less than BaseTxnID
+ remove the row
+
+We only need to lock the cache when removing rows, the initial copy to the
+primary does not require locks because only the cleanup thread ever writes
+to the primary.
+
+No lock is required when removing rows from the transaction store, once the
+transaction ID is less than the BaseTxnID, it will never be read.
+
+Helium recovery is almost identical to the cleanup thread, which migrates rows
+from the cache into the primary. For every cache/primary pair, migrate every
+commit to the primary (by definition, at recovery time it must be globally
+visible), and discard everything else (by definition, at recovery time anything
+not committed has been aborted.
+
+=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+Questions, problems, whatever:
+
+* The implementation is endian-specific, that is, the WiredTiger metadata
+stored on the Helium device is on not portable to a big-endian machine.
+Helium's metadata is portable between different endian machines, so this
+should probably be fixed.
+
+* There's a problem with transactions in WiredTiger that span more than a
+single data source. For example, consider a transaction that modifies
+both a Helium object and a Btree object. If we commit and push the Helium
+commit record to stable storage, and then crash before committing the Btree
+change, the enclosing WiredTiger transaction will/should end up aborting,
+and there's no way for us to back out the change in Helium. I'm leaving
+this problem alone until WiredTiger fine-grained durability is complete,
+we're going to need WiredTiger support for some kind of 2PC to solve this.
+
+* If a record in the cache gets too busy, we could end up unable to remove
+it (there would always be an active transaction), and it would grow forever.
+I suspect the solution is to clean it up when we realize we can't remove it,
+that is, we can rebuild the record, discarding the no longer needed entries,
+even if the record can't be entirely discarded.
diff --git a/src/third_party/wiredtiger/ext/datasources/helium/helium.c b/src/third_party/wiredtiger/ext/datasources/helium/helium.c
new file mode 100644
index 00000000000..f5be26e9119
--- /dev/null
+++ b/src/third_party/wiredtiger/ext/datasources/helium/helium.c
@@ -0,0 +1,3449 @@
+/*-
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include <sys/select.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <he.h>
+
+#include <wiredtiger.h>
+#include <wiredtiger_ext.h>
+
+typedef struct he_env HE_ENV;
+typedef struct he_item HE_ITEM;
+typedef struct he_stats HE_STATS;
+
+static int verbose = 0; /* Verbose messages */
+
+/*
+ * Macros to output error and verbose messages, and set or return an error.
+ * Error macros require local "ret" variable.
+ *
+ * ESET: update an error value, handling more/less important errors.
+ * ERET: output a message, return the error.
+ * EMSG: output a message, set the local error value.
+ * EMSG_ERR:
+ * output a message, set the local error value, jump to the err label.
+ * VMSG: verbose message.
+ */
+#undef ESET
+#define ESET(a) do { \
+ int __v; \
+ if ((__v = (a)) != 0) { \
+ /* \
+ * On error, check for a panic (it overrides all other \
+ * returns). Else, if there's no return value or the \
+ * return value is not strictly an error, override it \
+ * with the error. \
+ */ \
+ if (__v == WT_PANIC || \
+ ret == 0 || \
+ ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND) \
+ ret = __v; \
+ /* \
+ * If we're set to a Helium error at the end of the day,\
+ * switch to a generic WiredTiger error. \
+ */ \
+ if (ret < 0 && ret > -31,800) \
+ ret = WT_ERROR; \
+ } \
+} while (0)
+#undef ERET
+#define ERET(wtext, session, v, ...) do { \
+ (void) \
+ wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \
+ ESET(v); \
+ return (ret); \
+} while (0)
+#undef EMSG
+#define EMSG(wtext, session, v, ...) do { \
+ (void) \
+ wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \
+ ESET(v); \
+} while (0)
+#undef EMSG_ERR
+#define EMSG_ERR(wtext, session, v, ...) do { \
+ (void) \
+ wtext->err_printf(wtext, session, "helium: " __VA_ARGS__); \
+ ESET(v); \
+ goto err; \
+} while (0)
+#undef VERBOSE_L1
+#define VERBOSE_L1 1
+#undef VERBOSE_L2
+#define VERBOSE_L2 2
+#undef VMSG
+#define VMSG(wtext, session, v, ...) do { \
+ if (verbose >= v) \
+ (void)wtext-> \
+ msg_printf(wtext, session, "helium: " __VA_ARGS__); \
+} while (0)
+
+/*
+ * OVERWRITE_AND_FREE --
+ * Make sure we don't re-use a structure after it's dead.
+ */
+#undef OVERWRITE_AND_FREE
+#define OVERWRITE_AND_FREE(p) do { \
+ memset(p, 0xab, sizeof(*(p))); \
+ free(p); \
+} while (0)
+
+/*
+ * Version each object, out of sheer raging paranoia.
+ */
+#define WIREDTIGER_HELIUM_MAJOR 1 /* Major, minor version */
+#define WIREDTIGER_HELIUM_MINOR 0
+
+/*
+ * WiredTiger name space on the Helium store: all objects are named with the
+ * WiredTiger prefix (we don't require the Helium store be exclusive to our
+ * files). Primary objects are named "WiredTiger.[name]", associated cache
+ * objects are "WiredTiger.[name].cache". The per-connection transaction
+ * object is "WiredTiger.WiredTigerTxn". When we first open a Helium volume,
+ * we open/close a file in order to apply flags for the first open of the
+ * volume, that's "WiredTiger.WiredTigerInit".
+ */
+#define WT_NAME_PREFIX "WiredTiger."
+#define WT_NAME_INIT "WiredTiger.WiredTigerInit"
+#define WT_NAME_TXN "WiredTiger.WiredTigerTxn"
+#define WT_NAME_CACHE ".cache"
+
+/*
+ * WT_SOURCE --
+ * A WiredTiger source, supporting one or more cursors.
+ */
+typedef struct __wt_source {
+ char *uri; /* Unique name */
+
+ pthread_rwlock_t lock; /* Lock */
+ int lockinit; /* Lock created */
+
+ int configured; /* If structure configured */
+ u_int ref; /* Active reference count */
+
+ uint64_t append_recno; /* Allocation record number */
+
+ int config_bitfield; /* config "value_format=#t" */
+ int config_compress; /* config "helium_o_compress" */
+ int config_recno; /* config "key_format=r" */
+
+ /*
+ * Each WiredTiger object has a "primary" namespace in a Helium store
+ * plus a "cache" namespace, which has not-yet-resolved updates. There
+ * is a dirty flag so read-only data sets can ignore the cache.
+ */
+ he_t he; /* Underlying Helium object */
+ he_t he_cache; /* Underlying Helium cache */
+ int he_cache_inuse;
+
+ struct __he_source *hs; /* Underlying Helium source */
+ struct __wt_source *next; /* List of WiredTiger objects */
+} WT_SOURCE;
+
+/*
+ * HELIUM_SOURCE --
+ * A Helium volume, supporting one or more WT_SOURCE objects.
+ */
+typedef struct __he_source {
+ /*
+ * XXX
+ * The transaction commit handler must appear first in the structure.
+ */
+ WT_TXN_NOTIFY txn_notify; /* Transaction commit handler */
+
+ WT_EXTENSION_API *wtext; /* Extension functions */
+
+ char *name; /* Unique WiredTiger name */
+ char *device; /* Unique Helium volume name */
+
+ /*
+ * Maintain a handle for each underlying Helium source so checkpoint is
+ * faster, we can "commit" a single handle per source, regardless of the
+ * number of objects.
+ */
+ he_t he_volume;
+
+ struct __wt_source *ws_head; /* List of WiredTiger sources */
+
+ /*
+ * Each Helium source has a cleaner thread to migrate WiredTiger source
+ * updates from the cache namespace to the primary namespace, based on
+ * the number of bytes or the number of operations. (There's a cleaner
+ * thread per Helium store so migration operations can overlap.) We
+ * read these fields without a lock, but serialize writes to minimize
+ * races (and because it costs us nothing).
+ */
+ pthread_t cleaner_id; /* Cleaner thread ID */
+ volatile int cleaner_stop; /* Cleaner thread quit flag */
+
+ /*
+ * Each WiredTiger connection has a transaction namespace which lists
+ * resolved transactions with their committed or aborted state as a
+ * value. That namespace appears in a single Helium store (the first
+ * one created, if it doesn't already exist), and then it's referenced
+ * from other Helium stores.
+ */
+#define TXN_ABORTED 'A'
+#define TXN_COMMITTED 'C'
+#define TXN_UNRESOLVED 0
+ he_t he_txn; /* Helium txn store */
+ int he_owner; /* Owns transaction store */
+
+ struct __he_source *next; /* List of Helium sources */
+} HELIUM_SOURCE;
+
+/*
+ * DATA_SOURCE --
+ * A WiredTiger data source, supporting one or more HELIUM_SOURCE objects.
+ */
+typedef struct __data_source {
+ WT_DATA_SOURCE wtds; /* Must come first */
+
+ WT_EXTENSION_API *wtext; /* Extension functions */
+
+ pthread_rwlock_t global_lock; /* Global lock */
+ int lockinit; /* Lock created */
+
+ struct __he_source *hs_head; /* List of Helium sources */
+} DATA_SOURCE;
+
+/*
+ * CACHE_RECORD --
+ * An array of updates from the cache object.
+ *
+ * Values in the cache store are marshalled/unmarshalled to/from the store,
+ * using a simple encoding:
+ * {N records: 4B}
+ * {record#1 TxnID: 8B}
+ * {record#1 remove tombstone: 1B}
+ * {record#1 data length: 4B}
+ * {record#1 data}
+ * ...
+ *
+ * Each cursor potentially has a single set of these values.
+ */
+typedef struct __cache_record {
+ uint8_t *v; /* Value */
+ uint32_t len; /* Value length */
+ uint64_t txnid; /* Transaction ID */
+#define REMOVE_TOMBSTONE 'R'
+ int remove; /* 1/0 remove flag */
+} CACHE_RECORD;
+
+/*
+ * CURSOR --
+ * A cursor, supporting a single WiredTiger cursor.
+ */
+typedef struct __cursor {
+ WT_CURSOR wtcursor; /* Must come first */
+
+ WT_EXTENSION_API *wtext; /* Extension functions */
+
+ WT_SOURCE *ws; /* Underlying source */
+
+ HE_ITEM record; /* Record */
+ uint8_t __key[HE_MAX_KEY_LEN]; /* Record.key, Record.value */
+ uint8_t *v;
+ size_t len;
+ size_t mem_len;
+
+ struct {
+ uint8_t *v; /* Temporary buffers */
+ size_t len;
+ size_t mem_len;
+ } t1, t2, t3;
+
+ int config_append; /* config "append" */
+ int config_overwrite; /* config "overwrite" */
+
+ CACHE_RECORD *cache; /* unmarshalled cache records */
+ uint32_t cache_entries; /* cache records */
+ uint32_t cache_slots; /* cache total record slots */
+} CURSOR;
+
+/*
+ * prefix_match --
+ * Return if a string matches a prefix.
+ */
+static inline int
+prefix_match(const char *str, const char *pfx)
+{
+ return (strncmp(str, pfx, strlen(pfx)) == 0);
+}
+
+/*
+ * string_match --
+ * Return if a string matches a byte string of len bytes.
+ */
+static inline int
+string_match(const char *str, const char *bytes, size_t len)
+{
+ return (strncmp(str, bytes, len) == 0 && (str)[(len)] == '\0');
+}
+
+/*
+ * cursor_destroy --
+ * Free a cursor's memory, and optionally the cursor itself.
+ */
+static void
+cursor_destroy(CURSOR *cursor)
+{
+ if (cursor != NULL) {
+ free(cursor->v);
+ free(cursor->t1.v);
+ free(cursor->t2.v);
+ free(cursor->t3.v);
+ free(cursor->cache);
+ OVERWRITE_AND_FREE(cursor);
+ }
+}
+
+/*
+ * os_errno --
+ * Limit our use of errno so it's easy to find/remove.
+ */
+static int
+os_errno(void)
+{
+ return (errno);
+}
+
+/*
+ * lock_init --
+ * Initialize a lock.
+ */
+static int
+lock_init(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
+{
+ int ret = 0;
+
+ if ((ret = pthread_rwlock_init(lockp, NULL)) != 0)
+ ERET(wtext, session, WT_PANIC,
+ "pthread_rwlock_init: %s", strerror(ret));
+ return (0);
+}
+
+/*
+ * lock_destroy --
+ * Destroy a lock.
+ */
+static int
+lock_destroy(
+ WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
+{
+ int ret = 0;
+
+ if ((ret = pthread_rwlock_destroy(lockp)) != 0)
+ ERET(wtext, session, WT_PANIC,
+ "pthread_rwlock_destroy: %s", strerror(ret));
+ return (0);
+}
+
+/*
+ * writelock --
+ * Acquire a write lock.
+ */
+static inline int
+writelock(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
+{
+ int ret = 0;
+
+ if ((ret = pthread_rwlock_wrlock(lockp)) != 0)
+ ERET(wtext, session, WT_PANIC,
+ "pthread_rwlock_wrlock: %s", strerror(ret));
+ return (0);
+}
+
+/*
+ * unlock --
+ * Release a lock.
+ */
+static inline int
+unlock(WT_EXTENSION_API *wtext, WT_SESSION *session, pthread_rwlock_t *lockp)
+{
+ int ret = 0;
+
+ if ((ret = pthread_rwlock_unlock(lockp)) != 0)
+ ERET(wtext, session, WT_PANIC,
+ "pthread_rwlock_unlock: %s", strerror(ret));
+ return (0);
+}
+
+#if 0
+/*
+ * helium_dump_kv --
+ * Dump a Helium record.
+ */
+static void
+helium_dump_kv(const char *pfx, uint8_t *p, size_t len, FILE *fp)
+{
+ (void)fprintf(stderr, "%s %3zu: ", pfx, len);
+ for (; len > 0; --len, ++p)
+ if (!isspace(*p) && isprint(*p))
+ (void)putc(*p, fp);
+ else if (len == 1 && *p == '\0') /* Skip string nuls. */
+ continue;
+ else
+ (void)fprintf(fp, "%#x", *p);
+ (void)putc('\n', fp);
+}
+
+/*
+ * helium_dump --
+ * Dump the records in a Helium store.
+ */
+static int
+helium_dump(WT_EXTENSION_API *wtext, he_t he, const char *tag)
+{
+ HE_ITEM *r, _r;
+ uint8_t k[4 * 1024], v[4 * 1024];
+ int ret = 0;
+
+ r = &_r;
+ memset(r, 0, sizeof(*r));
+ r->key = k;
+ r->val = v;
+
+ (void)fprintf(stderr, "== %s\n", tag);
+ while ((ret = he_next(he, r, (size_t)0, sizeof(v))) == 0) {
+#if 0
+ uint64_t recno;
+ if ((ret = wtext->struct_unpack(wtext,
+ NULL, r->key, r->key_len, "r", &recno)) != 0)
+ return (ret);
+ fprintf(stderr, "K: %" PRIu64, recno);
+#else
+ helium_dump_kv("K: ", r->key, r->key_len, stderr);
+#endif
+ helium_dump_kv("V: ", r->val, r->val_len, stderr);
+ }
+ if (ret != HE_ERR_ITEM_NOT_FOUND) {
+ fprintf(stderr, "he_next: %s\n", he_strerror(ret));
+ ret = WT_ERROR;
+ }
+ return (ret);
+}
+
+/*
+ * helium_stats --
+ * Display Helium statistics for a datastore.
+ */
+static int
+helium_stats(
+ WT_EXTENSION_API *wtext, WT_SESSION *session, he_t he, const char *tag)
+{
+ HE_STATS stats;
+ int ret = 0;
+
+ if ((ret = he_stats(he, &stats)) != 0)
+ ERET(wtext, session, ret, "he_stats: %s", he_strerror(ret));
+ fprintf(stderr, "== %s\n", tag);
+ fprintf(stderr, "name=%s\n", stats.name);
+ fprintf(stderr, "deleted_items=%" PRIu64 "\n", stats.deleted_items);
+ fprintf(stderr, "locked_items=%" PRIu64 "\n", stats.locked_items);
+ fprintf(stderr, "valid_items=%" PRIu64 "\n", stats.valid_items);
+ fprintf(stderr, "capacity=%" PRIu64 "B\n", stats.capacity);
+ fprintf(stderr, "size=%" PRIu64 "B\n", stats.size);
+ return (0);
+}
+#endif
+
+/*
+ * helium_call --
+ * Call a Helium key retrieval function, handling overflow.
+ */
+static inline int
+helium_call(WT_CURSOR *wtcursor, const char *fname,
+ he_t he, int (*f)(he_t, HE_ITEM *, size_t, size_t))
+{
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ int ret = 0;
+ char *p;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+
+ r = &cursor->record;
+ r->val = cursor->v;
+
+restart:
+ if ((ret = f(he, r, (size_t)0, cursor->mem_len)) != 0) {
+ if (ret == HE_ERR_ITEM_NOT_FOUND)
+ return (WT_NOTFOUND);
+ ERET(wtext, session, ret, "%s: %s", fname, he_strerror(ret));
+ }
+
+ /*
+ * If the returned length is larger than our passed-in length, we didn't
+ * get the complete value. Grow the buffer and use he_lookup to do the
+ * retrieval (he_lookup because the call succeeded and the key was
+ * copied out, so calling he_next/he_prev again would skip key/value
+ * pairs).
+ *
+ * We have to loop, another thread of control might change the length of
+ * the value, requiring we grow our buffer multiple times.
+ *
+ * We have to potentially restart the entire call in case the underlying
+ * key/value disappears.
+ */
+ for (;;) {
+ if (cursor->mem_len >= r->val_len) {
+ cursor->len = r->val_len;
+ return (0);
+ }
+
+ /* Grow the value buffer. */
+ if ((p = realloc(cursor->v, r->val_len + 32)) == NULL)
+ return (os_errno());
+ cursor->v = r->val = p;
+ cursor->mem_len = r->val_len + 32;
+
+ if ((ret = he_lookup(he, r, (size_t)0, cursor->mem_len)) != 0) {
+ if (ret == HE_ERR_ITEM_NOT_FOUND)
+ goto restart;
+ ERET(wtext,
+ session, ret, "he_lookup: %s", he_strerror(ret));
+ }
+ }
+ /* NOTREACHED */
+}
+
+/*
+ * txn_state_set --
+ * Resolve a transaction.
+ */
+static int
+txn_state_set(WT_EXTENSION_API *wtext,
+ WT_SESSION *session, HELIUM_SOURCE *hs, uint64_t txnid, int commit)
+{
+ HE_ITEM txn;
+ uint8_t val;
+ int ret = 0;
+
+ /*
+ * Update the store -- commits must be durable, flush the volume.
+ *
+ * XXX
+ * Not endian-portable, we're writing a native transaction ID to the
+ * store.
+ */
+ memset(&txn, 0, sizeof(txn));
+ txn.key = &txnid;
+ txn.key_len = sizeof(txnid);
+ val = commit ? TXN_COMMITTED : TXN_ABORTED;
+ txn.val = &val;
+ txn.val_len = sizeof(val);
+
+ if ((ret = he_update(hs->he_txn, &txn)) != 0)
+ ERET(wtext, session, ret, "he_update: %s", he_strerror(ret));
+
+ if (commit && (ret = he_commit(hs->he_txn)) != 0)
+ ERET(wtext, session, ret, "he_commit: %s", he_strerror(ret));
+ return (0);
+}
+
+/*
+ * txn_notify --
+ * Resolve a transaction; called from WiredTiger during commit/abort.
+ */
+static int
+txn_notify(WT_TXN_NOTIFY *handler,
+ WT_SESSION *session, uint64_t txnid, int committed)
+{
+ HELIUM_SOURCE *hs;
+
+ hs = (HELIUM_SOURCE *)handler;
+ return (txn_state_set(hs->wtext, session, hs, txnid, committed));
+}
+
+/*
+ * txn_state --
+ * Return a transaction's state.
+ */
+static int
+txn_state(WT_CURSOR *wtcursor, uint64_t txnid)
+{
+ CURSOR *cursor;
+ HE_ITEM txn;
+ HELIUM_SOURCE *hs;
+ uint8_t val_buf[16];
+
+ cursor = (CURSOR *)wtcursor;
+ hs = cursor->ws->hs;
+
+ memset(&txn, 0, sizeof(txn));
+ txn.key = &txnid;
+ txn.key_len = sizeof(txnid);
+ txn.val = val_buf;
+ txn.val_len = sizeof(val_buf);
+
+ if (he_lookup(hs->he_txn, &txn, (size_t)0, sizeof(val_buf)) == 0)
+ return (val_buf[0]);
+ return (TXN_UNRESOLVED);
+}
+
+/*
+ * cache_value_append --
+ * Append the current WiredTiger cursor's value to a cache record.
+ */
+static int
+cache_value_append(WT_CURSOR *wtcursor, int remove_op)
+{
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ uint64_t txnid;
+ size_t len;
+ uint32_t entries;
+ uint8_t *p;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+
+ r = &cursor->record;
+
+ /*
+ * A cache update is 4B that counts the number of entries in the update,
+ * followed by sets of: 8B of txn ID then either a remove tombstone or a
+ * 4B length and variable-length data pair. Grow the value buffer, then
+ * append the cursor's information.
+ */
+ len = cursor->len + /* current length */
+ sizeof(uint32_t) + /* entries */
+ sizeof(uint64_t) + /* txn ID */
+ 1 + /* remove byte */
+ (remove_op ? 0 : /* optional data */
+ sizeof(uint32_t) + wtcursor->value.size) +
+ 32; /* slop */
+
+ if (len > cursor->mem_len) {
+ if ((p = realloc(cursor->v, len)) == NULL)
+ return (os_errno());
+ cursor->v = p;
+ cursor->mem_len = len;
+ }
+
+ /* Get the transaction ID. */
+ txnid = wtext->transaction_id(wtext, session);
+
+ /* Update the number of records in this value. */
+ if (cursor->len == 0) {
+ entries = 1;
+ cursor->len = sizeof(uint32_t);
+ } else {
+ memcpy(&entries, cursor->v, sizeof(uint32_t));
+ ++entries;
+ }
+ memcpy(cursor->v, &entries, sizeof(uint32_t));
+
+ /*
+ * Copy the WiredTiger cursor's data into place: txn ID, remove
+ * tombstone, data length, data.
+ *
+ * XXX
+ * Not endian-portable, we're writing a native transaction ID to the
+ * store.
+ */
+ p = cursor->v + cursor->len;
+ memcpy(p, &txnid, sizeof(uint64_t));
+ p += sizeof(uint64_t);
+ if (remove_op)
+ *p++ = REMOVE_TOMBSTONE;
+ else {
+ *p++ = ' ';
+ memcpy(p, &wtcursor->value.size, sizeof(uint32_t));
+ p += sizeof(uint32_t);
+ memcpy(p, wtcursor->value.data, wtcursor->value.size);
+ p += wtcursor->value.size;
+ }
+ cursor->len = (size_t)(p - cursor->v);
+
+ /* Update the underlying Helium record. */
+ r->val = cursor->v;
+ r->val_len = cursor->len;
+
+ return (0);
+}
+
+/*
+ * cache_value_unmarshall --
+ * Unmarshall a cache value into a set of records.
+ */
+static int
+cache_value_unmarshall(WT_CURSOR *wtcursor)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ uint32_t entries, i;
+ uint8_t *p;
+ int ret = 0;
+
+ cursor = (CURSOR *)wtcursor;
+
+ /* If we don't have enough record slots, allocate some more. */
+ memcpy(&entries, cursor->v, sizeof(uint32_t));
+ if (entries > cursor->cache_slots) {
+ if ((p = realloc(cursor->cache,
+ (entries + 20) * sizeof(cursor->cache[0]))) == NULL)
+ return (os_errno());
+
+ cursor->cache = (CACHE_RECORD *)p;
+ cursor->cache_slots = entries + 20;
+ }
+
+ /* Walk the value, splitting it up into records. */
+ p = cursor->v + sizeof(uint32_t);
+ for (i = 0, cp = cursor->cache; i < entries; ++i, ++cp) {
+ memcpy(&cp->txnid, p, sizeof(uint64_t));
+ p += sizeof(uint64_t);
+ cp->remove = *p++ == REMOVE_TOMBSTONE ? 1 : 0;
+ if (!cp->remove) {
+ memcpy(&cp->len, p, sizeof(uint32_t));
+ p += sizeof(uint32_t);
+ cp->v = p;
+ p += cp->len;
+ }
+ }
+ cursor->cache_entries = entries;
+
+ return (ret);
+}
+
+/*
+ * cache_value_aborted --
+ * Return if a transaction has been aborted.
+ */
+static inline int
+cache_value_aborted(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
+{
+ /*
+ * This function exists as a place to hang this comment.
+ *
+ * WiredTiger resets updated entry transaction IDs to an aborted state
+ * on rollback; to do that here would require tracking updated entries
+ * for a transaction or scanning the cache for updates made on behalf
+ * of the transaction during rollback, expensive stuff. Instead, check
+ * if the transaction has been aborted before calling the underlying
+ * WiredTiger visibility function.
+ */
+ return (txn_state(wtcursor, cp->txnid) == TXN_ABORTED ? 1 : 0);
+}
+
+/*
+ * cache_value_committed --
+ * Return if a transaction has been committed.
+ */
+static inline int
+cache_value_committed(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
+{
+ return (txn_state(wtcursor, cp->txnid) == TXN_COMMITTED ? 1 : 0);
+}
+
+/*
+ * cache_value_update_check --
+ * Return if an update can proceed based on the previous updates made to
+ * the cache entry.
+ */
+static int
+cache_value_update_check(WT_CURSOR *wtcursor)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ u_int i;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+
+ /* Only interesting for snapshot isolation. */
+ if (wtext->
+ transaction_isolation_level(wtext, session) != WT_TXN_ISO_SNAPSHOT)
+ return (0);
+
+ /*
+ * If there's an entry that's not visible and hasn't been aborted,
+ * return a deadlock.
+ */
+ for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
+ if (!cache_value_aborted(wtcursor, cp) &&
+ !wtext->transaction_visible(wtext, session, cp->txnid))
+ return (WT_ROLLBACK);
+ return (0);
+}
+
+/*
+ * cache_value_visible --
+ * Return the most recent cache entry update visible to the running
+ * transaction.
+ */
+static int
+cache_value_visible(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ u_int i;
+
+ *cpp = NULL;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+
+ /*
+ * We want the most recent cache entry update; the cache entries are
+ * in update order, walk from the end to the beginning.
+ */
+ cp = cursor->cache + cursor->cache_entries;
+ for (i = 0; i < cursor->cache_entries; ++i) {
+ --cp;
+ if (!cache_value_aborted(wtcursor, cp) &&
+ wtext->transaction_visible(wtext, session, cp->txnid)) {
+ *cpp = cp;
+ return (1);
+ }
+ }
+ return (0);
+}
+
+/*
+ * cache_value_visible_all --
+ * Return if a cache entry has no updates that aren't globally visible.
+ */
+static int
+cache_value_visible_all(WT_CURSOR *wtcursor, uint64_t oldest)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ u_int i;
+
+ cursor = (CURSOR *)wtcursor;
+
+ /*
+ * Compare the update's transaction ID and the oldest transaction ID
+ * not yet visible to a running transaction. If there's an update a
+ * running transaction might want, the entry must remain in the cache.
+ * (We could tighten this requirement: if the only update required is
+ * also the update we'd migrate to the primary, it would still be OK
+ * to migrate it.)
+ */
+ for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
+ if (cp->txnid >= oldest)
+ return (0);
+ return (1);
+}
+
+/*
+ * cache_value_last_committed --
+ * Find the most recent update in a cache entry, recovery processing.
+ */
+static void
+cache_value_last_committed(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ u_int i;
+
+ *cpp = NULL;
+
+ cursor = (CURSOR *)wtcursor;
+
+ /*
+ * Find the most recent update in the cache record, we're going to try
+ * and migrate it into the primary, recovery version.
+ *
+ * We know the entry is visible, but it must have been committed before
+ * the failure to be migrated.
+ *
+ * Cache entries are in update order, walk from end to beginning.
+ */
+ cp = cursor->cache + cursor->cache_entries;
+ for (i = 0; i < cursor->cache_entries; ++i) {
+ --cp;
+ if (cache_value_committed(wtcursor, cp)) {
+ *cpp = cp;
+ return;
+ }
+ }
+}
+
+/*
+ * cache_value_last_not_aborted --
+ * Find the most recent update in a cache entry, normal processing.
+ */
+static void
+cache_value_last_not_aborted(WT_CURSOR *wtcursor, CACHE_RECORD **cpp)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ u_int i;
+
+ *cpp = NULL;
+
+ cursor = (CURSOR *)wtcursor;
+
+ /*
+ * Find the most recent update in the cache record, we're going to try
+ * and migrate it into the primary, normal processing version.
+ *
+ * We don't have to check if the entry was committed, we've already
+ * confirmed all entries for this cache key are globally visible, which
+ * means they must be either committed or aborted.
+ *
+ * Cache entries are in update order, walk from end to beginning.
+ */
+ cp = cursor->cache + cursor->cache_entries;
+ for (i = 0; i < cursor->cache_entries; ++i) {
+ --cp;
+ if (!cache_value_aborted(wtcursor, cp)) {
+ *cpp = cp;
+ return;
+ }
+ }
+}
+
+/*
+ * cache_value_txnmin --
+ * Return the oldest transaction ID involved in a cache update.
+ */
+static void
+cache_value_txnmin(WT_CURSOR *wtcursor, uint64_t *txnminp)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ uint64_t txnmin;
+ u_int i;
+
+ cursor = (CURSOR *)wtcursor;
+
+ /* Return the oldest transaction ID for in the cache entry. */
+ txnmin = UINT64_MAX;
+ for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp)
+ if (txnmin > cp->txnid)
+ txnmin = cp->txnid;
+ *txnminp = txnmin;
+}
+
+/*
+ * key_max_err --
+ * Common error when a WiredTiger key is too large.
+ */
+static int
+key_max_err(WT_EXTENSION_API *wtext, WT_SESSION *session, size_t len)
+{
+ int ret = 0;
+
+ ERET(wtext, session, EINVAL,
+ "key length (%zu bytes) larger than the maximum Helium "
+ "key length of %d bytes",
+ len, HE_MAX_KEY_LEN);
+}
+
+/*
+ * copyin_key --
+ * Copy a WT_CURSOR key to a HE_ITEM key.
+ */
+static inline int
+copyin_key(WT_CURSOR *wtcursor, int allocate_key)
+{
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ WT_SOURCE *ws;
+ size_t size;
+ int ret = 0;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ ws = cursor->ws;
+ wtext = cursor->wtext;
+
+ r = &cursor->record;
+ if (ws->config_recno) {
+ /*
+ * Allocate a new record for append operations.
+ *
+ * A specified record number could potentially be larger than
+ * the maximum known record number, update the maximum number
+ * as necessary.
+ *
+ * Assume we can compare 8B values without locking them, and
+ * test again after acquiring the lock.
+ *
+ * XXX
+ * If the put fails for some reason, we'll have incremented the
+ * maximum record number past the correct point. I can't think
+ * of a reason any application would care or notice, but it's
+ * not quite right.
+ */
+ if (allocate_key && cursor->config_append) {
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+ wtcursor->recno = ++ws->append_recno;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+ } else if (wtcursor->recno > ws->append_recno) {
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+ if (wtcursor->recno > ws->append_recno)
+ ws->append_recno = wtcursor->recno;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+ }
+
+ if ((ret = wtext->struct_size(wtext, session,
+ &size, "r", wtcursor->recno)) != 0 ||
+ (ret = wtext->struct_pack(wtext, session,
+ r->key, HE_MAX_KEY_LEN, "r", wtcursor->recno)) != 0)
+ return (ret);
+ r->key_len = size;
+ } else {
+ /* I'm not sure this test is necessary, but it's cheap. */
+ if (wtcursor->key.size > HE_MAX_KEY_LEN)
+ return (
+ key_max_err(wtext, session, wtcursor->key.size));
+
+ /*
+ * A set cursor key might reference application memory, which
+ * is only OK until the cursor operation has been called (in
+ * other words, we can only reference application memory from
+ * the WT_CURSOR.set_key call until the WT_CURSOR.op call).
+ * For this reason, do a full copy, don't just reference the
+ * WT_CURSOR key's data.
+ */
+ memcpy(r->key, wtcursor->key.data, wtcursor->key.size);
+ r->key_len = wtcursor->key.size;
+ }
+ return (0);
+}
+
+/*
+ * copyout_key --
+ * Copy a HE_ITEM key to a WT_CURSOR key.
+ */
+static inline int
+copyout_key(WT_CURSOR *wtcursor)
+{
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ WT_SOURCE *ws;
+ int ret = 0;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+ ws = cursor->ws;
+
+ r = &cursor->record;
+ if (ws->config_recno) {
+ if ((ret = wtext->struct_unpack(wtext,
+ session, r->key, r->key_len, "r", &wtcursor->recno)) != 0)
+ return (ret);
+ } else {
+ wtcursor->key.data = r->key;
+ wtcursor->key.size = (size_t)r->key_len;
+ }
+ return (0);
+}
+
+/*
+ * copyout_val --
+ * Copy a Helium store's HE_ITEM value to a WT_CURSOR value.
+ */
+static inline int
+copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp)
+{
+ CURSOR *cursor;
+
+ cursor = (CURSOR *)wtcursor;
+
+ if (cp == NULL) {
+ wtcursor->value.data = cursor->v;
+ wtcursor->value.size = cursor->len;
+ } else {
+ wtcursor->value.data = cp->v;
+ wtcursor->value.size = cp->len;
+ }
+ return (0);
+}
+
+/*
+ * nextprev --
+ * Cursor next/prev.
+ */
+static int
+nextprev(WT_CURSOR *wtcursor, const char *fname,
+ int (*f)(he_t, HE_ITEM *, size_t, size_t))
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_EXTENSION_API *wtext;
+ WT_ITEM a, b;
+ WT_SESSION *session;
+ WT_SOURCE *ws;
+ int cache_ret, cache_rm, cmp, ret = 0;
+ void *p;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ ws = cursor->ws;
+ wtext = cursor->wtext;
+ r = &cursor->record;
+
+ cache_rm = 0;
+
+ /*
+ * If the cache isn't yet in use, it's a simpler problem, just check
+ * the store. We don't care if we race, we're not guaranteeing any
+ * special behavior with respect to phantoms.
+ */
+ if (ws->he_cache_inuse == 0) {
+ cache_ret = WT_NOTFOUND;
+ goto cache_clean;
+ }
+
+skip_deleted:
+ /*
+ * The next/prev key/value pair might be in the cache, which means we
+ * are making two calls and returning the best choice. As each call
+ * overwrites both key and value, we have to have a copy of the key
+ * for the second call plus the returned key and value from the first
+ * call. That's why each cursor has 3 temporary buffers.
+ *
+ * First, copy the key.
+ */
+ if (cursor->t1.mem_len < r->key_len) {
+ if ((p = realloc(cursor->t1.v, r->key_len)) == NULL)
+ return (os_errno());
+ cursor->t1.v = p;
+ cursor->t1.mem_len = r->key_len;
+ }
+ memcpy(cursor->t1.v, r->key, r->key_len);
+ cursor->t1.len = r->key_len;
+
+ /*
+ * Move through the cache until we either find a record with a visible
+ * entry, or we reach the end/beginning.
+ */
+ for (cache_rm = 0;;) {
+ if ((ret = helium_call(wtcursor, fname, ws->he_cache, f)) != 0)
+ break;
+ if ((ret = cache_value_unmarshall(wtcursor)) != 0)
+ return (ret);
+
+ /* If there's no visible entry, move to the next one. */
+ if (!cache_value_visible(wtcursor, &cp))
+ continue;
+
+ /*
+ * If the entry has been deleted, remember that and continue.
+ * We can't just skip the entry because it might be a delete
+ * of an entry in the primary store, which means the cache
+ * entry stops us from returning the primary store's entry.
+ */
+ if (cp->remove)
+ cache_rm = 1;
+
+ /*
+ * Copy the cache key. If the cache's entry wasn't a delete,
+ * copy the value as well, we may return the cache entry.
+ */
+ if (cursor->t2.mem_len < r->key_len) {
+ if ((p = realloc(cursor->t2.v, r->key_len)) == NULL)
+ return (os_errno());
+ cursor->t2.v = p;
+ cursor->t2.mem_len = r->key_len;
+ }
+ memcpy(cursor->t2.v, r->key, r->key_len);
+ cursor->t2.len = r->key_len;
+
+ if (cache_rm)
+ break;
+
+ if (cursor->t3.mem_len < cp->len) {
+ if ((p = realloc(cursor->t3.v, cp->len)) == NULL)
+ return (os_errno());
+ cursor->t3.v = p;
+ cursor->t3.mem_len = cp->len;
+ }
+ memcpy(cursor->t3.v, cp->v, cp->len);
+ cursor->t3.len = cp->len;
+
+ break;
+ }
+ if (ret != 0 && ret != WT_NOTFOUND)
+ return (ret);
+ cache_ret = ret;
+
+ /* Copy the original key back into place. */
+ memcpy(r->key, cursor->t1.v, cursor->t1.len);
+ r->key_len = cursor->t1.len;
+
+cache_clean:
+ /* Get the next/prev entry from the store. */
+ ret = helium_call(wtcursor, fname, ws->he, f);
+ if (ret != 0 && ret != WT_NOTFOUND)
+ return (ret);
+
+ /* If no entries in either the cache or the primary, we're done. */
+ if (cache_ret == WT_NOTFOUND && ret == WT_NOTFOUND)
+ return (WT_NOTFOUND);
+
+ /*
+ * If both the cache and the primary had entries, decide which is a
+ * better choice and pretend we didn't find the other one.
+ */
+ if (cache_ret == 0 && ret == 0) {
+ a.data = r->key; /* a is the primary */
+ a.size = (uint32_t)r->key_len;
+ b.data = cursor->t2.v; /* b is the cache */
+ b.size = (uint32_t)cursor->t2.len;
+ if ((ret = wtext->collate(wtext, session, &a, &b, &cmp)) != 0)
+ return (ret);
+
+ if (f == he_next) {
+ if (cmp >= 0)
+ ret = WT_NOTFOUND;
+ else
+ cache_ret = WT_NOTFOUND;
+ } else {
+ if (cmp <= 0)
+ ret = WT_NOTFOUND;
+ else
+ cache_ret = WT_NOTFOUND;
+ }
+ }
+
+ /*
+ * If the cache is the key we'd choose, but it's a delete, skip past it
+ * by moving from the deleted key to the next/prev item in either the
+ * primary or the cache.
+ */
+ if (cache_ret == 0 && cache_rm) {
+ memcpy(r->key, cursor->t2.v, cursor->t2.len);
+ r->key_len = cursor->t2.len;
+ goto skip_deleted;
+ }
+
+ /* If taking the cache's entry, copy the value into place. */
+ if (cache_ret == 0) {
+ memcpy(r->key, cursor->t2.v, cursor->t2.len);
+ r->key_len = cursor->t2.len;
+
+ memcpy(cursor->v, cursor->t3.v, cursor->t3.len);
+ cursor->len = cursor->t3.len;
+ }
+
+ /* Copy out the chosen key/value pair. */
+ if ((ret = copyout_key(wtcursor)) != 0)
+ return (ret);
+ if ((ret = copyout_val(wtcursor, NULL)) != 0)
+ return (ret);
+ return (0);
+}
+
+/*
+ * helium_cursor_next --
+ * WT_CURSOR.next method.
+ */
+static int
+helium_cursor_next(WT_CURSOR *wtcursor)
+{
+ return (nextprev(wtcursor, "he_next", he_next));
+}
+
+/*
+ * helium_cursor_prev --
+ * WT_CURSOR.prev method.
+ */
+static int
+helium_cursor_prev(WT_CURSOR *wtcursor)
+{
+ return (nextprev(wtcursor, "he_prev", he_prev));
+}
+
+/*
+ * helium_cursor_reset --
+ * WT_CURSOR.reset method.
+ */
+static int
+helium_cursor_reset(WT_CURSOR *wtcursor)
+{
+ CURSOR *cursor;
+ HE_ITEM *r;
+
+ cursor = (CURSOR *)wtcursor;
+ r = &cursor->record;
+
+ /*
+ * Reset the cursor by setting the key length to 0, causing subsequent
+ * next/prev operations to return the first/last record of the object.
+ */
+ r->key_len = 0;
+ return (0);
+}
+
+/*
+ * helium_cursor_search --
+ * WT_CURSOR.search method.
+ */
+static int
+helium_cursor_search(WT_CURSOR *wtcursor)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ WT_SOURCE *ws;
+ int ret = 0;
+
+ cursor = (CURSOR *)wtcursor;
+ ws = cursor->ws;
+
+ /* Copy in the WiredTiger cursor's key. */
+ if ((ret = copyin_key(wtcursor, 0)) != 0)
+ return (ret);
+
+ /*
+ * Check for an entry in the cache. If we find one, unmarshall it
+ * and check for a visible entry we can return.
+ */
+ if ((ret =
+ helium_call(wtcursor, "he_lookup", ws->he_cache, he_lookup)) == 0) {
+ if ((ret = cache_value_unmarshall(wtcursor)) != 0)
+ return (ret);
+ if (cache_value_visible(wtcursor, &cp))
+ return (cp->remove ?
+ WT_NOTFOUND : copyout_val(wtcursor, cp));
+ } else if (ret != WT_NOTFOUND)
+ return (ret);
+
+ /* Check for an entry in the primary store. */
+ if ((ret = helium_call(wtcursor, "he_lookup", ws->he, he_lookup)) != 0)
+ return (ret);
+
+ return (copyout_val(wtcursor, NULL));
+}
+
+/*
+ * helium_cursor_search_near --
+ * WT_CURSOR.search_near method.
+ */
+static int
+helium_cursor_search_near(WT_CURSOR *wtcursor, int *exact)
+{
+ int ret = 0;
+
+ /*
+ * XXX
+ * I'm not confident this is sufficient: if there are multiple threads
+ * of control, it's possible for the search for an exact match to fail,
+ * another thread of control to insert (and commit) an exact match, and
+ * then it's possible we'll return the wrong value. This needs to be
+ * revisited once the transactional code is in place.
+ */
+
+ /* Search for an exact match. */
+ if ((ret = helium_cursor_search(wtcursor)) == 0) {
+ *exact = 0;
+ return (0);
+ }
+ if (ret != WT_NOTFOUND)
+ return (ret);
+
+ /* Search for a key that's larger. */
+ if ((ret = helium_cursor_next(wtcursor)) == 0) {
+ *exact = 1;
+ return (0);
+ }
+ if (ret != WT_NOTFOUND)
+ return (ret);
+
+ /* Search for a key that's smaller. */
+ if ((ret = helium_cursor_prev(wtcursor)) == 0) {
+ *exact = -1;
+ return (0);
+ }
+
+ return (ret);
+}
+
+/*
+ * helium_cursor_insert --
+ * WT_CURSOR.insert method.
+ */
+static int
+helium_cursor_insert(WT_CURSOR *wtcursor)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ HE_ITEM *r;
+ HELIUM_SOURCE *hs;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ WT_SOURCE *ws;
+ int ret = 0;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+ ws = cursor->ws;
+ hs = ws->hs;
+ r = &cursor->record;
+
+ /* Get the WiredTiger cursor's key. */
+ if ((ret = copyin_key(wtcursor, 1)) != 0)
+ return (ret);
+
+ VMSG(wtext, session, VERBOSE_L2,
+ "I %.*s.%.*s", (int)r->key_len, r->key, (int)r->val_len, r->val);
+
+ /* Clear the value, assume we're adding the first cache entry. */
+ cursor->len = 0;
+
+ /* Updates are read-modify-writes, lock the underlying cache. */
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+
+ /* Read the record from the cache store. */
+ switch (ret = helium_call(
+ wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
+ case 0:
+ /* Crack the record. */
+ if ((ret = cache_value_unmarshall(wtcursor)) != 0)
+ goto err;
+
+ /* Check if the update can proceed. */
+ if ((ret = cache_value_update_check(wtcursor)) != 0)
+ goto err;
+
+ if (cursor->config_overwrite)
+ break;
+
+ /*
+ * If overwrite is false, a visible entry (that's not a removed
+ * entry), is an error. We're done checking if there is a
+ * visible entry in the cache, otherwise repeat the check on the
+ * primary store.
+ */
+ if (cache_value_visible(wtcursor, &cp)) {
+ if (cp->remove)
+ break;
+
+ ret = WT_DUPLICATE_KEY;
+ goto err;
+ }
+ /* FALLTHROUGH */
+ case WT_NOTFOUND:
+ if (cursor->config_overwrite)
+ break;
+
+ /* If overwrite is false, an entry is an error. */
+ if ((ret = helium_call(
+ wtcursor, "he_lookup", ws->he, he_lookup)) != WT_NOTFOUND) {
+ if (ret == 0)
+ ret = WT_DUPLICATE_KEY;
+ goto err;
+ }
+ ret = 0;
+ break;
+ default:
+ goto err;
+ }
+
+ /*
+ * Create a new value using the current cache record plus the WiredTiger
+ * cursor's value, and update the cache.
+ */
+ if ((ret = cache_value_append(wtcursor, 0)) != 0)
+ goto err;
+ if ((ret = he_update(ws->he_cache, r)) != 0)
+ EMSG(wtext, session, ret, "he_update: %s", he_strerror(ret));
+
+ /* Update the state while still holding the lock. */
+ if (ws->he_cache_inuse == 0)
+ ws->he_cache_inuse = 1;
+
+ /* Discard the lock. */
+err: ESET(unlock(wtext, session, &ws->lock));
+
+ /* If successful, request notification at transaction resolution. */
+ if (ret == 0)
+ ESET(
+ wtext->transaction_notify(wtext, session, &hs->txn_notify));
+
+ return (ret);
+}
+
+/*
+ * update --
+ * Update or remove an entry.
+ */
+static int
+update(WT_CURSOR *wtcursor, int remove_op)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ HE_ITEM *r;
+ HELIUM_SOURCE *hs;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ WT_SOURCE *ws;
+ int ret = 0;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+ ws = cursor->ws;
+ hs = ws->hs;
+ r = &cursor->record;
+
+ /* Get the WiredTiger cursor's key. */
+ if ((ret = copyin_key(wtcursor, 0)) != 0)
+ return (ret);
+
+ VMSG(wtext, session, VERBOSE_L2,
+ "%c %.*s.%.*s",
+ remove_op ? 'R' : 'U',
+ (int)r->key_len, r->key, (int)r->val_len, r->val);
+
+ /* Clear the value, assume we're adding the first cache entry. */
+ cursor->len = 0;
+
+ /* Updates are read-modify-writes, lock the underlying cache. */
+ if ((ret = writelock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+
+ /* Read the record from the cache store. */
+ switch (ret = helium_call(
+ wtcursor, "he_lookup", ws->he_cache, he_lookup)) {
+ case 0:
+ /* Crack the record. */
+ if ((ret = cache_value_unmarshall(wtcursor)) != 0)
+ goto err;
+
+ /* Check if the update can proceed. */
+ if ((ret = cache_value_update_check(wtcursor)) != 0)
+ goto err;
+
+ if (cursor->config_overwrite)
+ break;
+
+ /*
+ * If overwrite is false, no entry (or a removed entry), is an
+ * error. We're done checking if there is a visible entry in
+ * the cache, otherwise repeat the check on the primary store.
+ */
+ if (cache_value_visible(wtcursor, &cp)) {
+ if (!cp->remove)
+ break;
+
+ ret = WT_NOTFOUND;
+ goto err;
+ }
+ /* FALLTHROUGH */
+ case WT_NOTFOUND:
+ if (cursor->config_overwrite)
+ break;
+
+ /* If overwrite is false, no entry is an error. */
+ if ((ret =
+ helium_call(wtcursor, "he_lookup", ws->he, he_lookup)) != 0)
+ goto err;
+
+ /*
+ * All we care about is the cache entry, which didn't exist;
+ * clear the returned value, we're about to "append" to it.
+ */
+ cursor->len = 0;
+ break;
+ default:
+ goto err;
+ }
+
+ /*
+ * Create a new cache value based on the current cache record plus the
+ * WiredTiger cursor's value.
+ */
+ if ((ret = cache_value_append(wtcursor, remove_op)) != 0)
+ goto err;
+
+ /* Push the record into the cache. */
+ if ((ret = he_update(ws->he_cache, r)) != 0)
+ EMSG(wtext, session, ret, "he_update: %s", he_strerror(ret));
+
+ /* Update the state while still holding the lock. */
+ if (ws->he_cache_inuse == 0)
+ ws->he_cache_inuse = 1;
+
+ /* Discard the lock. */
+err: ESET(unlock(wtext, session, &ws->lock));
+
+ /* If successful, request notification at transaction resolution. */
+ if (ret == 0)
+ ESET(
+ wtext->transaction_notify(wtext, session, &hs->txn_notify));
+
+ return (ret);
+}
+
+/*
+ * helium_cursor_update --
+ * WT_CURSOR.update method.
+ */
+static int
+helium_cursor_update(WT_CURSOR *wtcursor)
+{
+ return (update(wtcursor, 0));
+}
+
+/*
+ * helium_cursor_remove --
+ * WT_CURSOR.remove method.
+ */
+static int
+helium_cursor_remove(WT_CURSOR *wtcursor)
+{
+ CURSOR *cursor;
+ WT_SOURCE *ws;
+
+ cursor = (CURSOR *)wtcursor;
+ ws = cursor->ws;
+
+ /*
+ * WiredTiger's "remove" of a bitfield is really an update with a value
+ * of zero.
+ */
+ if (ws->config_bitfield) {
+ wtcursor->value.size = 1;
+ wtcursor->value.data = "";
+ return (update(wtcursor, 0));
+ }
+ return (update(wtcursor, 1));
+}
+
+/*
+ * helium_cursor_close --
+ * WT_CURSOR.close method.
+ */
+static int
+helium_cursor_close(WT_CURSOR *wtcursor)
+{
+ CURSOR *cursor;
+ WT_EXTENSION_API *wtext;
+ WT_SESSION *session;
+ WT_SOURCE *ws;
+ int ret = 0;
+
+ session = wtcursor->session;
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+ ws = cursor->ws;
+
+ if ((ret = writelock(wtext, session, &ws->lock)) == 0) {
+ --ws->ref;
+ ret = unlock(wtext, session, &ws->lock);
+ }
+ cursor_destroy(cursor);
+
+ return (ret);
+}
+
+/*
+ * ws_source_name --
+ * Build a namespace name.
+ */
+static int
+ws_source_name(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, const char *suffix, char **pp)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ size_t len;
+ int ret = 0;
+ const char *p;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /*
+ * Create the store's name. Application URIs are "helium:device/name";
+ * we want the names on the Helium device to be obviously WiredTiger's,
+ * and the device name isn't interesting. Convert to "WiredTiger:name",
+ * and add an optional suffix.
+ */
+ if (!prefix_match(uri, "helium:") || (p = strchr(uri, '/')) == NULL)
+ ERET(wtext, session, EINVAL, "%s: illegal Helium URI", uri);
+ ++p;
+
+ len = strlen(WT_NAME_PREFIX) +
+ strlen(p) + (suffix == NULL ? 0 : strlen(suffix)) + 5;
+ if ((*pp = malloc(len)) == NULL)
+ return (os_errno());
+ (void)snprintf(*pp, len, "%s%s%s",
+ WT_NAME_PREFIX, p, suffix == NULL ? "" : suffix);
+ return (0);
+}
+
+/*
+ * ws_source_close --
+ * Close a WT_SOURCE reference.
+ */
+static int
+ws_source_close(WT_EXTENSION_API *wtext, WT_SESSION *session, WT_SOURCE *ws)
+{
+ int ret = 0, tret;
+
+ /*
+ * Warn if open cursors: it shouldn't happen because the upper layers of
+ * WiredTiger prevent it, so we don't do anything more than warn.
+ */
+ if (ws->ref != 0)
+ EMSG(wtext, session, WT_ERROR,
+ "%s: open object with %u open cursors being closed",
+ ws->uri, ws->ref);
+
+ if (ws->he != NULL) {
+ if ((tret = he_commit(ws->he)) != 0)
+ EMSG(wtext, session, tret,
+ "he_commit: %s: %s", ws->uri, he_strerror(tret));
+ if ((tret = he_close(ws->he)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s: %s", ws->uri, he_strerror(tret));
+ ws->he = NULL;
+ }
+ if (ws->he_cache != NULL) {
+ if ((tret = he_close(ws->he_cache)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s(cache): %s",
+ ws->uri, he_strerror(tret));
+ ws->he_cache = NULL;
+ }
+
+ if (ws->lockinit)
+ ESET(lock_destroy(wtext, session, &ws->lock));
+
+ free(ws->uri);
+ OVERWRITE_AND_FREE(ws);
+
+ return (ret);
+}
+
+/*
+ * ws_source_open_object --
+ * Open an object in the Helium store.
+ */
+static int
+ws_source_open_object(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+ HELIUM_SOURCE *hs,
+ const char *uri, const char *suffix, int flags, he_t *hep)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ he_t he;
+ char *p;
+ int ret = 0;
+
+ *hep = NULL;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+ p = NULL;
+
+ /* Open the underlying Helium object. */
+ if ((ret = ws_source_name(wtds, session, uri, suffix, &p)) != 0)
+ return (ret);
+ VMSG(wtext, session, VERBOSE_L1, "open %s/%s", hs->name, p);
+ if ((he = he_open(hs->device, p, flags, NULL)) == NULL) {
+ ret = os_errno();
+ EMSG(wtext, session, ret,
+ "he_open: %s/%s: %s", hs->name, p, he_strerror(ret));
+ }
+ *hep = he;
+
+ free(p);
+ return (ret);
+}
+
+#define WS_SOURCE_OPEN_BUSY 0x01 /* Fail if source busy */
+#define WS_SOURCE_OPEN_GLOBAL 0x02 /* Keep the global lock */
+
+/*
+ * ws_source_open --
+ * Return a locked WiredTiger source, allocating and opening if it doesn't
+ * already exist.
+ */
+static int
+ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+ const char *uri, WT_CONFIG_ARG *config, u_int flags, WT_SOURCE **refp)
+{
+ DATA_SOURCE *ds;
+ HELIUM_SOURCE *hs;
+ WT_CONFIG_ITEM a;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ size_t len;
+ int oflags, ret = 0;
+ const char *p, *t;
+
+ *refp = NULL;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+ ws = NULL;
+
+ /*
+ * The URI will be "helium:" followed by a Helium name and object name
+ * pair separated by a slash, for example, "helium:volume/object".
+ */
+ if (!prefix_match(uri, "helium:"))
+ goto bad_name;
+ p = uri + strlen("helium:");
+ if (p[0] == '/' || (t = strchr(p, '/')) == NULL || t[1] == '\0')
+bad_name: ERET(wtext, session, EINVAL, "%s: illegal name format", uri);
+ len = (size_t)(t - p);
+
+ /* Find a matching Helium device. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if (string_match(hs->name, p, len))
+ break;
+ if (hs == NULL)
+ ERET(wtext, NULL,
+ EINVAL, "%s: no matching Helium store found", uri);
+
+ /*
+ * We're about to walk the Helium device's list of files, acquire the
+ * global lock.
+ */
+ if ((ret = writelock(wtext, session, &ds->global_lock)) != 0)
+ return (ret);
+
+ /*
+ * Check for a match: if we find one, optionally trade the global lock
+ * for the object's lock, optionally check if the object is busy, and
+ * return.
+ */
+ for (ws = hs->ws_head; ws != NULL; ws = ws->next)
+ if (strcmp(ws->uri, uri) == 0) {
+ /* Check to see if the object is busy. */
+ if (ws->ref != 0 && (flags & WS_SOURCE_OPEN_BUSY)) {
+ ret = EBUSY;
+ ESET(unlock(wtext, session, &ds->global_lock));
+ return (ret);
+ }
+ /* Swap the global lock for an object lock. */
+ if (!(flags & WS_SOURCE_OPEN_GLOBAL)) {
+ ret = writelock(wtext, session, &ws->lock);
+ ESET(unlock(wtext, session, &ds->global_lock));
+ if (ret != 0)
+ return (ret);
+ }
+ *refp = ws;
+ return (0);
+ }
+
+ /* Allocate and initialize a new underlying WiredTiger source object. */
+ if ((ws = calloc(1, sizeof(*ws))) == NULL ||
+ (ws->uri = strdup(uri)) == NULL) {
+ ret = os_errno();
+ goto err;
+ }
+ if ((ret = lock_init(wtext, session, &ws->lock)) != 0)
+ goto err;
+ ws->lockinit = 1;
+ ws->hs = hs;
+
+ /*
+ * Open the underlying Helium objects, then push the change.
+ *
+ * The naming scheme is simple: the URI names the primary store, and the
+ * URI with a trailing suffix names the associated caching store.
+ *
+ * We can set truncate flag, we always set the create flag, our caller
+ * handles attempts to create existing objects.
+ */
+ oflags = HE_O_CREATE;
+ if ((ret = wtext->config_get(wtext,
+ session, config, "helium_o_truncate", &a)) == 0 && a.val != 0)
+ oflags |= HE_O_TRUNCATE;
+ if (ret != 0 && ret != WT_NOTFOUND)
+ EMSG_ERR(wtext, session, ret,
+ "helium_o_truncate configuration: %s",
+ wtext->strerror(ret));
+
+ if ((ret = ws_source_open_object(
+ wtds, session, hs, uri, NULL, oflags, &ws->he)) != 0)
+ goto err;
+ if ((ret = ws_source_open_object(
+ wtds, session, hs, uri, WT_NAME_CACHE, oflags, &ws->he_cache)) != 0)
+ goto err;
+ if ((ret = he_commit(ws->he)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "he_commit: %s", he_strerror(ret));
+
+ /* Optionally trade the global lock for the object lock. */
+ if (!(flags & WS_SOURCE_OPEN_GLOBAL) &&
+ (ret = writelock(wtext, session, &ws->lock)) != 0)
+ goto err;
+
+ /* Insert the new entry at the head of the list. */
+ ws->next = hs->ws_head;
+ hs->ws_head = ws;
+
+ *refp = ws;
+ ws = NULL;
+
+ if (0) {
+err: if (ws != NULL)
+ ESET(ws_source_close(wtext, session, ws));
+ }
+
+ /*
+ * If there was an error or our caller doesn't need the global lock,
+ * release the global lock.
+ */
+ if (!(flags & WS_SOURCE_OPEN_GLOBAL) || ret != 0)
+ ESET(unlock(wtext, session, &ds->global_lock));
+
+ return (ret);
+}
+
+/*
+ * master_uri_get --
+ * Get the Helium master record for a URI.
+ */
+static int
+master_uri_get(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, const char **valuep)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ return (wtext->metadata_search(wtext, session, uri, valuep));
+}
+
+/*
+ * master_uri_drop --
+ * Drop the Helium master record for a URI.
+ */
+static int
+master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ return (wtext->metadata_remove(wtext, session, uri));
+}
+
+/*
+ * master_uri_rename --
+ * Rename the Helium master record for a URI.
+ */
+static int
+master_uri_rename(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, const char *newuri)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ int ret = 0;
+ const char *value;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+ value = NULL;
+
+ /* Insert the record under a new name. */
+ if ((ret = master_uri_get(wtds, session, uri, &value)) != 0 ||
+ (ret = wtext->metadata_insert(wtext, session, newuri, value)) != 0)
+ goto err;
+
+ /*
+ * Remove the original record, and if that fails, attempt to remove
+ * the new record.
+ */
+ if ((ret = wtext->metadata_remove(wtext, session, uri)) != 0)
+ (void)wtext->metadata_remove(wtext, session, newuri);
+
+err: free((void *)value);
+ return (ret);
+}
+
+/*
+ * master_uri_set --
+ * Set the Helium master record for a URI.
+ */
+static int
+master_uri_set(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
+{
+ DATA_SOURCE *ds;
+ WT_CONFIG_ITEM a, b, c;
+ WT_EXTENSION_API *wtext;
+ int exclusive, ret = 0;
+ char value[1024];
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ exclusive = 0;
+ if ((ret =
+ wtext->config_get(wtext, session, config, "exclusive", &a)) == 0)
+ exclusive = a.val != 0;
+ else if (ret != WT_NOTFOUND)
+ ERET(wtext, session, ret,
+ "exclusive configuration: %s", wtext->strerror(ret));
+
+ /* Get the key/value format strings. */
+ if ((ret = wtext->config_get(
+ wtext, session, config, "key_format", &a)) != 0) {
+ if (ret == WT_NOTFOUND) {
+ a.str = "u";
+ a.len = 1;
+ } else
+ ERET(wtext, session, ret,
+ "key_format configuration: %s",
+ wtext->strerror(ret));
+ }
+ if ((ret = wtext->config_get(
+ wtext, session, config, "value_format", &b)) != 0) {
+ if (ret == WT_NOTFOUND) {
+ b.str = "u";
+ b.len = 1;
+ } else
+ ERET(wtext, session, ret,
+ "value_format configuration: %s",
+ wtext->strerror(ret));
+ }
+
+ /* Get the compression configuration. */
+ if ((ret = wtext->config_get(
+ wtext, session, config, "helium_o_compress", &c)) != 0) {
+ if (ret == WT_NOTFOUND)
+ c.val = 0;
+ else
+ ERET(wtext, session, ret,
+ "helium_o_compress configuration: %s",
+ wtext->strerror(ret));
+ }
+
+ /*
+ * Create a new reference using insert (which fails if the record
+ * already exists).
+ */
+ (void)snprintf(value, sizeof(value),
+ "wiredtiger_helium_version=(major=%d,minor=%d),"
+ "key_format=%.*s,value_format=%.*s,"
+ "helium_o_compress=%d",
+ WIREDTIGER_HELIUM_MAJOR, WIREDTIGER_HELIUM_MINOR,
+ (int)a.len, a.str, (int)b.len, b.str, c.val ? 1 : 0);
+ if ((ret = wtext->metadata_insert(wtext, session, uri, value)) == 0)
+ return (0);
+ if (ret == WT_DUPLICATE_KEY)
+ return (exclusive ? EEXIST : 0);
+ ERET(wtext, session, ret, "%s: %s", uri, wtext->strerror(ret));
+}
+
+/*
+ * helium_session_open_cursor --
+ * WT_SESSION.open_cursor method.
+ */
+static int
+helium_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+ const char *uri, WT_CONFIG_ARG *config, WT_CURSOR **new_cursor)
+{
+ CURSOR *cursor;
+ DATA_SOURCE *ds;
+ WT_CONFIG_ITEM v;
+ WT_CONFIG_PARSER *config_parser;
+ WT_CURSOR *wtcursor;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ int locked, ret, tret;
+ const char *value;
+
+ *new_cursor = NULL;
+
+ config_parser = NULL;
+ cursor = NULL;
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+ ws = NULL;
+ locked = 0;
+ ret = tret = 0;
+ value = NULL;
+
+ /* Allocate and initialize a cursor. */
+ if ((cursor = calloc(1, sizeof(CURSOR))) == NULL)
+ return (os_errno());
+
+ if ((ret = wtext->config_get( /* Parse configuration */
+ wtext, session, config, "append", &v)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "append configuration: %s", wtext->strerror(ret));
+ cursor->config_append = v.val != 0;
+
+ if ((ret = wtext->config_get(
+ wtext, session, config, "overwrite", &v)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "overwrite configuration: %s", wtext->strerror(ret));
+ cursor->config_overwrite = v.val != 0;
+
+ if ((ret = wtext->collator_config(wtext, session, config)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "collator configuration: %s", wtext->strerror(ret));
+
+ /* Finish initializing the cursor. */
+ cursor->wtcursor.close = helium_cursor_close;
+ cursor->wtcursor.insert = helium_cursor_insert;
+ cursor->wtcursor.next = helium_cursor_next;
+ cursor->wtcursor.prev = helium_cursor_prev;
+ cursor->wtcursor.remove = helium_cursor_remove;
+ cursor->wtcursor.reset = helium_cursor_reset;
+ cursor->wtcursor.search = helium_cursor_search;
+ cursor->wtcursor.search_near = helium_cursor_search_near;
+ cursor->wtcursor.update = helium_cursor_update;
+
+ cursor->wtext = wtext;
+ cursor->record.key = cursor->__key;
+ if ((cursor->v = malloc(128)) == NULL)
+ goto err;
+ cursor->mem_len = 128;
+
+ /* Get a locked reference to the WiredTiger source. */
+ if ((ret = ws_source_open(wtds, session, uri, config, 0, &ws)) != 0)
+ goto err;
+ locked = 1;
+ cursor->ws = ws;
+
+ /*
+ * If this is the first access to the URI, we have to configure it
+ * using information stored in the master record.
+ */
+ if (!ws->configured) {
+ if ((ret = master_uri_get(wtds, session, uri, &value)) != 0)
+ goto err;
+
+ if ((ret = wtext->config_parser_open(wtext,
+ session, value, strlen(value), &config_parser)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "Configuration string parser: %s",
+ wtext->strerror(ret));
+ if ((ret = config_parser->get(
+ config_parser, "key_format", &v)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "key_format configuration: %s",
+ wtext->strerror(ret));
+ ws->config_recno = v.len == 1 && v.str[0] == 'r';
+
+ if ((ret = config_parser->get(
+ config_parser, "value_format", &v)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "value_format configuration: %s",
+ wtext->strerror(ret));
+ ws->config_bitfield =
+ v.len == 2 && isdigit(v.str[0]) && v.str[1] == 't';
+
+ if ((ret = config_parser->get(
+ config_parser, "helium_o_compress", &v)) != 0)
+ EMSG_ERR(wtext, session, ret,
+ "helium_o_compress configuration: %s",
+ wtext->strerror(ret));
+ ws->config_compress = v.val ? 1 : 0;
+
+ /*
+ * If it's a record-number key, read the last record from the
+ * object and set the allocation record value.
+ */
+ if (ws->config_recno) {
+ wtcursor = (WT_CURSOR *)cursor;
+ if ((ret = helium_cursor_reset(wtcursor)) != 0)
+ goto err;
+
+ if ((ret = helium_cursor_prev(wtcursor)) == 0)
+ ws->append_recno = wtcursor->recno;
+ else if (ret != WT_NOTFOUND)
+ goto err;
+
+ if ((ret = helium_cursor_reset(wtcursor)) != 0)
+ goto err;
+ }
+
+ ws->configured = 1;
+ }
+
+ /* Increment the open reference count to pin the URI and unlock it. */
+ ++ws->ref;
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
+ goto err;
+
+ *new_cursor = (WT_CURSOR *)cursor;
+
+ if (0) {
+err: if (ws != NULL && locked)
+ ESET(unlock(wtext, session, &ws->lock));
+ cursor_destroy(cursor);
+ }
+ if (config_parser != NULL &&
+ (tret = config_parser->close(config_parser)) != 0)
+ EMSG(wtext, session, tret,
+ "WT_CONFIG_PARSER.close: %s", wtext->strerror(tret));
+
+ free((void *)value);
+ return (ret);
+}
+
+/*
+ * helium_session_create --
+ * WT_SESSION.create method.
+ */
+static int
+helium_session_create(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ int ret = 0;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /*
+ * Get a locked reference to the WiredTiger source, then immediately
+ * unlock it, we aren't doing anything else.
+ */
+ if ((ret = ws_source_open(wtds, session, uri, config, 0, &ws)) != 0)
+ return (ret);
+ if ((ret = unlock(wtext, session, &ws->lock)) != 0)
+ return (ret);
+
+ /*
+ * Create the URI master record if it doesn't already exist.
+ *
+ * We've discarded the lock, but that's OK, creates are single-threaded
+ * at the WiredTiger level, it's not our problem to solve.
+ *
+ * If unable to enter a WiredTiger record, leave the Helium store alone.
+ * A subsequent create should do the right thing, we aren't leaving
+ * anything in an inconsistent state.
+ */
+ return (master_uri_set(wtds, session, uri, config));
+}
+
+/*
+ * helium_session_drop --
+ * WT_SESSION.drop method.
+ */
+static int
+helium_session_drop(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
+{
+ DATA_SOURCE *ds;
+ HELIUM_SOURCE *hs;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE **p, *ws;
+ int ret = 0;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /*
+ * Get a locked reference to the data source: hold the global lock,
+ * we're changing the HELIUM_SOURCE's list of WT_SOURCE objects.
+ *
+ * Remove the entry from the WT_SOURCE list -- it's a singly-linked
+ * list, find the reference to it.
+ */
+ if ((ret = ws_source_open(wtds, session, uri, config,
+ WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)) != 0)
+ return (ret);
+ hs = ws->hs;
+ for (p = &hs->ws_head; *p != NULL; p = &(*p)->next)
+ if (*p == ws) {
+ *p = (*p)->next;
+ break;
+ }
+
+ /* Drop the underlying Helium objects. */
+ ESET(he_remove(ws->he));
+ ws->he = NULL; /* The handle is dead. */
+ ESET(he_remove(ws->he_cache));
+ ws->he_cache = NULL; /* The handle is dead. */
+
+ /* Close the source, discarding the structure. */
+ ESET(ws_source_close(wtext, session, ws));
+ ws = NULL;
+
+ /* Discard the metadata entry. */
+ ESET(master_uri_drop(wtds, session, uri));
+
+ /*
+ * If we have an error at this point, panic -- there's an inconsistency
+ * in what WiredTiger knows about and the underlying store.
+ */
+ if (ret != 0)
+ ret = WT_PANIC;
+
+ ESET(unlock(wtext, session, &ds->global_lock));
+ return (ret);
+}
+
+/*
+ * helium_session_rename --
+ * WT_SESSION.rename method.
+ */
+static int
+helium_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session,
+ const char *uri, const char *newuri, WT_CONFIG_ARG *config)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ int ret = 0;
+ char *p;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /*
+ * Get a locked reference to the data source; hold the global lock,
+ * we are going to change the object's name, and we can't allow
+ * other threads walking the list and comparing against the name.
+ */
+ if ((ret = ws_source_open(wtds, session, uri, config,
+ WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)) != 0)
+ return (ret);
+
+ /* Get a copy of the new name for the WT_SOURCE structure. */
+ if ((p = strdup(newuri)) == NULL) {
+ ret = os_errno();
+ goto err;
+ }
+ free(ws->uri);
+ ws->uri = p;
+
+ /* Rename the underlying Helium objects. */
+ ESET(ws_source_name(wtds, session, newuri, NULL, &p));
+ if (ret == 0) {
+ ESET(he_rename(ws->he, p));
+ free(p);
+ }
+ ESET(ws_source_name(wtds, session, newuri, WT_NAME_CACHE, &p));
+ if (ret == 0) {
+ ESET(he_rename(ws->he_cache, p));
+ free(p);
+ }
+
+ /* Update the metadata record. */
+ ESET(master_uri_rename(wtds, session, uri, newuri));
+
+ /*
+ * If we have an error at this point, panic -- there's an inconsistency
+ * in what WiredTiger knows about and the underlying store.
+ */
+ if (ret != 0)
+ ret = WT_PANIC;
+
+err: ESET(unlock(wtext, session, &ds->global_lock));
+
+ return (ret);
+}
+
+/*
+ * helium_session_truncate --
+ * WT_SESSION.truncate method.
+ */
+static int
+helium_session_truncate(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
+{
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ int ret = 0, tret;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /* Get a locked reference to the WiredTiger source. */
+ if ((ret = ws_source_open(wtds, session,
+ uri, config, WS_SOURCE_OPEN_BUSY, &ws)) != 0)
+ return (ret);
+
+ /* Truncate the underlying namespaces. */
+ if ((tret = he_truncate(ws->he)) != 0)
+ EMSG(wtext, session, tret,
+ "he_truncate: %s: %s", ws->uri, he_strerror(tret));
+ if ((tret = he_truncate(ws->he_cache)) != 0)
+ EMSG(wtext, session, tret,
+ "he_truncate: %s: %s", ws->uri, he_strerror(tret));
+
+ ESET(unlock(wtext, session, &ws->lock));
+ return (ret);
+}
+
+/*
+ * helium_session_verify --
+ * WT_SESSION.verify method.
+ */
+static int
+helium_session_verify(WT_DATA_SOURCE *wtds,
+ WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config)
+{
+ (void)wtds;
+ (void)session;
+ (void)uri;
+ (void)config;
+ return (0);
+}
+
+/*
+ * helium_session_checkpoint --
+ * WT_SESSION.checkpoint method.
+ */
+static int
+helium_session_checkpoint(
+ WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_CONFIG_ARG *config)
+{
+ DATA_SOURCE *ds;
+ HELIUM_SOURCE *hs;
+ WT_EXTENSION_API *wtext;
+ int ret = 0;
+
+ (void)config;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /* Flush all volumes. */
+ if ((hs = ds->hs_head) != NULL &&
+ (ret = he_commit(hs->he_volume)) != 0)
+ ERET(wtext, session, ret,
+ "he_commit: %s: %s", hs->device, he_strerror(ret));
+
+ return (0);
+}
+
+/*
+ * helium_source_close --
+ * Discard a HELIUM_SOURCE.
+ */
+static int
+helium_source_close(
+ WT_EXTENSION_API *wtext, WT_SESSION *session, HELIUM_SOURCE *hs)
+{
+ WT_SOURCE *ws;
+ int ret = 0, tret;
+
+ /* Resolve the cache into the primary one last time and quit. */
+ if (hs->cleaner_id != 0) {
+ hs->cleaner_stop = 1;
+
+ if ((tret = pthread_join(hs->cleaner_id, NULL)) != 0)
+ EMSG(wtext, session, tret,
+ "pthread_join: %s", strerror(tret));
+ hs->cleaner_id = 0;
+ }
+
+ /* Close the underlying WiredTiger sources. */
+ while ((ws = hs->ws_head) != NULL) {
+ hs->ws_head = ws->next;
+ ESET(ws_source_close(wtext, session, ws));
+ }
+
+ /* If the owner, close the database transaction store. */
+ if (hs->he_txn != NULL && hs->he_owner) {
+ if ((tret = he_close(hs->he_txn)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s: %s: %s",
+ hs->name, WT_NAME_TXN, he_strerror(tret));
+ hs->he_txn = NULL;
+ }
+
+ /* Flush and close the Helium source. */
+ if (hs->he_volume != NULL) {
+ if ((tret = he_commit(hs->he_volume)) != 0)
+ EMSG(wtext, session, tret,
+ "he_commit: %s: %s",
+ hs->device, he_strerror(tret));
+
+ if ((tret = he_close(hs->he_volume)) != 0)
+ EMSG(wtext, session, tret,
+ "he_close: %s: %s: %s",
+ hs->name, WT_NAME_INIT, he_strerror(tret));
+ hs->he_volume = NULL;
+ }
+
+ free(hs->name);
+ free(hs->device);
+ OVERWRITE_AND_FREE(hs);
+
+ return (ret);
+}
+
+/*
+ * cache_cleaner --
+ * Migrate information from the cache to the primary store.
+ */
+static int
+cache_cleaner(WT_EXTENSION_API *wtext,
+ WT_CURSOR *wtcursor, uint64_t oldest, uint64_t *txnminp)
+{
+ CACHE_RECORD *cp;
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_SOURCE *ws;
+ uint64_t txnid;
+ int locked, pushed, recovery, ret = 0;
+
+ /*
+ * Called in two ways: in normal processing mode where we're supplied a
+ * value for the oldest transaction ID not yet visible to a running
+ * transaction, and we're tracking the smallest transaction ID
+ * referenced by any cache entry, and in recovery mode where neither of
+ * those are true.
+ */
+ if (txnminp == NULL)
+ recovery = 1;
+ else {
+ recovery = 0;
+ *txnminp = UINT64_MAX;
+ }
+
+ cursor = (CURSOR *)wtcursor;
+ ws = cursor->ws;
+ r = &cursor->record;
+ locked = pushed = 0;
+
+ /*
+ * For every cache key where all updates are globally visible:
+ * Migrate the most recent update value to the primary store.
+ */
+ for (r->key_len = 0; (ret =
+ helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
+ /*
+ * Unmarshall the value, and if all of the updates are globally
+ * visible, update the primary with the last committed update.
+ * In normal processing, the last committed update test is for
+ * a globally visible update that's not explicitly aborted. In
+ * recovery processing, the last committed update test is for
+ * an explicitly committed update. See the underlying functions
+ * for more information.
+ */
+ if ((ret = cache_value_unmarshall(wtcursor)) != 0)
+ goto err;
+ if (!recovery && !cache_value_visible_all(wtcursor, oldest))
+ continue;
+ if (recovery)
+ cache_value_last_committed(wtcursor, &cp);
+ else
+ cache_value_last_not_aborted(wtcursor, &cp);
+ if (cp == NULL)
+ continue;
+
+ pushed = 1;
+ if (cp->remove) {
+ if ((ret = he_delete(ws->he, r)) == 0)
+ continue;
+
+ /*
+ * Updates confined to the cache may not appear in the
+ * primary at all, that is, an insert and remove pair
+ * may be confined to the cache.
+ */
+ if (ret == HE_ERR_ITEM_NOT_FOUND) {
+ ret = 0;
+ continue;
+ }
+ ERET(wtext, NULL, ret,
+ "he_delete: %s", he_strerror(ret));
+ } else {
+ r->val = cp->v;
+ r->val_len = cp->len;
+ /*
+ * If compression configured for this datastore, set the
+ * compression flag, we're updating the "real" store.
+ */
+ if (ws->config_compress)
+ r->flags |= HE_I_COMPRESS;
+ ret = he_update(ws->he, r);
+ r->flags = 0;
+ if (ret == 0)
+ continue;
+
+ ERET(wtext, NULL, ret,
+ "he_update: %s", he_strerror(ret));
+ }
+ }
+
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ if (ret != 0)
+ ERET(wtext, NULL, ret, "he_next: %s", he_strerror(ret));
+
+ /*
+ * If we didn't move any keys from the cache to the primary, quit. It's
+ * possible we could still remove values from the cache, but not likely,
+ * and another pass would probably be wasted effort (especially locked).
+ */
+ if (!pushed)
+ return (0);
+
+ /*
+ * Push the store to stable storage for correctness. (It doesn't matter
+ * what Helium handle we commit, so we just commit one of them.)
+ */
+ if ((ret = he_commit(ws->he)) != 0)
+ ERET(wtext, NULL, ret, "he_commit: %s", he_strerror(ret));
+
+ /*
+ * If we're performing recovery, that's all we need to do, we're going
+ * to simply discard the cache, there's no reason to remove entries one
+ * at a time.
+ */
+ if (recovery)
+ return (0);
+
+ /*
+ * For every cache key where all updates are globally visible:
+ * Remove the cache key.
+ *
+ * We're updating the cache, which requires a lock during normal
+ * cleaning.
+ */
+ if ((ret = writelock(wtext, NULL, &ws->lock)) != 0)
+ goto err;
+ locked = 1;
+
+ for (r->key_len = 0; (ret =
+ helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) {
+ /*
+ * Unmarshall the value, and if all of the updates are globally
+ * visible, remove the cache entry.
+ */
+ if ((ret = cache_value_unmarshall(wtcursor)) != 0)
+ goto err;
+ if (cache_value_visible_all(wtcursor, oldest)) {
+ if ((ret = he_delete(ws->he_cache, r)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "he_delete: %s", he_strerror(ret));
+ continue;
+ }
+
+ /*
+ * If the entry will remain in the cache, figure out the oldest
+ * transaction for which it contains an update (which might be
+ * different from the oldest transaction in the system). We
+ * need the oldest transaction ID that appears anywhere in any
+ * cache, it limits the records we can discard from the
+ * transaction store.
+ */
+ cache_value_txnmin(wtcursor, &txnid);
+ if (txnid < *txnminp)
+ *txnminp = txnid;
+ }
+
+ locked = 0;
+ if ((ret = unlock(wtext, NULL, &ws->lock)) != 0)
+ goto err;
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ if (ret != 0)
+ EMSG_ERR(wtext, NULL, ret, "he_next: %s", he_strerror(ret));
+
+err: if (locked)
+ ESET(unlock(wtext, NULL, &ws->lock));
+
+ return (ret);
+}
+
+/*
+ * txn_cleaner --
+ * Discard no longer needed entries from the transaction store.
+ */
+static int
+txn_cleaner(WT_CURSOR *wtcursor, he_t he_txn, uint64_t txnmin)
+{
+ CURSOR *cursor;
+ HE_ITEM *r;
+ WT_EXTENSION_API *wtext;
+ uint64_t txnid;
+ int ret = 0;
+
+ cursor = (CURSOR *)wtcursor;
+ wtext = cursor->wtext;
+ r = &cursor->record;
+
+ /*
+ * Remove all entries from the transaction store that are before the
+ * oldest transaction ID that appears anywhere in any cache.
+ */
+ for (r->key_len = 0;
+ (ret = helium_call(wtcursor, "he_next", he_txn, he_next)) == 0;) {
+ memcpy(&txnid, r->key, sizeof(txnid));
+ if (txnid < txnmin && (ret = he_delete(he_txn, r)) != 0)
+ ERET(wtext, NULL, ret,
+ "he_delete: %s", he_strerror(ret));
+ }
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ if (ret != 0)
+ ERET(wtext, NULL, ret, "he_next: %s", he_strerror(ret));
+
+ return (0);
+}
+
+/*
+ * fake_cursor --
+ * Fake up enough of a cursor to do Helium operations.
+ */
+static int
+fake_cursor(WT_EXTENSION_API *wtext, WT_CURSOR **wtcursorp)
+{
+ CURSOR *cursor;
+ WT_CURSOR *wtcursor;
+
+ /*
+ * Fake a cursor.
+ */
+ if ((cursor = calloc(1, sizeof(CURSOR))) == NULL)
+ return (os_errno());
+ cursor->wtext = wtext;
+ cursor->record.key = cursor->__key;
+ if ((cursor->v = malloc(128)) == NULL) {
+ free(cursor);
+ return (os_errno());
+ }
+ cursor->mem_len = 128;
+
+ /*
+ * !!!
+ * Fake cursors don't have WT_SESSION handles.
+ */
+ wtcursor = (WT_CURSOR *)cursor;
+ wtcursor->session = NULL;
+
+ *wtcursorp = wtcursor;
+ return (0);
+}
+
+/*
+ * cache_cleaner_worker --
+ * Thread to migrate data from the cache to the primary.
+ */
+static void *
+cache_cleaner_worker(void *arg)
+{
+ struct timeval t;
+ CURSOR *cursor;
+ HELIUM_SOURCE *hs;
+ HE_STATS stats;
+ WT_CURSOR *wtcursor;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ uint64_t oldest, txnmin, txntmp;
+ int cleaner_stop, delay, ret = 0;
+
+ hs = (HELIUM_SOURCE *)arg;
+
+ cursor = NULL;
+ wtext = hs->wtext;
+
+ if ((ret = fake_cursor(wtext, &wtcursor)) != 0)
+ EMSG_ERR(wtext, NULL, ret, "cleaner: %s", strerror(ret));
+ cursor = (CURSOR *)wtcursor;
+
+ for (cleaner_stop = delay = 0; !cleaner_stop;) {
+ /*
+ * Check if this will be the final run; cleaner_stop is declared
+ * volatile, and so the read will happen. We don't much care if
+ * there's extra loops, it's enough if a read eventually happens
+ * and finds the variable set. Store the read locally, reading
+ * the variable twice might race.
+ */
+ cleaner_stop = hs->cleaner_stop;
+
+ /*
+ * Delay if this isn't the final run and the last pass didn't
+ * find any work to do.
+ */
+ if (!cleaner_stop && delay != 0) {
+ t.tv_sec = delay;
+ t.tv_usec = 0;
+ (void)select(0, NULL, NULL, NULL, &t);
+ }
+
+ /* Run at least every 5 seconds. */
+ if (delay < 5)
+ ++delay;
+
+ /*
+ * Clean the datastore caches, depending on their size. It's
+ * both more and less expensive to return values from the cache:
+ * more because we have to marshall/unmarshall the values, less
+ * because there's only a single call, to the cache store rather
+ * one to the cache and one to the primary. I have no turning
+ * information, for now simply set the limit at 50MB.
+ */
+#undef CACHE_SIZE_TRIGGER
+#define CACHE_SIZE_TRIGGER (50 * 1048576)
+ for (ws = hs->ws_head; ws != NULL; ws = ws->next) {
+ if ((ret = he_stats(ws->he_cache, &stats)) != 0)
+ EMSG_ERR(wtext, NULL,
+ ret, "he_stats: %s", he_strerror(ret));
+ if (stats.size > CACHE_SIZE_TRIGGER)
+ break;
+ }
+ if (!cleaner_stop && ws == NULL)
+ continue;
+
+ /* There was work to do, don't delay before checking again. */
+ delay = 0;
+
+ /*
+ * Get the oldest transaction ID not yet visible to a running
+ * transaction. Do this before doing anything else, avoiding
+ * any race with creating new WT_SOURCE handles.
+ */
+ oldest = wtext->transaction_oldest(wtext);
+
+ /*
+ * If any cache needs cleaning, clean them all, because we have
+ * to know the minimum transaction ID referenced by any cache.
+ *
+ * For each cache/primary pair, migrate whatever records we can,
+ * tracking the lowest transaction ID of any entry in any cache.
+ */
+ txnmin = UINT64_MAX;
+ for (ws = hs->ws_head; ws != NULL; ws = ws->next) {
+ cursor->ws = ws;
+ if ((ret = cache_cleaner(
+ wtext, wtcursor, oldest, &txntmp)) != 0)
+ goto err;
+ if (txntmp < txnmin)
+ txnmin = txntmp;
+ }
+
+ /*
+ * Discard any transactions less than the minimum transaction ID
+ * referenced in any cache.
+ *
+ * !!!
+ * I'm playing fast-and-loose with whether or not the cursor
+ * references an underlying WT_SOURCE, there's a structural
+ * problem here.
+ */
+ cursor->ws = NULL;
+ if ((ret = txn_cleaner(wtcursor, hs->he_txn, txnmin)) != 0)
+ goto err;
+ }
+
+err: cursor_destroy(cursor);
+ return (NULL);
+}
+
+/*
+ * helium_config_read --
+ * Parse the Helium configuration.
+ */
+static int
+helium_config_read(WT_EXTENSION_API *wtext, WT_CONFIG_ITEM *config,
+ char **devicep, HE_ENV *envp, int *env_setp, int *flagsp)
+{
+ WT_CONFIG_ITEM k, v;
+ WT_CONFIG_PARSER *config_parser;
+ int ret = 0, tret;
+
+ *env_setp = 0;
+ *flagsp = 0;
+
+ /* Traverse the configuration arguments list. */
+ if ((ret = wtext->config_parser_open(
+ wtext, NULL, config->str, config->len, &config_parser)) != 0)
+ ERET(wtext, NULL, ret,
+ "WT_EXTENSION_API.config_parser_open: %s",
+ wtext->strerror(ret));
+ while ((ret = config_parser->next(config_parser, &k, &v)) == 0) {
+ if (string_match("helium_devices", k.str, k.len)) {
+ if ((*devicep = calloc(1, v.len + 1)) == NULL)
+ return (os_errno());
+ memcpy(*devicep, v.str, v.len);
+ continue;
+ }
+ if (string_match("helium_env_read_cache_size", k.str, k.len)) {
+ envp->read_cache_size = (uint64_t)v.val;
+ *env_setp = 1;
+ continue;
+ }
+ if (string_match("helium_env_write_cache_size", k.str, k.len)) {
+ envp->write_cache_size = (uint64_t)v.val;
+ *env_setp = 1;
+ continue;
+ }
+ if (string_match("helium_o_volume_truncate", k.str, k.len)) {
+ if (v.val != 0)
+ *flagsp |= HE_O_VOLUME_TRUNCATE;
+ continue;
+ }
+ EMSG_ERR(wtext, NULL, EINVAL,
+ "unknown configuration key value pair %.*s=%.*s",
+ (int)k.len, k.str, (int)v.len, v.str);
+ }
+ if (ret == WT_NOTFOUND)
+ ret = 0;
+ if (ret != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_CONFIG_PARSER.next: %s", wtext->strerror(ret));
+
+err: if ((tret = config_parser->close(config_parser)) != 0)
+ EMSG(wtext, NULL, tret,
+ "WT_CONFIG_PARSER.close: %s", wtext->strerror(tret));
+
+ return (ret);
+}
+
+/*
+ * helium_source_open --
+ * Allocate and open a Helium source.
+ */
+static int
+helium_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v)
+{
+ struct he_env env;
+ HELIUM_SOURCE *hs;
+ WT_EXTENSION_API *wtext;
+ int env_set, flags, ret = 0;
+
+ wtext = ds->wtext;
+ hs = NULL;
+
+ VMSG(wtext, NULL, VERBOSE_L1, "volume %.*s=%.*s",
+ (int)k->len, k->str, (int)v->len, v->str);
+
+ /*
+ * Check for a Helium source we've already opened: we don't check the
+ * value (which implies you can open the same underlying stores using
+ * more than one name, but I don't know of any problems that causes),
+ * we only check the key, that is, the top-level WiredTiger name.
+ */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if (string_match(hs->name, k->str, k->len))
+ ERET(wtext, NULL,
+ EINVAL, "%s: device already open", hs->name);
+
+ /* Allocate and initialize a new underlying Helium source object. */
+ if ((hs = calloc(1, sizeof(*hs))) == NULL ||
+ (hs->name = calloc(1, k->len + 1)) == NULL) {
+ free(hs);
+ return (os_errno());
+ }
+ memcpy(hs->name, k->str, k->len);
+ hs->txn_notify.notify = txn_notify;
+ hs->wtext = wtext;
+
+ /* Read the configuration, require a device naming the Helium store. */
+ memset(&env, 0, sizeof(env));
+ if ((ret = helium_config_read(
+ wtext, v, &hs->device, &env, &env_set, &flags)) != 0)
+ goto err;
+ if (hs->device == NULL)
+ EMSG_ERR(wtext, NULL,
+ EINVAL, "%s: no Helium volumes specified", hs->name);
+
+ /*
+ * Open the Helium volume, creating it if necessary. We have to open
+ * an object at the same time, that's why we have object flags as well
+ * as volume flags.
+ */
+ flags |= HE_O_CREATE |
+ HE_O_TRUNCATE | HE_O_VOLUME_CLEAN | HE_O_VOLUME_CREATE;
+ if ((hs->he_volume = he_open(
+ hs->device, WT_NAME_INIT, flags, env_set ? &env : NULL)) == NULL) {
+ ret = os_errno();
+ EMSG_ERR(wtext, NULL, ret,
+ "he_open: %s: %s: %s",
+ hs->name, WT_NAME_INIT, he_strerror(ret));
+ }
+
+ /* Insert the new entry at the head of the list. */
+ hs->next = ds->hs_head;
+ ds->hs_head = hs;
+
+ if (0) {
+err: if (hs != NULL)
+ ESET(helium_source_close(wtext, NULL, hs));
+ }
+ return (ret);
+}
+
+/*
+ * helium_source_open_txn --
+ * Open the database-wide transaction store.
+ */
+static int
+helium_source_open_txn(DATA_SOURCE *ds)
+{
+ HELIUM_SOURCE *hs, *hs_txn;
+ WT_EXTENSION_API *wtext;
+ he_t he_txn, t;
+ int ret = 0;
+
+ wtext = ds->wtext;
+
+ /*
+ * The global txn namespace is per connection, it spans multiple Helium
+ * sources.
+ *
+ * We've opened the Helium sources: check to see if any of them already
+ * have a transaction store, and make sure we only find one.
+ */
+ hs_txn = NULL;
+ he_txn = NULL;
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if ((t = he_open(hs->device, WT_NAME_TXN, 0, NULL)) != NULL) {
+ if (hs_txn != NULL) {
+ (void)he_close(t);
+ (void)he_close(hs_txn);
+ ERET(wtext, NULL, WT_PANIC,
+ "found multiple transaction stores, "
+ "unable to proceed");
+ }
+ he_txn = t;
+ hs_txn = hs;
+ }
+
+ /*
+ * If we didn't find a transaction store, open a transaction store in
+ * the first Helium source we loaded. (It could just as easily be
+ * the last one we loaded, we're just picking one, but picking the first
+ * seems slightly less likely to make people wonder.)
+ */
+ if ((hs = hs_txn) == NULL) {
+ for (hs = ds->hs_head; hs->next != NULL; hs = hs->next)
+ ;
+ if ((he_txn = he_open(
+ hs->device, WT_NAME_TXN, HE_O_CREATE, NULL)) == NULL) {
+ ret = os_errno();
+ ERET(wtext, NULL, ret,
+ "he_open: %s: %s: %s",
+ hs->name, WT_NAME_TXN, he_strerror(ret));
+ }
+
+ /* Push the change. */
+ if ((ret = he_commit(he_txn)) != 0)
+ ERET(wtext, NULL, ret,
+ "he_commit: %s", he_strerror(ret));
+ }
+ VMSG(wtext, NULL, VERBOSE_L1, "%s" "transactional store on %s",
+ hs_txn == NULL ? "creating " : "", hs->name);
+
+ /* Set the owner field, this Helium source has to be closed last. */
+ hs->he_owner = 1;
+
+ /* Add a reference to the transaction store in each Helium source. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ hs->he_txn = he_txn;
+
+ return (0);
+}
+
+/*
+ * helium_source_recover_namespace --
+ * Recover a single cache/primary pair in a Helium namespace.
+ */
+static int
+helium_source_recover_namespace(WT_DATA_SOURCE *wtds,
+ HELIUM_SOURCE *hs, const char *name, WT_CONFIG_ARG *config)
+{
+ CURSOR *cursor;
+ DATA_SOURCE *ds;
+ WT_CURSOR *wtcursor;
+ WT_EXTENSION_API *wtext;
+ WT_SOURCE *ws;
+ size_t len;
+ int ret = 0;
+ const char *p;
+ char *uri;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+ cursor = NULL;
+ ws = NULL;
+ uri = NULL;
+
+ /*
+ * The name we store on the Helium device is a translation of the
+ * WiredTiger name: do the reverse process here so we can use the
+ * standard source-open function.
+ */
+ p = name + strlen(WT_NAME_PREFIX);
+ len = strlen("helium:") + strlen(hs->name) + strlen(p) + 10;
+ if ((uri = malloc(len)) == NULL) {
+ ret = os_errno();
+ goto err;
+ }
+ (void)snprintf(uri, len, "helium:%s/%s", hs->name, p);
+
+ /*
+ * Open the cache/primary pair by going through the full open process,
+ * instantiating the underlying WT_SOURCE object.
+ */
+ if ((ret = ws_source_open(wtds, NULL, uri, config, 0, &ws)) != 0)
+ goto err;
+ if ((ret = unlock(wtext, NULL, &ws->lock)) != 0)
+ goto err;
+
+ /* Fake up a cursor. */
+ if ((ret = fake_cursor(wtext, &wtcursor)) != 0)
+ EMSG_ERR(wtext, NULL, ret, "recovery: %s", strerror(ret));
+ cursor = (CURSOR *)wtcursor;
+ cursor->ws = ws;
+
+ /* Process, then clear, the cache. */
+ if ((ret = cache_cleaner(wtext, wtcursor, 0, NULL)) != 0)
+ goto err;
+ if ((ret = he_truncate(ws->he_cache)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "he_truncate: %s(cache): %s", ws->uri, he_strerror(ret));
+
+ /* Close the underlying WiredTiger sources. */
+err: while ((ws = hs->ws_head) != NULL) {
+ hs->ws_head = ws->next;
+ ESET(ws_source_close(wtext, NULL, ws));
+ }
+
+ cursor_destroy(cursor);
+ free(uri);
+
+ return (ret);
+}
+
+struct helium_namespace_cookie {
+ char **list;
+ u_int list_cnt;
+ u_int list_max;
+};
+
+/*
+ * helium_namespace_list --
+ * Get a list of the objects we're going to recover.
+ */
+static int
+helium_namespace_list(void *cookie, const char *name)
+{
+ struct helium_namespace_cookie *names;
+ void *allocp;
+
+ names = cookie;
+
+ /*
+ * Ignore any files without a WiredTiger prefix.
+ * Ignore the metadata and cache files.
+ */
+ if (!prefix_match(name, WT_NAME_PREFIX))
+ return (0);
+ if (strcmp(name, WT_NAME_INIT) == 0)
+ return (0);
+ if (strcmp(name, WT_NAME_TXN) == 0)
+ return (0);
+ if (string_match(
+ strrchr(name, '.'), WT_NAME_CACHE, strlen(WT_NAME_CACHE)))
+ return (0);
+
+ if (names->list_cnt + 1 >= names->list_max) {
+ if ((allocp = realloc(names->list,
+ (names->list_max + 20) * sizeof(names->list[0]))) == NULL)
+ return (os_errno());
+ names->list = allocp;
+ names->list_max += 20;
+ }
+ if ((names->list[names->list_cnt] = strdup(name)) == NULL)
+ return (os_errno());
+ ++names->list_cnt;
+ names->list[names->list_cnt] = NULL;
+ return (0);
+}
+
+/*
+ * helium_source_recover --
+ * Recover the HELIUM_SOURCE.
+ */
+static int
+helium_source_recover(
+ WT_DATA_SOURCE *wtds, HELIUM_SOURCE *hs, WT_CONFIG_ARG *config)
+{
+ struct helium_namespace_cookie names;
+ DATA_SOURCE *ds;
+ WT_EXTENSION_API *wtext;
+ u_int i;
+ int ret = 0;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+ memset(&names, 0, sizeof(names));
+
+ VMSG(wtext, NULL, VERBOSE_L1, "recover %s", hs->name);
+
+ /* Get a list of the cache/primary object pairs in the Helium source. */
+ if ((ret = he_enumerate(
+ hs->device, helium_namespace_list, &names)) != 0)
+ ERET(wtext, NULL, ret,
+ "he_enumerate: %s: %s", hs->name, he_strerror(ret));
+
+ /* Recover the objects. */
+ for (i = 0; i < names.list_cnt; ++i)
+ if ((ret = helium_source_recover_namespace(
+ wtds, hs, names.list[i], config)) != 0)
+ goto err;
+
+ /* Clear the transaction store. */
+ if ((ret = he_truncate(hs->he_txn)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "he_truncate: %s: %s: %s",
+ hs->name, WT_NAME_TXN, he_strerror(ret));
+
+err: for (i = 0; i < names.list_cnt; ++i)
+ free(names.list[i]);
+ free(names.list);
+
+ return (ret);
+}
+
+/*
+ * helium_terminate --
+ * Unload the data-source.
+ */
+static int
+helium_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session)
+{
+ DATA_SOURCE *ds;
+ HELIUM_SOURCE *hs, *last;
+ WT_EXTENSION_API *wtext;
+ int ret = 0;
+
+ ds = (DATA_SOURCE *)wtds;
+ wtext = ds->wtext;
+
+ /* Lock the system down. */
+ if (ds->lockinit)
+ ret = writelock(wtext, session, &ds->global_lock);
+
+ /*
+ * Close the Helium sources, close the Helium source that "owns" the
+ * database transaction store last.
+ */
+ last = NULL;
+ while ((hs = ds->hs_head) != NULL) {
+ ds->hs_head = hs->next;
+ if (hs->he_owner) {
+ last = hs;
+ continue;
+ }
+ ESET(helium_source_close(wtext, session, hs));
+ }
+ if (last != NULL)
+ ESET(helium_source_close(wtext, session, last));
+
+ /* Unlock and destroy the system. */
+ if (ds->lockinit) {
+ ESET(unlock(wtext, session, &ds->global_lock));
+ ESET(lock_destroy(wtext, NULL, &ds->global_lock));
+ }
+
+ OVERWRITE_AND_FREE(ds);
+
+ return (ret);
+}
+
+/*
+ * wiredtiger_extension_init --
+ * Initialize the Helium connector code.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ /*
+ * List of the WT_DATA_SOURCE methods -- it's static so it breaks at
+ * compile-time should the structure change underneath us.
+ */
+ static const WT_DATA_SOURCE wtds = {
+ helium_session_create, /* session.create */
+ NULL, /* No session.compaction */
+ helium_session_drop, /* session.drop */
+ helium_session_open_cursor, /* session.open_cursor */
+ helium_session_rename, /* session.rename */
+ NULL, /* No session.salvage */
+ helium_session_truncate, /* session.truncate */
+ NULL, /* No session.range_truncate */
+ helium_session_verify, /* session.verify */
+ helium_session_checkpoint, /* session.checkpoint */
+ helium_terminate /* termination */
+ };
+ static const char *session_create_opts[] = {
+ "helium_o_compress=0", /* HE_I_COMPRESS */
+ "helium_o_truncate=0", /* HE_O_TRUNCATE */
+ NULL
+ };
+ DATA_SOURCE *ds;
+ HELIUM_SOURCE *hs;
+ WT_CONFIG_ITEM k, v;
+ WT_CONFIG_PARSER *config_parser;
+ WT_EXTENSION_API *wtext;
+ int vmajor, vminor, ret = 0;
+ const char **p;
+
+ config_parser = NULL;
+ ds = NULL;
+
+ wtext = connection->get_extension_api(connection);
+
+ /* Check the library version */
+#if HE_VERSION_MAJOR != 2 || HE_VERSION_MINOR != 2
+ ERET(wtext, NULL, EINVAL,
+ "unsupported Levyx/Helium header file %d.%d, expected version 2.2",
+ HE_VERSION_MAJOR, HE_VERSION_MINOR);
+#endif
+ he_version(&vmajor, &vminor);
+ if (vmajor != 2 || vminor != 2)
+ ERET(wtext, NULL, EINVAL,
+ "unsupported Levyx/Helium library version %d.%d, expected "
+ "version 2.2", vmajor, vminor);
+
+ /* Allocate and initialize the local data-source structure. */
+ if ((ds = calloc(1, sizeof(DATA_SOURCE))) == NULL)
+ return (os_errno());
+ ds->wtds = wtds;
+ ds->wtext = wtext;
+ if ((ret = lock_init(wtext, NULL, &ds->global_lock)) != 0)
+ goto err;
+ ds->lockinit = 1;
+
+ /* Get the configuration string. */
+ if ((ret = wtext->config_get(wtext, NULL, config, "config", &v)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_EXTENSION_API.config_get: config: %s",
+ wtext->strerror(ret));
+
+ /* Step through the list of Helium sources, opening each one. */
+ if ((ret = wtext->config_parser_open(
+ wtext, NULL, v.str, v.len, &config_parser)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_EXTENSION_API.config_parser_open: config: %s",
+ wtext->strerror(ret));
+ while ((ret = config_parser->next(config_parser, &k, &v)) == 0) {
+ if (string_match("helium_verbose", k.str, k.len)) {
+ verbose = v.val == 0 ? 0 : 1;
+ continue;
+ }
+ if ((ret = helium_source_open(ds, &k, &v)) != 0)
+ goto err;
+ }
+ if (ret != WT_NOTFOUND)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_CONFIG_PARSER.next: config: %s",
+ wtext->strerror(ret));
+ if ((ret = config_parser->close(config_parser)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_CONFIG_PARSER.close: config: %s",
+ wtext->strerror(ret));
+ config_parser = NULL;
+
+ /* Find and open the database transaction store. */
+ if ((ret = helium_source_open_txn(ds)) != 0)
+ return (ret);
+
+ /* Recover each Helium source. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if ((ret = helium_source_recover(&ds->wtds, hs, config)) != 0)
+ goto err;
+
+ /* Start each Helium source cleaner thread. */
+ for (hs = ds->hs_head; hs != NULL; hs = hs->next)
+ if ((ret = pthread_create(
+ &hs->cleaner_id, NULL, cache_cleaner_worker, hs)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "%s: pthread_create: cleaner thread: %s",
+ hs->name, strerror(ret));
+
+ /* Add Helium-specific WT_SESSION.create configuration options. */
+ for (p = session_create_opts; *p != NULL; ++p)
+ if ((ret = connection->configure_method(connection,
+ "session.create", "helium:", *p, "boolean", NULL)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_CONNECTION.configure_method: session.create: "
+ "%s: %s",
+ *p, wtext->strerror(ret));
+
+ /* Add the data source */
+ if ((ret = connection->add_data_source(
+ connection, "helium:", (WT_DATA_SOURCE *)ds, NULL)) != 0)
+ EMSG_ERR(wtext, NULL, ret,
+ "WT_CONNECTION.add_data_source: %s", wtext->strerror(ret));
+ return (0);
+
+err: if (ds != NULL)
+ ESET(helium_terminate((WT_DATA_SOURCE *)ds, NULL));
+ if (config_parser != NULL)
+ (void)config_parser->close(config_parser);
+ return (ret);
+}
+
+/*
+ * wiredtiger_extension_terminate --
+ * Shutdown the Helium connector code.
+ */
+int
+wiredtiger_extension_terminate(WT_CONNECTION *connection)
+{
+ (void)connection; /* Unused parameters */
+
+ return (0);
+}