diff options
author | Luke Chen <luke.chen@mongodb.com> | 2019-05-13 13:37:59 +1000 |
---|---|---|
committer | Luke Chen <luke.chen@mongodb.com> | 2019-05-13 13:37:59 +1000 |
commit | 173232c848619139321b08ebd2e20cb47c895163 (patch) | |
tree | 7d66406ac31ba1054b8bedf93ef88fc0fc33793d /src/third_party/wiredtiger | |
parent | 451c675f4bdcae94c7562ff0bef2090f79a807f7 (diff) | |
download | mongo-173232c848619139321b08ebd2e20cb47c895163.tar.gz |
Import wiredtiger: 315563f28850026673ebb146b6f3d727178e58bc from branch mongodb-4.2
ref: d9ec69f911..315563f288
for: 4.1.12
WT-3929 Fix "pip install" to work with Python3
WT-4669 Convert operation tracking logs to t2 format
WT-4750 Sweep can remove active lookaside records when files are closed and re-opened
WT-4765 Remove support for Helium/Levyx data sources in WiredTiger
WT-4766 Python3: fix wt_ckpt_decode.py to work with python2
WT-4767 Compatibility tests failed after 3.2.0 release
Diffstat (limited to 'src/third_party/wiredtiger')
50 files changed, 826 insertions, 3930 deletions
diff --git a/src/third_party/wiredtiger/NEWS b/src/third_party/wiredtiger/NEWS index 1e821835386..3f14018779f 100644 --- a/src/third_party/wiredtiger/NEWS +++ b/src/third_party/wiredtiger/NEWS @@ -1,6 +1,23 @@ Ticket reference tags refer to tickets in the MongoDB JIRA tracking system: https://jira.mongodb.org +WiredTiger release 3.2.0, 2019-05-09 +------------------------------------ + +See the upgrading documentation for details of API and behavior changes. + +Significant changes: +* WT-3968 Use compression ratio to tune page sizes +* WT-4156 Add a new salvage API option to wiredtiger_open +* WT-4192 Remove support for raw compression in WiredTiger +* WT-4426 Change WiredTiger data format to optionally include timestamps in cells +* WT-4447 Add a quota like system to manage I/O per subsystem +* WT-4670 Remove support for LevelDB APIs in WiredTiger +* WT-4765 Remove support for Helium/Levyx data sources in WiredTiger + +See JIRA changelog for a full listing: +https://jira.mongodb.org/projects/WT/versions/21117 + WiredTiger release 3.1.0, 2018-07-12 ------------------------------------ diff --git a/src/third_party/wiredtiger/README b/src/third_party/wiredtiger/README index fd8757621bf..29ec134437d 100644 --- a/src/third_party/wiredtiger/README +++ b/src/third_party/wiredtiger/README @@ -1,6 +1,6 @@ -WiredTiger 3.1.1: (July 12, 2018) +WiredTiger 3.2.0: (May 9, 2019) -This is version 3.1.1 of WiredTiger. +This is version 3.2.0 of WiredTiger. WiredTiger release packages and documentation can be found at: @@ -8,7 +8,7 @@ WiredTiger release packages and documentation can be found at: The documentation for this specific release can be found at: - http://source.wiredtiger.com/3.1.1/index.html + http://source.wiredtiger.com/3.2.0/index.html The WiredTiger source code can be found at: diff --git a/src/third_party/wiredtiger/RELEASE_INFO b/src/third_party/wiredtiger/RELEASE_INFO index 2014ba3ee74..198843d850c 100644 --- a/src/third_party/wiredtiger/RELEASE_INFO +++ b/src/third_party/wiredtiger/RELEASE_INFO @@ -1,6 +1,6 @@ WIREDTIGER_VERSION_MAJOR=3 -WIREDTIGER_VERSION_MINOR=1 -WIREDTIGER_VERSION_PATCH=1 +WIREDTIGER_VERSION_MINOR=2 +WIREDTIGER_VERSION_PATCH=0 WIREDTIGER_VERSION="$WIREDTIGER_VERSION_MAJOR.$WIREDTIGER_VERSION_MINOR.$WIREDTIGER_VERSION_PATCH" WIREDTIGER_RELEASE_DATE=`date "+%B %e, %Y"` diff --git a/src/third_party/wiredtiger/bench/wtperf/wtperf.c b/src/third_party/wiredtiger/bench/wtperf/wtperf.c index 27f9582dc25..2005bd20ae8 100644 --- a/src/third_party/wiredtiger/bench/wtperf/wtperf.c +++ b/src/third_party/wiredtiger/bench/wtperf/wtperf.c @@ -2436,10 +2436,9 @@ static void usage(void) { printf("wtperf [-C config] " - "[-H mount] [-h home] [-O file] [-o option] [-T config]\n"); + "[-h home] [-O file] [-o option] [-T config]\n"); printf("\t-C <string> additional connection configuration\n"); printf("\t (added to option conn_config)\n"); - printf("\t-H <mount> configure Helium volume mount point\n"); printf("\t-h <string> Wired Tiger home must exist, default WT_TEST\n"); printf("\t-O <file> file contains options as listed below\n"); printf("\t-o option=val[,option=val,...] set options listed below\n"); diff --git a/src/third_party/wiredtiger/build_posix/Make.subdirs b/src/third_party/wiredtiger/build_posix/Make.subdirs index a6d369dac3e..d598c74c631 100644 --- a/src/third_party/wiredtiger/build_posix/Make.subdirs +++ b/src/third_party/wiredtiger/build_posix/Make.subdirs @@ -13,7 +13,6 @@ ext/compressors/nop ext/compressors/snappy SNAPPY ext/compressors/zlib ZLIB ext/compressors/zstd ZSTD -ext/datasources/helium HAVE_HELIUM ext/encryptors/nop ext/encryptors/rotn ext/extractors/csv diff --git a/src/third_party/wiredtiger/build_posix/aclocal/version-set.m4 b/src/third_party/wiredtiger/build_posix/aclocal/version-set.m4 index 8b39a5d09d6..687d7bd3786 100644 --- a/src/third_party/wiredtiger/build_posix/aclocal/version-set.m4 +++ b/src/third_party/wiredtiger/build_posix/aclocal/version-set.m4 @@ -1,14 +1,14 @@ dnl build by dist/s_version VERSION_MAJOR=3 -VERSION_MINOR=1 -VERSION_PATCH=1 -VERSION_STRING='"WiredTiger 3.1.1: (July 12, 2018)"' +VERSION_MINOR=2 +VERSION_PATCH=0 +VERSION_STRING='"WiredTiger 3.2.0: (May 9, 2019)"' AC_SUBST(VERSION_MAJOR) AC_SUBST(VERSION_MINOR) AC_SUBST(VERSION_PATCH) AC_SUBST(VERSION_STRING) -VERSION_NOPATCH=3.1 +VERSION_NOPATCH=3.2 AC_SUBST(VERSION_NOPATCH) diff --git a/src/third_party/wiredtiger/build_posix/aclocal/version.m4 b/src/third_party/wiredtiger/build_posix/aclocal/version.m4 index a0149218ccd..fba1a92b1d5 100644 --- a/src/third_party/wiredtiger/build_posix/aclocal/version.m4 +++ b/src/third_party/wiredtiger/build_posix/aclocal/version.m4 @@ -1,2 +1,2 @@ dnl WiredTiger product version for AC_INIT. Maintained by dist/s_version -3.1.0 +3.2.0 diff --git a/src/third_party/wiredtiger/build_posix/configure.ac.in b/src/third_party/wiredtiger/build_posix/configure.ac.in index 601c01de1fd..e99d4dcc0c7 100644 --- a/src/third_party/wiredtiger/build_posix/configure.ac.in +++ b/src/third_party/wiredtiger/build_posix/configure.ac.in @@ -249,16 +249,6 @@ AC_MSG_RESULT($with_berkeleydb) AM_CONDITIONAL([HAVE_BERKELEY_DB], [test -d $with_berkeleydb]) AC_SUBST(BERKELEY_DB_PATH, [$with_berkeleydb]) -# test/format optionally supports the Levyx/Helium key/value store. -AC_MSG_CHECKING([if --with-helium=DIR option specified]) -AC_ARG_WITH(helium, - [AS_HELP_STRING([--with-helium=DIR], - [Specify installed library directory of Helium])], - [with_helium="$withval"], [with_helium="NO_HELIUM_LIBRARY"]) -AC_MSG_RESULT($with_helium) -AM_CONDITIONAL([HAVE_HELIUM], [test -d $with_helium]) -AC_SUBST(HELIUM_PATH, [$with_helium]) - # Warn that diagnostic builds should not be used in production if test "$wt_cv_enable_diagnostic" = "yes"; then AC_MSG_WARN( diff --git a/src/third_party/wiredtiger/dist/extlist b/src/third_party/wiredtiger/dist/extlist index a5515642d48..f7f7c460035 100644 --- a/src/third_party/wiredtiger/dist/extlist +++ b/src/third_party/wiredtiger/dist/extlist @@ -5,7 +5,6 @@ ext/collators/reverse/reverse_collator.c ext/compressors/nop/nop_compress.c ext/compressors/snappy/snappy_compress.c ext/compressors/zlib/zlib_compress.c -ext/datasources/helium/helium.c ext/encryptors/nop/nop_encrypt.c ext/encryptors/rotn/rotn_encrypt.c ext/extractors/csv/csv_extractor.c diff --git a/src/third_party/wiredtiger/dist/s_errno b/src/third_party/wiredtiger/dist/s_errno index 9981842e458..1bfc5e7f1a0 100644 --- a/src/third_party/wiredtiger/dist/s_errno +++ b/src/third_party/wiredtiger/dist/s_errno @@ -37,9 +37,6 @@ error_ok() # Loop through source files. for f in `find ext src -name '*.[ci]'`; do - if expr "$f" : 'ext/datasources/helium/helium.c' > /dev/null; then - continue - fi if expr "$f" : 'src/os_win/os_winerr.c' > /dev/null; then continue fi diff --git a/src/third_party/wiredtiger/dist/s_style b/src/third_party/wiredtiger/dist/s_style index 9c1dd6fa506..92ac94a3df1 100755 --- a/src/third_party/wiredtiger/dist/s_style +++ b/src/third_party/wiredtiger/dist/s_style @@ -88,7 +88,6 @@ else fi if ! expr "$f" : 'examples/c/.*' > /dev/null && - ! expr "$f" : 'ext/datasources/helium/helium.c' > /dev/null && ! expr "$f" : 'src/include/os.h' > /dev/null && egrep "%[0-9]*zu" $f | grep -v 'SIZET_FMT' > $t; then echo "$f: %zu needs to be fixed for Windows" diff --git a/src/third_party/wiredtiger/dist/s_void b/src/third_party/wiredtiger/dist/s_void index 6c2b8b34040..fcbe2b8a9a0 100755 --- a/src/third_party/wiredtiger/dist/s_void +++ b/src/third_party/wiredtiger/dist/s_void @@ -93,9 +93,6 @@ func_ok() -e '/int fail_fs_terminate$/d' \ -e '/int handle_message$/d' \ -e '/int handle_progress$/d' \ - -e '/int helium_cursor_reserve$/d' \ - -e '/int helium_cursor_reset$/d' \ - -e '/int helium_session_verify$/d' \ -e '/int index_compare_S$/d' \ -e '/int index_compare_primary$/d' \ -e '/int index_compare_u$/d' \ diff --git a/src/third_party/wiredtiger/ext/datasources/helium/Makefile.am b/src/third_party/wiredtiger/ext/datasources/helium/Makefile.am deleted file mode 100644 index b4e6e67e2cd..00000000000 --- a/src/third_party/wiredtiger/ext/datasources/helium/Makefile.am +++ /dev/null @@ -1,11 +0,0 @@ -AM_CPPFLAGS = -I$(top_builddir) \ - -I$(top_srcdir)/src/include -I$(HELIUM_PATH) - -noinst_LTLIBRARIES = libwiredtiger_helium.la -libwiredtiger_helium_la_SOURCES = helium.c -libwiredtiger_helium_la_LIBADD = -L$(HELIUM_PATH) -lhe - -# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well -# as installation, it will only build static libraries. As far as I can tell, -# the "approved" libtool way to turn them back on is by adding -rpath. -libwiredtiger_helium_la_LDFLAGS = -avoid-version -module -rpath /nowhere diff --git a/src/third_party/wiredtiger/ext/datasources/helium/README b/src/third_party/wiredtiger/ext/datasources/helium/README deleted file mode 100644 index e78ba58c71d..00000000000 --- a/src/third_party/wiredtiger/ext/datasources/helium/README +++ /dev/null @@ -1,125 +0,0 @@ -Helium README. - -The data structures are "Helium sources" which map to one or more physical -volumes; each Helium source supports any number of "WiredTiger sources", -where a WiredTiger source is an object similar to a Btree "file:" object. -Each WiredTiger source supports any number of WiredTiger cursors. - -Each Helium source is given a logical name when first referenced, and that -logical name is subsequently used when a WiredTiger source is created. For -example, the logical name for a Helium source might be "dev1", and it would -map to the Helium volumes /dev/sd0 and /dev/sd1; subsequent WT_SESSION.create -calls specify a URI like "table:dev1/my_table". - -For each WiredTiger source, we create two namespaces on the underlying device, -a "cache" and a "primary". - -The cache contains key/value pairs based on updates or changes that have been -made, and includes transactional information. So, for example, if transaction -3 modifies key/value pair "foo/aaa", and then transaction 4 removes key "foo", -then transaction 5 inserts key/value pair "foo/bbb", the entry in the cache -will look something like: - - Key: foo - Value: [transaction ID 3] [aaa] - [transaction ID 4] [remove] - [transaction ID 5] [bbb] - -Obviously, we have to marshall/unmarshall these values to/from the cache. - -In contrast, the primary contains only key/value pairs known to be committed -and visible to any reader. - -When an insert, update or remove is done: - acquire a lock - read any matching key from the cache - check to see if the update can proceed - append a new value for this transaction - release the lock - -When a search is done: - if there's a matching key/value pair in the cache { - if there's an item visible to the reading transaction - return it - } - if there's a matching key/value pair in the primary { - return it - } - -When a next/prev is done: - move to the next/prev visible item in the cache - move to the next/prev visible item in the primary - return the one closest to the starting position - -Locks are not acquired for read operations, and no flushes are done for any of -these operations. - -We also create one additional object, the transaction name space, which serves -all of the WiredTiger and Helium objects in a WiredTiger connection. Whenever -a transaction involving a Helium source commits, we insert a commit record into -the transaction name space and flush the device. When a transaction rolls back, -we insert an abort record into the txn name space, but don't flush the device. - -The visibility check is slightly different than the rest of WiredTiger: we do -not reset anything when a transaction aborts, and so we have to check if the -transaction has been aborted as well as check the transaction ID for visibility. - -We create a "cleanup" thread for every underlying Helium source. The job of -this thread is to migrate rows from the cache object into the primary. Any -committed, globally visible change in the cache can be copied into the primary -and removed from the cache: - - set BaseTxnID to the oldest transaction ID - not yet visible to a running transaction - - for each row in the cache: - if all of the updates are greater than BaseTxnID - copy the last update to the primary - - flush the primary to stable storage - - lock the cache - for each row in the cache: - if all of the updates are greater than BaseTxnID - remove the row from the cache - unlock the cache - - for each row in the transaction store: - if the transaction ID is less than BaseTxnID - remove the row - -We only need to lock the cache when removing rows, the initial copy to the -primary does not require locks because only the cleanup thread ever writes -to the primary. - -No lock is required when removing rows from the transaction store, once the -transaction ID is less than the BaseTxnID, it will never be read. - -Helium recovery is almost identical to the cleanup thread, which migrates rows -from the cache into the primary. For every cache/primary pair, migrate every -commit to the primary (by definition, at recovery time it must be globally -visible), and discard everything else (by definition, at recovery time anything -not committed has been aborted. - -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= -Questions, problems, whatever: - -* The implementation is endian-specific, that is, the WiredTiger metadata -stored on the Helium device is on not portable to a big-endian machine. -Helium's metadata is portable between different endian machines, so this -should probably be fixed. - -* There's a problem with transactions in WiredTiger that span more than a -single data source. For example, consider a transaction that modifies -both a Helium object and a Btree object. If we commit and push the Helium -commit record to stable storage, and then crash before committing the Btree -change, the enclosing WiredTiger transaction will/should end up aborting, -and there's no way for us to back out the change in Helium. I'm leaving -this problem alone until WiredTiger fine-grained durability is complete, -we're going to need WiredTiger support for some kind of 2PC to solve this. - -* If a record in the cache gets too busy, we could end up unable to remove -it (there would always be an active transaction), and it would grow forever. -I suspect the solution is to clean it up when we realize we can't remove it, -that is, we can rebuild the record, discarding the no longer needed entries, -even if the record can't be entirely discarded. diff --git a/src/third_party/wiredtiger/ext/datasources/helium/helium.c b/src/third_party/wiredtiger/ext/datasources/helium/helium.c deleted file mode 100644 index 43c300a74d8..00000000000 --- a/src/third_party/wiredtiger/ext/datasources/helium/helium.c +++ /dev/null @@ -1,3445 +0,0 @@ -/*- - * Public Domain 2014-2019 MongoDB, Inc. - * Public Domain 2008-2014 WiredTiger, Inc. - * - * This is free and unencumbered software released into the public domain. - * - * Anyone is free to copy, modify, publish, use, compile, sell, or - * distribute this software, either in source code form or as a compiled - * binary, for any purpose, commercial or non-commercial, and by any - * means. - * - * In jurisdictions that recognize copyright laws, the author or authors - * of this software dedicate any and all copyright interest in the - * software to the public domain. We make this dedication for the benefit - * of the public at large and to the detriment of our heirs and - * successors. We intend this dedication to be an overt act of - * relinquishment in perpetuity of all present and future rights to this - * software under copyright law. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. - * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR - * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, - * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR - * OTHER DEALINGS IN THE SOFTWARE. - */ -#include <sys/select.h> - -#include <ctype.h> -#include <errno.h> -#include <inttypes.h> -#include <pthread.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include <he.h> - -#include <wiredtiger.h> -#include <wiredtiger_ext.h> - -typedef struct he_env HE_ENV; -typedef struct he_item HE_ITEM; - -static int verbose = 0; /* Verbose messages */ - -#define WT_ERR(a) do { \ - if ((ret = (a)) != 0) \ - goto err; \ -} while (0) -#define WT_RET(a) do { \ - int __ret; \ - if ((__ret = (a)) != 0) \ - return (__ret); \ -} while (0) - -/* - * Macros to output error and verbose messages, and set or return an error. - * Error macros require local "ret" variable. - * - * ESET: update an error value, handling more/less important errors. - * ERET: output a message, return the error. - * EMSG: output a message, set the local error value. - * EMSG_ERR: - * output a message, set the local error value, jump to the err label. - * VMSG: verbose message. - */ -#undef ESET -#define ESET(a) do { \ - int __v; \ - if ((__v = (a)) != 0) { \ - /* \ - * On error, check for a panic (it overrides all other \ - * returns). Else, if there's no return value or the \ - * return value is not strictly an error, override it \ - * with the error. \ - */ \ - if (__v == WT_PANIC || \ - ret == 0 || \ - ret == WT_DUPLICATE_KEY || ret == WT_NOTFOUND) \ - ret = __v; \ - /* \ - * We don't want to return Helium errors to our caller. \ - * Map non-system errors (indicated by a negative \ - * value), outside the WiredTiger error name space, to a\ - * generic WiredTiger error. \ - */ \ - if (ret < -31999 || (ret > -31800 && ret < 0)) \ - ret = WT_ERROR; \ - } \ -} while (0) -#undef ERET -#define ERET(wt_api, session, v, ...) do { \ - (void) \ - wt_api->err_printf(wt_api, session, "helium: " __VA_ARGS__);\ - ESET(v); \ - return (ret); \ -} while (0) -#undef EMSG -#define EMSG(wt_api, session, v, ...) do { \ - (void) \ - wt_api->err_printf(wt_api, session, "helium: " __VA_ARGS__);\ - ESET(v); \ -} while (0) -#undef EMSG_ERR -#define EMSG_ERR(wt_api, session, v, ...) do { \ - (void) \ - wt_api->err_printf(wt_api, session, "helium: " __VA_ARGS__);\ - ESET(v); \ - goto err; \ -} while (0) -#undef VERBOSE_L1 -#define VERBOSE_L1 1 -#undef VERBOSE_L2 -#define VERBOSE_L2 2 -#undef VMSG -#define VMSG(wt_api, session, v, ...) do { \ - if (verbose >= v) \ - (void)wt_api-> \ - msg_printf(wt_api, session, "helium: " __VA_ARGS__);\ -} while (0) - -/* - * OVERWRITE_AND_FREE -- - * Make sure we don't re-use a structure after it's dead. - */ -#undef OVERWRITE_AND_FREE -#define OVERWRITE_AND_FREE(p) do { \ - memset(p, 0xab, sizeof(*(p))); \ - free(p); \ -} while (0) - -/* - * Version each object, out of sheer raging paranoia. - */ -#define WIREDTIGER_HELIUM_MAJOR 1 /* Major, minor version */ -#define WIREDTIGER_HELIUM_MINOR 0 - -/* - * WiredTiger name space on the Helium store: all objects are named with the - * WiredTiger prefix (we don't require the Helium store be exclusive to our - * files). Primary objects are named "WiredTiger.[name]", associated cache - * objects are "WiredTiger.[name].cache". The per-connection transaction - * object is "WiredTiger.WiredTigerTxn". When we first open a Helium volume, - * we open/close a file in order to apply flags for the first open of the - * volume, that's "WiredTiger.WiredTigerInit". - */ -#define WT_NAME_PREFIX "WiredTiger." -#define WT_NAME_INIT "WiredTiger.WiredTigerInit" -#define WT_NAME_TXN "WiredTiger.WiredTigerTxn" -#define WT_NAME_CACHE ".cache" - -/* - * WT_SOURCE -- - * A WiredTiger source, supporting one or more cursors. - */ -typedef struct __wt_source { - char *uri; /* Unique name */ - - pthread_rwlock_t lock; /* Lock */ - bool lockinit; /* Lock created */ - - bool configured; /* If structure configured */ - u_int ref; /* Active reference count */ - - uint64_t append_recno; /* Allocation record number */ - - bool config_bitfield; /* config "value_format=#t" */ - bool config_recno; /* config "key_format=r" */ - - /* - * Each WiredTiger object has a "primary" namespace in a Helium store - * plus a "cache" namespace, which has not-yet-resolved updates. There - * is a dirty flag so read-only data sets can ignore the cache. - */ - he_t he; /* Underlying Helium object */ - he_t he_cache; /* Underlying Helium cache */ - bool he_cache_inuse; /* Cache is in use */ - int he_cache_ops; /* Operations since cleaning */ - - struct __he_source *hs; /* Underlying Helium source */ - struct __wt_source *next; /* List of WiredTiger objects */ -} WT_SOURCE; - -/* - * HELIUM_SOURCE -- - * A Helium volume, supporting one or more WT_SOURCE objects. - */ -typedef struct __he_source { - /* The transaction commit handler must appear first in the structure. */ - WT_TXN_NOTIFY txn_notify; /* Transaction commit handler */ - - WT_EXTENSION_API *wt_api; /* Extension functions */ - - char *name; /* Unique WiredTiger name */ - char *device; /* Unique Helium volume name */ - - /* - * Maintain a handle for each underlying Helium source so checkpoint is - * faster, we can "commit" a single handle per source, regardless of the - * number of objects. - */ - he_t he_volume; - - struct __wt_source *ws_head; /* List of WiredTiger sources */ - - /* - * Each Helium source has a cleaner thread to migrate WiredTiger source - * updates from the cache namespace to the primary namespace, based on - * the number of bytes or the number of operations. (There's a cleaner - * thread per Helium store so migration operations can overlap.) We - * read these fields without a lock, but serialize writes to minimize - * races (and because it costs us nothing). - */ - pthread_t cleaner_id; /* Cleaner thread ID */ - volatile int cleaner_stop; /* Cleaner thread quit flag */ - - /* - * Each WiredTiger connection has a transaction namespace which lists - * resolved transactions with their committed or aborted state as a - * value. That namespace appears in a single Helium store (the first - * one created, if it doesn't already exist), and then it's referenced - * from other Helium stores. - */ -#define TXN_ABORTED 'A' -#define TXN_COMMITTED 'C' -#define TXN_UNRESOLVED 0 - he_t he_txn; /* Helium txn store */ - bool he_owner; /* Owns transaction store */ - - struct __he_source *next; /* List of Helium sources */ -} HELIUM_SOURCE; - -/* - * DATA_SOURCE -- - * A WiredTiger data source, supporting one or more HELIUM_SOURCE objects. - */ -typedef struct __data_source { - WT_DATA_SOURCE wtds; /* Must come first */ - - WT_EXTENSION_API *wt_api; /* Extension functions */ - - pthread_rwlock_t global_lock; /* Global lock */ - bool lockinit; /* Lock created */ - - struct __he_source *hs_head; /* List of Helium sources */ -} DATA_SOURCE; - -/* - * CACHE_RECORD -- - * An array of updates from the cache object. - * - * Values in the cache store are marshalled/unmarshalled to/from the store, - * using a simple encoding: - * {N records: 4B} - * {record#1 TxnID: 8B} - * {record#1 remove tombstone: 1B} - * {record#1 data length: 4B} - * {record#1 data} - * ... - * - * Each cursor potentially has a single set of these values. - */ -typedef struct __cache_record { - uint8_t *v; /* Value */ - uint32_t len; /* Value length */ - uint64_t txnid; /* Transaction ID */ -#define REMOVE_TOMBSTONE 'R' - int remove; /* 1/0 remove flag */ -} CACHE_RECORD; - -/* - * CURSOR -- - * A cursor, supporting a single WiredTiger cursor. - */ -typedef struct __cursor { - WT_CURSOR wtcursor; /* Must come first */ - - WT_EXTENSION_API *wt_api; /* Extension functions */ - - WT_SOURCE *ws; /* Underlying source */ - - HE_ITEM record; /* Record */ - uint8_t __key[HE_MAX_KEY_LEN]; /* Record.key, Record.value */ - uint8_t *v; - size_t len; - size_t mem_len; - - struct { - uint8_t *v; /* Temporary buffers */ - size_t len; - size_t mem_len; - } t1, t2, t3; - - int config_append; /* config "append" */ - int config_overwrite; /* config "overwrite" */ - - CACHE_RECORD *cache; /* unmarshalled cache records */ - uint32_t cache_entries; /* cache records */ - uint32_t cache_slots; /* cache total record slots */ -} CURSOR; - -/* - * prefix_match -- - * Return if a string matches a prefix. - */ -static inline int -prefix_match(const char *str, const char *pfx) -{ - return (strncmp(str, pfx, strlen(pfx)) == 0); -} - -/* - * string_match -- - * Return if a string matches a byte string of len bytes. - */ -static inline int -string_match(const char *str, const char *bytes, size_t len) -{ - return (strncmp(str, bytes, len) == 0 && (str)[(len)] == '\0'); -} - -/* - * cursor_destroy -- - * Free a cursor's memory, and optionally the cursor itself. - */ -static void -cursor_destroy(CURSOR *cursor) -{ - if (cursor != NULL) { - free(cursor->v); - free(cursor->t1.v); - free(cursor->t2.v); - free(cursor->t3.v); - free(cursor->cache); - OVERWRITE_AND_FREE(cursor); - } -} - -/* - * os_errno -- - * Limit our use of errno so it's easy to find/remove. - */ -static int -os_errno(void) -{ - return (errno); -} - -/* - * lock_init -- - * Initialize a lock. - */ -static int -lock_init( - WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp) -{ - int ret = 0; - - if ((ret = pthread_rwlock_init(lockp, NULL)) != 0) - ERET(wt_api, session, WT_PANIC, - "pthread_rwlock_init: %s", strerror(ret)); - return (0); -} - -/* - * lock_destroy -- - * Destroy a lock. - */ -static int -lock_destroy( - WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp) -{ - int ret = 0; - - if ((ret = pthread_rwlock_destroy(lockp)) != 0) - ERET(wt_api, session, WT_PANIC, - "pthread_rwlock_destroy: %s", strerror(ret)); - return (0); -} - -/* - * writelock -- - * Acquire a write lock. - */ -static inline int -writelock( - WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp) -{ - int ret = 0; - - if ((ret = pthread_rwlock_wrlock(lockp)) != 0) - ERET(wt_api, session, WT_PANIC, - "pthread_rwlock_wrlock: %s", strerror(ret)); - return (0); -} - -/* - * unlock -- - * Release a lock. - */ -static inline int -unlock(WT_EXTENSION_API *wt_api, WT_SESSION *session, pthread_rwlock_t *lockp) -{ - int ret = 0; - - if ((ret = pthread_rwlock_unlock(lockp)) != 0) - ERET(wt_api, session, WT_PANIC, - "pthread_rwlock_unlock: %s", strerror(ret)); - return (0); -} - -#if 0 -/* - * helium_dump_kv -- - * Dump a Helium record. - */ -static void -helium_dump_kv(const char *pfx, uint8_t *p, size_t len, FILE *fp) -{ - (void)fprintf(stderr, "%s %3zu: ", pfx, len); - for (; len > 0; --len, ++p) - if (!isspace(*p) && isprint(*p)) - (void)putc(*p, fp); - else if (len == 1 && *p == '\0') /* Skip string nuls. */ - continue; - else - (void)fprintf(fp, "%#x", *p); - (void)putc('\n', fp); -} - -/* - * helium_dump -- - * Dump the records in a Helium store. - */ -static int -helium_dump(WT_EXTENSION_API *wt_api, he_t he, const char *tag) -{ - HE_ITEM *r, _r; - uint8_t k[4 * 1024], v[4 * 1024]; - int ret = 0; - - r = &_r; - memset(r, 0, sizeof(*r)); - r->key = k; - r->val = v; - - (void)fprintf(stderr, "== %s\n", tag); - while ((ret = he_next(he, r, (size_t)0, sizeof(v))) == 0) { -#if 0 - uint64_t recno; - WT_RET(wt_api->struct_unpack(wt_api, - NULL, r->key, r->key_len, "r", &recno)); - fprintf(stderr, "K: %" PRIu64, recno); -#else - helium_dump_kv("K: ", r->key, r->key_len, stderr); -#endif - helium_dump_kv("V: ", r->val, r->val_len, stderr); - } - if (ret != HE_ERR_ITEM_NOT_FOUND) { - fprintf(stderr, "he_next: %s\n", he_strerror(ret)); - ret = WT_ERROR; - } - return (ret); -} - -/* - * helium_stats -- - * Display Helium statistics for a datastore. - */ -static int -helium_stats( - WT_EXTENSION_API *wt_api, WT_SESSION *session, he_t he, const char *tag) -{ - HE_STATS stats; - int ret = 0; - - if ((ret = he_stats(he, &stats)) != 0) - ERET(wt_api, session, ret, "he_stats: %s", he_strerror(ret)); - fprintf(stderr, "== %s\n", tag); - fprintf(stderr, "name=%s\n", stats.name); - fprintf(stderr, "deleted_items=%" PRIu64 "\n", stats.deleted_items); - fprintf(stderr, "locked_items=%" PRIu64 "\n", stats.locked_items); - fprintf(stderr, "valid_items=%" PRIu64 "\n", stats.valid_items); - fprintf(stderr, "capacity=%" PRIu64 "B\n", stats.capacity); - fprintf(stderr, "size=%" PRIu64 "B\n", stats.size); - return (0); -} -#endif - -/* - * helium_call -- - * Call a Helium key retrieval function, handling overflow. - */ -static inline int -helium_call(WT_CURSOR *wtcursor, const char *fname, - he_t he, int (*f)(he_t, HE_ITEM *, size_t, size_t)) -{ - CURSOR *cursor; - HE_ITEM *r; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - int ret = 0; - char *p; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - - r = &cursor->record; - r->val = cursor->v; - -restart: - if ((ret = f(he, r, (size_t)0, cursor->mem_len)) != 0) { - if (ret == HE_ERR_ITEM_NOT_FOUND) - return (WT_NOTFOUND); - ERET(wt_api, session, ret, "%s: %s", fname, he_strerror(ret)); - } - - /* - * If the returned length is larger than our passed-in length, we didn't - * get the complete value. Grow the buffer and use he_lookup to do the - * retrieval (he_lookup because the call succeeded and the key was - * copied out, so calling he_next/he_prev again would skip key/value - * pairs). - * - * We have to loop, another thread of control might change the length of - * the value, requiring we grow our buffer multiple times. - * - * We have to potentially restart the entire call in case the underlying - * key/value disappears. - */ - for (;;) { - if (cursor->mem_len >= r->val_len) { - cursor->len = r->val_len; - return (0); - } - - /* Grow the value buffer. */ - if ((p = realloc(cursor->v, r->val_len + 32)) == NULL) - return (os_errno()); - cursor->v = r->val = p; - cursor->mem_len = r->val_len + 32; - - if ((ret = he_lookup(he, r, (size_t)0, cursor->mem_len)) != 0) { - if (ret == HE_ERR_ITEM_NOT_FOUND) - goto restart; - ERET(wt_api, - session, ret, "he_lookup: %s", he_strerror(ret)); - } - } - /* NOTREACHED */ -} - -/* - * txn_state_set -- - * Resolve a transaction. - */ -static int -txn_state_set(WT_EXTENSION_API *wt_api, - WT_SESSION *session, HELIUM_SOURCE *hs, uint64_t txnid, int commit) -{ - HE_ITEM txn; - uint8_t val; - int ret = 0; - - /* - * Update the store -- commits must be durable, flush the volume. - * - * XXX - * Not endian-portable, we're writing a native transaction ID to the - * store. - */ - memset(&txn, 0, sizeof(txn)); - txn.key = &txnid; - txn.key_len = sizeof(txnid); - val = commit ? TXN_COMMITTED : TXN_ABORTED; - txn.val = &val; - txn.val_len = sizeof(val); - - if ((ret = he_update(hs->he_txn, &txn)) != 0) - ERET(wt_api, session, ret, "he_update: %s", he_strerror(ret)); - - if (commit && (ret = he_commit(hs->he_txn)) != 0) - ERET(wt_api, session, ret, "he_commit: %s", he_strerror(ret)); - return (0); -} - -/* - * txn_notify -- - * Resolve a transaction; called from WiredTiger during commit/abort. - */ -static int -txn_notify(WT_TXN_NOTIFY *handler, - WT_SESSION *session, uint64_t txnid, int committed) -{ - HELIUM_SOURCE *hs; - - hs = (HELIUM_SOURCE *)handler; - return (txn_state_set(hs->wt_api, session, hs, txnid, committed)); -} - -/* - * txn_state -- - * Return a transaction's state. - */ -static int -txn_state(WT_CURSOR *wtcursor, uint64_t txnid) -{ - CURSOR *cursor; - HE_ITEM txn; - HELIUM_SOURCE *hs; - uint8_t val_buf[16]; - - cursor = (CURSOR *)wtcursor; - hs = cursor->ws->hs; - - memset(&txn, 0, sizeof(txn)); - txn.key = &txnid; - txn.key_len = sizeof(txnid); - txn.val = val_buf; - txn.val_len = sizeof(val_buf); - - if (he_lookup(hs->he_txn, &txn, (size_t)0, sizeof(val_buf)) == 0) - return (val_buf[0]); - return (TXN_UNRESOLVED); -} - -/* - * cache_value_append -- - * Append the current WiredTiger cursor's value to a cache record. - */ -static int -cache_value_append(WT_CURSOR *wtcursor, int remove_op) -{ - CURSOR *cursor; - HE_ITEM *r; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - uint64_t txnid; - size_t len; - uint32_t entries; - uint8_t *p; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - - r = &cursor->record; - - /* - * A cache update is 4B that counts the number of entries in the update, - * followed by sets of: 8B of txn ID then either a remove tombstone or a - * 4B length and variable-length data pair. Grow the value buffer, then - * append the cursor's information. - */ - len = cursor->len + /* current length */ - sizeof(uint32_t) + /* entries */ - sizeof(uint64_t) + /* txn ID */ - 1 + /* remove byte */ - (remove_op ? 0 : /* optional data */ - sizeof(uint32_t) + wtcursor->value.size) + - 32; /* slop */ - - if (len > cursor->mem_len) { - if ((p = realloc(cursor->v, len)) == NULL) - return (os_errno()); - cursor->v = p; - cursor->mem_len = len; - } - - /* Get the transaction ID. */ - txnid = wt_api->transaction_id(wt_api, session); - - /* Update the number of records in this value. */ - if (cursor->len == 0) { - entries = 1; - cursor->len = sizeof(uint32_t); - } else { - memcpy(&entries, cursor->v, sizeof(uint32_t)); - ++entries; - } - memcpy(cursor->v, &entries, sizeof(uint32_t)); - - /* - * Copy the WiredTiger cursor's data into place: txn ID, remove - * tombstone, data length, data. - * - * XXX - * Not endian-portable, we're writing a native transaction ID to the - * store. - */ - p = cursor->v + cursor->len; - memcpy(p, &txnid, sizeof(uint64_t)); - p += sizeof(uint64_t); - if (remove_op) - *p++ = REMOVE_TOMBSTONE; - else { - *p++ = ' '; - memcpy(p, &wtcursor->value.size, sizeof(uint32_t)); - p += sizeof(uint32_t); - memcpy(p, wtcursor->value.data, wtcursor->value.size); - p += wtcursor->value.size; - } - cursor->len = (size_t)(p - cursor->v); - - /* Update the underlying Helium record. */ - r->val = cursor->v; - r->val_len = cursor->len; - - return (0); -} - -/* - * cache_value_unmarshall -- - * Unmarshall a cache value into a set of records. - */ -static int -cache_value_unmarshall(WT_CURSOR *wtcursor) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - uint32_t entries, i; - uint8_t *p; - int ret = 0; - - cursor = (CURSOR *)wtcursor; - - /* If we don't have enough record slots, allocate some more. */ - memcpy(&entries, cursor->v, sizeof(uint32_t)); - if (entries > cursor->cache_slots) { - if ((p = realloc(cursor->cache, - (entries + 20) * sizeof(cursor->cache[0]))) == NULL) - return (os_errno()); - - cursor->cache = (CACHE_RECORD *)p; - cursor->cache_slots = entries + 20; - } - - /* Walk the value, splitting it up into records. */ - p = cursor->v + sizeof(uint32_t); - for (i = 0, cp = cursor->cache; i < entries; ++i, ++cp) { - memcpy(&cp->txnid, p, sizeof(uint64_t)); - p += sizeof(uint64_t); - cp->remove = *p++ == REMOVE_TOMBSTONE ? 1 : 0; - if (!cp->remove) { - memcpy(&cp->len, p, sizeof(uint32_t)); - p += sizeof(uint32_t); - cp->v = p; - p += cp->len; - } - } - cursor->cache_entries = entries; - - return (ret); -} - -/* - * cache_value_aborted -- - * Return if a transaction has been aborted. - */ -static inline int -cache_value_aborted(WT_CURSOR *wtcursor, CACHE_RECORD *cp) -{ - /* - * This function exists as a place to hang this comment. - * - * WiredTiger resets updated entry transaction IDs to an aborted state - * on rollback; to do that here would require tracking updated entries - * for a transaction or scanning the cache for updates made on behalf - * of the transaction during rollback, expensive stuff. Instead, check - * if the transaction has been aborted before calling the underlying - * WiredTiger visibility function. - */ - return (txn_state(wtcursor, cp->txnid) == TXN_ABORTED ? 1 : 0); -} - -/* - * cache_value_committed -- - * Return if a transaction has been committed. - */ -static inline int -cache_value_committed(WT_CURSOR *wtcursor, CACHE_RECORD *cp) -{ - return (txn_state(wtcursor, cp->txnid) == TXN_COMMITTED ? 1 : 0); -} - -/* - * cache_value_update_check -- - * Return if an update can proceed based on the previous updates made to - * the cache entry. - */ -static int -cache_value_update_check(WT_CURSOR *wtcursor) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - u_int i; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - - /* Only interesting for snapshot isolation. */ - if (wt_api-> - transaction_isolation_level(wt_api, session) != WT_TXN_ISO_SNAPSHOT) - return (0); - - /* - * If there's an entry that's not visible and hasn't been aborted, - * return a deadlock. - */ - for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp) - if (!cache_value_aborted(wtcursor, cp) && - !wt_api->transaction_visible(wt_api, session, cp->txnid)) - return (WT_ROLLBACK); - return (0); -} - -/* - * cache_value_visible -- - * Return the most recent cache entry update visible to the running - * transaction. - */ -static int -cache_value_visible(WT_CURSOR *wtcursor, CACHE_RECORD **cpp) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - u_int i; - - *cpp = NULL; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - - /* - * We want the most recent cache entry update; the cache entries are - * in update order, walk from the end to the beginning. - */ - cp = cursor->cache + cursor->cache_entries; - for (i = 0; i < cursor->cache_entries; ++i) { - --cp; - if (!cache_value_aborted(wtcursor, cp) && - wt_api->transaction_visible(wt_api, session, cp->txnid)) { - *cpp = cp; - return (1); - } - } - return (0); -} - -/* - * cache_value_visible_all -- - * Return if a cache entry has no updates that aren't globally visible. - */ -static int -cache_value_visible_all(WT_CURSOR *wtcursor, uint64_t oldest) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - u_int i; - - cursor = (CURSOR *)wtcursor; - - /* - * Compare the update's transaction ID and the oldest transaction ID - * not yet visible to a running transaction. If there's an update a - * running transaction might want, the entry must remain in the cache. - * (We could tighten this requirement: if the only update required is - * also the update we'd migrate to the primary, it would still be OK - * to migrate it.) - */ - for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp) - if (cp->txnid >= oldest) - return (0); - return (1); -} - -/* - * cache_value_last_committed -- - * Find the most recent update in a cache entry, recovery processing. - */ -static void -cache_value_last_committed(WT_CURSOR *wtcursor, CACHE_RECORD **cpp) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - u_int i; - - *cpp = NULL; - - cursor = (CURSOR *)wtcursor; - - /* - * Find the most recent update in the cache record, we're going to try - * and migrate it into the primary, recovery version. - * - * We know the entry is visible, but it must have been committed before - * the failure to be migrated. - * - * Cache entries are in update order, walk from end to beginning. - */ - cp = cursor->cache + cursor->cache_entries; - for (i = 0; i < cursor->cache_entries; ++i) { - --cp; - if (cache_value_committed(wtcursor, cp)) { - *cpp = cp; - return; - } - } -} - -/* - * cache_value_last_not_aborted -- - * Find the most recent update in a cache entry, normal processing. - */ -static void -cache_value_last_not_aborted(WT_CURSOR *wtcursor, CACHE_RECORD **cpp) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - u_int i; - - *cpp = NULL; - - cursor = (CURSOR *)wtcursor; - - /* - * Find the most recent update in the cache record, we're going to try - * and migrate it into the primary, normal processing version. - * - * We don't have to check if the entry was committed, we've already - * confirmed all entries for this cache key are globally visible, which - * means they must be either committed or aborted. - * - * Cache entries are in update order, walk from end to beginning. - */ - cp = cursor->cache + cursor->cache_entries; - for (i = 0; i < cursor->cache_entries; ++i) { - --cp; - if (!cache_value_aborted(wtcursor, cp)) { - *cpp = cp; - return; - } - } -} - -/* - * cache_value_txnmin -- - * Return the oldest transaction ID involved in a cache update. - */ -static void -cache_value_txnmin(WT_CURSOR *wtcursor, uint64_t *txnminp) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - uint64_t txnmin; - u_int i; - - cursor = (CURSOR *)wtcursor; - - /* Return the oldest transaction ID for in the cache entry. */ - txnmin = UINT64_MAX; - for (i = 0, cp = cursor->cache; i < cursor->cache_entries; ++i, ++cp) - if (txnmin > cp->txnid) - txnmin = cp->txnid; - *txnminp = txnmin; -} - -/* - * key_max_err -- - * Common error when a WiredTiger key is too large. - */ -static int -key_max_err(WT_EXTENSION_API *wt_api, WT_SESSION *session, size_t len) -{ - int ret = 0; - - ERET(wt_api, session, EINVAL, - "key length (%zu bytes) larger than the maximum Helium " - "key length of %d bytes", - len, HE_MAX_KEY_LEN); -} - -/* - * copyin_key -- - * Copy a WT_CURSOR key to a HE_ITEM key. - */ -static inline int -copyin_key(WT_CURSOR *wtcursor, int allocate_key) -{ - CURSOR *cursor; - HE_ITEM *r; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - WT_SOURCE *ws; - size_t size; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - ws = cursor->ws; - wt_api = cursor->wt_api; - - r = &cursor->record; - if (ws->config_recno) { - /* - * Allocate a new record for append operations. - * - * A specified record number could potentially be larger than - * the maximum known record number, update the maximum number - * as necessary. - * - * Assume we can compare 8B values without locking them, and - * test again after acquiring the lock. - * - * XXX - * If the put fails for some reason, we'll have incremented the - * maximum record number past the correct point. I can't think - * of a reason any application would care or notice, but it's - * not quite right. - */ - if (allocate_key && cursor->config_append) { - WT_RET(writelock(wt_api, session, &ws->lock)); - wtcursor->recno = ++ws->append_recno; - WT_RET(unlock(wt_api, session, &ws->lock)); - } else if (wtcursor->recno > ws->append_recno) { - WT_RET(writelock(wt_api, session, &ws->lock)); - if (wtcursor->recno > ws->append_recno) - ws->append_recno = wtcursor->recno; - WT_RET(unlock(wt_api, session, &ws->lock)); - } - - WT_RET(wt_api->struct_size(wt_api, - session, &size, "r", wtcursor->recno)); - WT_RET(wt_api->struct_pack(wt_api, - session, r->key, HE_MAX_KEY_LEN, "r", wtcursor->recno)); - r->key_len = size; - } else { - /* I'm not sure this test is necessary, but it's cheap. */ - if (wtcursor->key.size > HE_MAX_KEY_LEN) - return ( - key_max_err(wt_api, session, wtcursor->key.size)); - - /* - * A set cursor key might reference application memory, which - * is only OK until the cursor operation has been called (in - * other words, we can only reference application memory from - * the WT_CURSOR.set_key call until the WT_CURSOR.op call). - * For this reason, do a full copy, don't just reference the - * WT_CURSOR key's data. - */ - memcpy(r->key, wtcursor->key.data, wtcursor->key.size); - r->key_len = wtcursor->key.size; - } - return (0); -} - -/* - * copyout_key -- - * Copy a HE_ITEM key to a WT_CURSOR key. - */ -static inline int -copyout_key(WT_CURSOR *wtcursor) -{ - CURSOR *cursor; - HE_ITEM *r; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - WT_SOURCE *ws; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - ws = cursor->ws; - - r = &cursor->record; - if (ws->config_recno) - WT_RET(wt_api->struct_unpack(wt_api, - session, r->key, r->key_len, "r", &wtcursor->recno)); - else { - wtcursor->key.data = r->key; - wtcursor->key.size = (size_t)r->key_len; - } - return (0); -} - -/* - * copyout_val -- - * Copy a Helium store's HE_ITEM value to a WT_CURSOR value. - */ -static inline int -copyout_val(WT_CURSOR *wtcursor, CACHE_RECORD *cp) -{ - CURSOR *cursor; - - cursor = (CURSOR *)wtcursor; - - if (cp == NULL) { - wtcursor->value.data = cursor->v; - wtcursor->value.size = cursor->len; - } else { - wtcursor->value.data = cp->v; - wtcursor->value.size = cp->len; - } - return (0); -} - -/* - * nextprev -- - * Cursor next/prev. - */ -static int -nextprev(WT_CURSOR *wtcursor, const char *fname, - int (*f)(he_t, HE_ITEM *, size_t, size_t)) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - HE_ITEM *r; - WT_EXTENSION_API *wt_api; - WT_ITEM a, b; - WT_SESSION *session; - WT_SOURCE *ws; - int cache_ret, cache_rm, cmp, ret = 0; - void *p; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - ws = cursor->ws; - wt_api = cursor->wt_api; - r = &cursor->record; - - cache_rm = 0; - - /* - * If the cache isn't yet in use, it's a simpler problem, just check - * the store. We don't care if we race, we're not guaranteeing any - * special behavior with respect to phantoms. - */ - if (!ws->he_cache_inuse) { - cache_ret = WT_NOTFOUND; - goto cache_clean; - } - -skip_deleted: - /* - * The next/prev key/value pair might be in the cache, which means we - * are making two calls and returning the best choice. As each call - * overwrites both key and value, we have to have a copy of the key - * for the second call plus the returned key and value from the first - * call. That's why each cursor has 3 temporary buffers. - * - * First, copy the key. - */ - if (cursor->t1.mem_len < r->key_len) { - if ((p = realloc(cursor->t1.v, r->key_len)) == NULL) - return (os_errno()); - cursor->t1.v = p; - cursor->t1.mem_len = r->key_len; - } - memcpy(cursor->t1.v, r->key, r->key_len); - cursor->t1.len = r->key_len; - - /* - * Move through the cache until we either find a record with a visible - * entry, or we reach the end/beginning. - */ - for (cache_rm = 0;;) { - if ((ret = helium_call(wtcursor, fname, ws->he_cache, f)) != 0) - break; - WT_RET(cache_value_unmarshall(wtcursor)); - - /* If there's no visible entry, move to the next one. */ - if (!cache_value_visible(wtcursor, &cp)) - continue; - - /* - * If the entry has been deleted, remember that and continue. - * We can't just skip the entry because it might be a delete - * of an entry in the primary store, which means the cache - * entry stops us from returning the primary store's entry. - */ - if (cp->remove) - cache_rm = 1; - - /* - * Copy the cache key. If the cache's entry wasn't a delete, - * copy the value as well, we may return the cache entry. - */ - if (cursor->t2.mem_len < r->key_len) { - if ((p = realloc(cursor->t2.v, r->key_len)) == NULL) - return (os_errno()); - cursor->t2.v = p; - cursor->t2.mem_len = r->key_len; - } - memcpy(cursor->t2.v, r->key, r->key_len); - cursor->t2.len = r->key_len; - - if (cache_rm) - break; - - if (cursor->t3.mem_len < cp->len) { - if ((p = realloc(cursor->t3.v, cp->len)) == NULL) - return (os_errno()); - cursor->t3.v = p; - cursor->t3.mem_len = cp->len; - } - memcpy(cursor->t3.v, cp->v, cp->len); - cursor->t3.len = cp->len; - - break; - } - if (ret != 0 && ret != WT_NOTFOUND) - return (ret); - cache_ret = ret; - - /* Copy the original key back into place. */ - memcpy(r->key, cursor->t1.v, cursor->t1.len); - r->key_len = cursor->t1.len; - -cache_clean: - /* Get the next/prev entry from the store. */ - ret = helium_call(wtcursor, fname, ws->he, f); - if (ret != 0 && ret != WT_NOTFOUND) - return (ret); - - /* If no entries in either the cache or the primary, we're done. */ - if (cache_ret == WT_NOTFOUND && ret == WT_NOTFOUND) - return (WT_NOTFOUND); - - /* - * If both the cache and the primary had entries, decide which is a - * better choice and pretend we didn't find the other one. - */ - if (cache_ret == 0 && ret == 0) { - a.data = r->key; /* a is the primary */ - a.size = (uint32_t)r->key_len; - b.data = cursor->t2.v; /* b is the cache */ - b.size = (uint32_t)cursor->t2.len; - WT_RET(wt_api->collate(wt_api, session, NULL, &a, &b, &cmp)); - - if (f == he_next) { - if (cmp >= 0) - ret = WT_NOTFOUND; - else - cache_ret = WT_NOTFOUND; - } else { - if (cmp <= 0) - ret = WT_NOTFOUND; - else - cache_ret = WT_NOTFOUND; - } - } - - /* - * If the cache is the key we'd choose, but it's a delete, skip past it - * by moving from the deleted key to the next/prev item in either the - * primary or the cache. - */ - if (cache_ret == 0 && cache_rm) { - memcpy(r->key, cursor->t2.v, cursor->t2.len); - r->key_len = cursor->t2.len; - goto skip_deleted; - } - - /* If taking the cache's entry, copy the value into place. */ - if (cache_ret == 0) { - memcpy(r->key, cursor->t2.v, cursor->t2.len); - r->key_len = cursor->t2.len; - - memcpy(cursor->v, cursor->t3.v, cursor->t3.len); - cursor->len = cursor->t3.len; - } - - /* Copy out the chosen key/value pair. */ - WT_RET(copyout_key(wtcursor)); - WT_RET(copyout_val(wtcursor, NULL)); - return (0); -} - -/* - * helium_cursor_next -- - * WT_CURSOR.next method. - */ -static int -helium_cursor_next(WT_CURSOR *wtcursor) -{ - return (nextprev(wtcursor, "he_next", he_next)); -} - -/* - * helium_cursor_prev -- - * WT_CURSOR.prev method. - */ -static int -helium_cursor_prev(WT_CURSOR *wtcursor) -{ - return (nextprev(wtcursor, "he_prev", he_prev)); -} - -/* - * helium_cursor_reset -- - * WT_CURSOR.reset method. - */ -static int -helium_cursor_reset(WT_CURSOR *wtcursor) -{ - CURSOR *cursor; - HE_ITEM *r; - - cursor = (CURSOR *)wtcursor; - r = &cursor->record; - - /* - * Reset the cursor by setting the key length to 0, causing subsequent - * next/prev operations to return the first/last record of the object. - */ - r->key_len = 0; - return (0); -} - -/* - * helium_cursor_search -- - * WT_CURSOR.search method. - */ -static int -helium_cursor_search(WT_CURSOR *wtcursor) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - WT_SOURCE *ws; - int ret = 0; - - cursor = (CURSOR *)wtcursor; - ws = cursor->ws; - - /* Copy in the WiredTiger cursor's key. */ - WT_RET(copyin_key(wtcursor, 0)); - - /* - * Check for an entry in the cache. If we find one, unmarshall it - * and check for a visible entry we can return. - */ - if ((ret = - helium_call(wtcursor, "he_lookup", ws->he_cache, he_lookup)) == 0) { - WT_RET(cache_value_unmarshall(wtcursor)); - if (cache_value_visible(wtcursor, &cp)) - return (cp->remove ? - WT_NOTFOUND : copyout_val(wtcursor, cp)); - } else if (ret != WT_NOTFOUND) - return (ret); - - /* Check for an entry in the primary store. */ - WT_RET(helium_call(wtcursor, "he_lookup", ws->he, he_lookup)); - WT_RET(copyout_val(wtcursor, NULL)); - - return (0); -} - -/* - * helium_cursor_search_near -- - * WT_CURSOR.search_near method. - */ -static int -helium_cursor_search_near(WT_CURSOR *wtcursor, int *exact) -{ - int ret = 0; - - /* - * XXX - * I'm not confident this is sufficient: if there are multiple threads - * of control, it's possible for the search for an exact match to fail, - * another thread of control to insert (and commit) an exact match, and - * then it's possible we'll return the wrong value. This needs to be - * revisited once the transactional code is in place. - */ - - /* Search for an exact match. */ - if ((ret = helium_cursor_search(wtcursor)) == 0) { - *exact = 0; - return (0); - } - if (ret != WT_NOTFOUND) - return (ret); - - /* Search for a key that's larger. */ - if ((ret = helium_cursor_next(wtcursor)) == 0) { - *exact = 1; - return (0); - } - if (ret != WT_NOTFOUND) - return (ret); - - /* Search for a key that's smaller. */ - if ((ret = helium_cursor_prev(wtcursor)) == 0) { - *exact = -1; - return (0); - } - - return (ret); -} - -/* - * helium_cursor_insert -- - * WT_CURSOR.insert method. - */ -static int -helium_cursor_insert(WT_CURSOR *wtcursor) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - HE_ITEM *r; - HELIUM_SOURCE *hs; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - WT_SOURCE *ws; - int ret = 0; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - ws = cursor->ws; - hs = ws->hs; - r = &cursor->record; - - /* Get the WiredTiger cursor's key. */ - WT_RET(copyin_key(wtcursor, 1)); - - VMSG(wt_api, session, VERBOSE_L2, - "I %.*s.%.*s", (int)r->key_len, r->key, (int)r->val_len, r->val); - - /* Clear the value, assume we're adding the first cache entry. */ - cursor->len = 0; - - /* Updates are read-modify-writes, lock the underlying cache. */ - WT_RET(writelock(wt_api, session, &ws->lock)); - - /* Read the record from the cache store. */ - switch (ret = helium_call( - wtcursor, "he_lookup", ws->he_cache, he_lookup)) { - case 0: - /* Crack the record. */ - WT_ERR(cache_value_unmarshall(wtcursor)); - - /* Check if the update can proceed. */ - WT_ERR(cache_value_update_check(wtcursor)); - - if (cursor->config_overwrite) - break; - - /* - * If overwrite is false, a visible entry (that's not a removed - * entry), is an error. We're done checking if there is a - * visible entry in the cache, otherwise repeat the check on the - * primary store. - */ - if (cache_value_visible(wtcursor, &cp)) { - if (cp->remove) - break; - - ret = WT_DUPLICATE_KEY; - goto err; - } - /* FALLTHROUGH */ - case WT_NOTFOUND: - if (cursor->config_overwrite) - break; - - /* If overwrite is false, an entry is an error. */ - if ((ret = helium_call( - wtcursor, "he_lookup", ws->he, he_lookup)) != WT_NOTFOUND) { - if (ret == 0) - ret = WT_DUPLICATE_KEY; - goto err; - } - ret = 0; - break; - default: - goto err; - } - - /* - * Create a new value using the current cache record plus the WiredTiger - * cursor's value, and update the cache. - */ - WT_ERR(cache_value_append(wtcursor, 0)); - if ((ret = he_update(ws->he_cache, r)) != 0) - EMSG(wt_api, session, ret, "he_update: %s", he_strerror(ret)); - - /* Update the state while still holding the lock. */ - if (!ws->he_cache_inuse) - ws->he_cache_inuse = true; - ++ws->he_cache_ops; - - /* Discard the lock. */ -err: ESET(unlock(wt_api, session, &ws->lock)); - - /* If successful, request notification at transaction resolution. */ - if (ret == 0) - ESET(wt_api->transaction_notify( - wt_api, session, &hs->txn_notify)); - - return (ret); -} - -/* - * update -- - * Update or remove an entry. - */ -static int -update(WT_CURSOR *wtcursor, int remove_op) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - HE_ITEM *r; - HELIUM_SOURCE *hs; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - WT_SOURCE *ws; - int ret = 0; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - ws = cursor->ws; - hs = ws->hs; - r = &cursor->record; - - /* Get the WiredTiger cursor's key. */ - WT_RET(copyin_key(wtcursor, 0)); - - VMSG(wt_api, session, VERBOSE_L2, - "%c %.*s.%.*s", - remove_op ? 'R' : 'U', - (int)r->key_len, r->key, (int)r->val_len, r->val); - - /* Clear the value, assume we're adding the first cache entry. */ - cursor->len = 0; - - /* Updates are read-modify-writes, lock the underlying cache. */ - WT_RET(writelock(wt_api, session, &ws->lock)); - - /* Read the record from the cache store. */ - switch (ret = helium_call( - wtcursor, "he_lookup", ws->he_cache, he_lookup)) { - case 0: - /* Crack the record. */ - WT_ERR(cache_value_unmarshall(wtcursor)); - - /* Check if the update can proceed. */ - WT_ERR(cache_value_update_check(wtcursor)); - - if (cursor->config_overwrite) - break; - - /* - * If overwrite is false, no entry (or a removed entry), is an - * error. We're done checking if there is a visible entry in - * the cache, otherwise repeat the check on the primary store. - */ - if (cache_value_visible(wtcursor, &cp)) { - if (!cp->remove) - break; - - ret = WT_NOTFOUND; - goto err; - } - /* FALLTHROUGH */ - case WT_NOTFOUND: - if (cursor->config_overwrite) - break; - - /* If overwrite is false, no entry is an error. */ - WT_ERR(helium_call(wtcursor, "he_lookup", ws->he, he_lookup)); - - /* - * All we care about is the cache entry, which didn't exist; - * clear the returned value, we're about to "append" to it. - */ - cursor->len = 0; - break; - default: - goto err; - } - - /* - * Create a new cache value based on the current cache record plus the - * WiredTiger cursor's value. - */ - WT_ERR(cache_value_append(wtcursor, remove_op)); - - /* Push the record into the cache. */ - if ((ret = he_update(ws->he_cache, r)) != 0) - EMSG(wt_api, session, ret, "he_update: %s", he_strerror(ret)); - - /* Update the state while still holding the lock. */ - if (!ws->he_cache_inuse) - ws->he_cache_inuse = true; - ++ws->he_cache_ops; - - /* Discard the lock. */ -err: ESET(unlock(wt_api, session, &ws->lock)); - - /* If successful, request notification at transaction resolution. */ - if (ret == 0) - ESET(wt_api->transaction_notify( - wt_api, session, &hs->txn_notify)); - - return (ret); -} - -/* - * helium_cursor_update -- - * WT_CURSOR.update method. - */ -static int -helium_cursor_update(WT_CURSOR *wtcursor) -{ - return (update(wtcursor, 0)); -} - -/* - * helium_cursor_reserve -- - * WT_CURSOR.reserve method. - */ -static int -helium_cursor_reserve(WT_CURSOR *wtcursor) -{ - (void)wtcursor; - - /* - * XXX - * We don't currently support reserve, this will require some work. - * The test programs don't currently detect it, so return success. - */ - return (0); -} - -/* - * helium_cursor_remove -- - * WT_CURSOR.remove method. - */ -static int -helium_cursor_remove(WT_CURSOR *wtcursor) -{ - CURSOR *cursor; - WT_SOURCE *ws; - - cursor = (CURSOR *)wtcursor; - ws = cursor->ws; - - /* - * WiredTiger's "remove" of a bitfield is really an update with a value - * of zero. - */ - if (ws->config_bitfield) { - wtcursor->value.size = 1; - wtcursor->value.data = ""; - return (update(wtcursor, 0)); - } - return (update(wtcursor, 1)); -} - -/* - * helium_cursor_close -- - * WT_CURSOR.close method. - */ -static int -helium_cursor_close(WT_CURSOR *wtcursor) -{ - CURSOR *cursor; - WT_EXTENSION_API *wt_api; - WT_SESSION *session; - WT_SOURCE *ws; - int ret = 0; - - session = wtcursor->session; - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - ws = cursor->ws; - - if ((ret = writelock(wt_api, session, &ws->lock)) == 0) { - --ws->ref; - ret = unlock(wt_api, session, &ws->lock); - } - cursor_destroy(cursor); - - return (ret); -} - -/* - * ws_source_name -- - * Build a namespace name. - */ -static int -ws_source_name(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, const char *suffix, char **pp) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - size_t len; - int ret = 0; - const char *p; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* - * Create the store's name. Application URIs are "helium:device/name"; - * we want the names on the Helium device to be obviously WiredTiger's, - * and the device name isn't interesting. Convert to "WiredTiger:name", - * and add an optional suffix. - */ - if (!prefix_match(uri, "helium:") || (p = strchr(uri, '/')) == NULL) - ERET(wt_api, session, EINVAL, "%s: illegal Helium URI", uri); - ++p; - - len = strlen(WT_NAME_PREFIX) + - strlen(p) + (suffix == NULL ? 0 : strlen(suffix)) + 5; - if ((*pp = malloc(len)) == NULL) - return (os_errno()); - (void)snprintf(*pp, len, "%s%s%s", - WT_NAME_PREFIX, p, suffix == NULL ? "" : suffix); - return (0); -} - -/* - * ws_source_close -- - * Close a WT_SOURCE reference. - */ -static int -ws_source_close(WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_SOURCE *ws) -{ - int ret = 0, tret; - - /* - * Warn if open cursors: it shouldn't happen because the upper layers of - * WiredTiger prevent it, so we don't do anything more than warn. - */ - if (ws->ref != 0) - EMSG(wt_api, session, WT_ERROR, - "%s: open object with %u open cursors being closed", - ws->uri, ws->ref); - - if (ws->he != NULL) { - if ((tret = he_commit(ws->he)) != 0) - EMSG(wt_api, session, tret, - "he_commit: %s: %s", ws->uri, he_strerror(tret)); - if ((tret = he_close(ws->he)) != 0) - EMSG(wt_api, session, tret, - "he_close: %s: %s", ws->uri, he_strerror(tret)); - ws->he = NULL; - } - if (ws->he_cache != NULL) { - if ((tret = he_close(ws->he_cache)) != 0) - EMSG(wt_api, session, tret, - "he_close: %s(cache): %s", - ws->uri, he_strerror(tret)); - ws->he_cache = NULL; - } - - if (ws->lockinit) - ESET(lock_destroy(wt_api, session, &ws->lock)); - - free(ws->uri); - OVERWRITE_AND_FREE(ws); - - return (ret); -} - -/* - * ws_source_open_object -- - * Open an object in the Helium store. - */ -static int -ws_source_open_object(WT_DATA_SOURCE *wtds, WT_SESSION *session, - HELIUM_SOURCE *hs, - const char *uri, const char *suffix, int flags, he_t *hep) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - he_t he; - char *p; - int ret = 0; - - *hep = NULL; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - p = NULL; - - /* Open the underlying Helium object. */ - WT_RET(ws_source_name(wtds, session, uri, suffix, &p)); - VMSG(wt_api, session, VERBOSE_L1, "open %s/%s", hs->name, p); - if ((he = he_open(hs->device, p, flags, NULL)) == NULL) { - ret = os_errno(); - EMSG(wt_api, session, ret, - "he_open: %s/%s: %s", hs->name, p, he_strerror(ret)); - } - *hep = he; - - free(p); - return (ret); -} - -#define WS_SOURCE_OPEN_BUSY 0x01 /* Fail if source busy */ -#define WS_SOURCE_OPEN_GLOBAL 0x02 /* Keep the global lock */ - -/* - * ws_source_open -- - * Return a locked WiredTiger source, allocating and opening if it doesn't - * already exist. - */ -static int -ws_source_open(WT_DATA_SOURCE *wtds, WT_SESSION *session, - const char *uri, WT_CONFIG_ARG *config, u_int flags, WT_SOURCE **refp) -{ - DATA_SOURCE *ds; - HELIUM_SOURCE *hs; - WT_CONFIG_ITEM a; - WT_EXTENSION_API *wt_api; - WT_SOURCE *ws; - size_t len; - int oflags, ret = 0; - const char *p, *t; - - *refp = NULL; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - ws = NULL; - - /* - * The URI will be "helium:" followed by a Helium name and object name - * pair separated by a slash, for example, "helium:volume/object". - */ - if (!prefix_match(uri, "helium:")) - goto bad_name; - p = uri + strlen("helium:"); - if (p[0] == '/' || (t = strchr(p, '/')) == NULL || t[1] == '\0') -bad_name: ERET(wt_api, session, EINVAL, "%s: illegal name format", uri); - len = (size_t)(t - p); - - /* Find a matching Helium device. */ - for (hs = ds->hs_head; hs != NULL; hs = hs->next) - if (string_match(hs->name, p, len)) - break; - if (hs == NULL) - ERET(wt_api, NULL, - EINVAL, "%s: no matching Helium store found", uri); - - /* - * We're about to walk the Helium device's list of files, acquire the - * global lock. - */ - WT_RET(writelock(wt_api, session, &ds->global_lock)); - - /* - * Check for a match: if we find one, optionally trade the global lock - * for the object's lock, optionally check if the object is busy, and - * return. - */ - for (ws = hs->ws_head; ws != NULL; ws = ws->next) - if (strcmp(ws->uri, uri) == 0) { - /* Check to see if the object is busy. */ - if (ws->ref != 0 && (flags & WS_SOURCE_OPEN_BUSY)) { - ret = EBUSY; - ESET(unlock(wt_api, session, &ds->global_lock)); - return (ret); - } - /* Swap the global lock for an object lock. */ - if (!(flags & WS_SOURCE_OPEN_GLOBAL)) { - ret = writelock(wt_api, session, &ws->lock); - ESET(unlock(wt_api, session, &ds->global_lock)); - if (ret != 0) - return (ret); - } - *refp = ws; - return (0); - } - - /* Allocate and initialize a new underlying WiredTiger source object. */ - if ((ws = calloc(1, sizeof(*ws))) == NULL || - (ws->uri = strdup(uri)) == NULL) { - ret = os_errno(); - goto err; - } - WT_ERR(lock_init(wt_api, session, &ws->lock)); - ws->lockinit = true; - ws->hs = hs; - - /* - * Open the underlying Helium objects, then push the change. - * - * The naming scheme is simple: the URI names the primary store, and the - * URI with a trailing suffix names the associated caching store. - * - * We always set the create flag, our caller handles attempts to create - * existing objects. - */ - oflags = HE_O_CREATE; - if ((ret = wt_api->config_get(wt_api, - session, config, "helium_o_compress", &a)) == 0 && a.val != 0) - oflags |= HE_O_COMPRESS; - if (ret != 0 && ret != WT_NOTFOUND) - EMSG_ERR(wt_api, session, ret, - "helium_o_compress configuration: %s", - wt_api->strerror(wt_api, session, ret)); - if ((ret = wt_api->config_get(wt_api, - session, config, "helium_o_truncate", &a)) == 0 && a.val != 0) - oflags |= HE_O_TRUNCATE; - if (ret != 0 && ret != WT_NOTFOUND) - EMSG_ERR(wt_api, session, ret, - "helium_o_truncate configuration: %s", - wt_api->strerror(wt_api, session, ret)); - - WT_ERR(ws_source_open_object( - wtds, session, hs, uri, NULL, oflags, &ws->he)); - WT_ERR(ws_source_open_object( - wtds, session, hs, uri, WT_NAME_CACHE, HE_O_CREATE, &ws->he_cache)); - if ((ret = he_commit(ws->he)) != 0) - EMSG_ERR(wt_api, session, ret, - "he_commit: %s", he_strerror(ret)); - - /* Optionally trade the global lock for the object lock. */ - if (!(flags & WS_SOURCE_OPEN_GLOBAL)) - WT_ERR(writelock(wt_api, session, &ws->lock)); - - /* Insert the new entry at the head of the list. */ - ws->next = hs->ws_head; - hs->ws_head = ws; - - *refp = ws; - ws = NULL; - - if (0) { -err: if (ws != NULL) - ESET(ws_source_close(wt_api, session, ws)); - } - - /* - * If there was an error or our caller doesn't need the global lock, - * release the global lock. - */ - if (!(flags & WS_SOURCE_OPEN_GLOBAL) || ret != 0) - ESET(unlock(wt_api, session, &ds->global_lock)); - - return (ret); -} - -/* - * master_uri_get -- - * Get the Helium master record for a URI. - */ -static int -master_uri_get(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, char **valuep) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - return (wt_api->metadata_search(wt_api, session, uri, valuep)); -} - -/* - * master_uri_drop -- - * Drop the Helium master record for a URI. - */ -static int -master_uri_drop(WT_DATA_SOURCE *wtds, WT_SESSION *session, const char *uri) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - return (wt_api->metadata_remove(wt_api, session, uri)); -} - -/* - * master_uri_rename -- - * Rename the Helium master record for a URI. - */ -static int -master_uri_rename(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, const char *newuri) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - int ret = 0; - char *value; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - value = NULL; - - /* Insert the record under a new name. */ - WT_ERR(master_uri_get(wtds, session, uri, &value)); - WT_ERR(wt_api->metadata_insert(wt_api, session, newuri, value)); - - /* - * Remove the original record, and if that fails, attempt to remove - * the new record. - */ - if ((ret = wt_api->metadata_remove(wt_api, session, uri)) != 0) - (void)wt_api->metadata_remove(wt_api, session, newuri); - -err: free((void *)value); - return (ret); -} - -/* - * master_uri_set -- - * Set the Helium master record for a URI. - */ -static int -master_uri_set(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) -{ - DATA_SOURCE *ds; - WT_CONFIG_ITEM a, b, c; - WT_EXTENSION_API *wt_api; - int exclusive, ret = 0; - char value[1024]; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - exclusive = 0; - if ((ret = - wt_api->config_get(wt_api, session, config, "exclusive", &a)) == 0) - exclusive = a.val != 0; - else if (ret != WT_NOTFOUND) - ERET(wt_api, session, ret, - "exclusive configuration: %s", - wt_api->strerror(wt_api, session, ret)); - - /* Get the key/value format strings. */ - if ((ret = wt_api->config_get( - wt_api, session, config, "key_format", &a)) != 0) { - if (ret == WT_NOTFOUND) { - a.str = "u"; - a.len = 1; - } else - ERET(wt_api, session, ret, - "key_format configuration: %s", - wt_api->strerror(wt_api, session, ret)); - } - if ((ret = wt_api->config_get( - wt_api, session, config, "value_format", &b)) != 0) { - if (ret == WT_NOTFOUND) { - b.str = "u"; - b.len = 1; - } else - ERET(wt_api, session, ret, - "value_format configuration: %s", - wt_api->strerror(wt_api, session, ret)); - } - - /* Get the compression configuration. */ - if ((ret = wt_api->config_get( - wt_api, session, config, "helium_o_compress", &c)) != 0) { - if (ret == WT_NOTFOUND) - c.val = 0; - else - ERET(wt_api, session, ret, - "helium_o_compress configuration: %s", - wt_api->strerror(wt_api, session, ret)); - } - - /* - * Create a new reference using insert (which fails if the record - * already exists). - */ - (void)snprintf(value, sizeof(value), - "wiredtiger_helium_version=(major=%d,minor=%d)," - "key_format=%.*s,value_format=%.*s," - "helium_o_compress=%d", - WIREDTIGER_HELIUM_MAJOR, WIREDTIGER_HELIUM_MINOR, - (int)a.len, a.str, (int)b.len, b.str, c.val ? 1 : 0); - if ((ret = wt_api->metadata_insert(wt_api, session, uri, value)) == 0) - return (0); - if (ret == WT_DUPLICATE_KEY) - return (exclusive ? EEXIST : 0); - ERET(wt_api, session, - ret, "%s: %s", uri, wt_api->strerror(wt_api, session, ret)); -} - -/* - * helium_session_open_cursor -- - * WT_SESSION.open_cursor method. - */ -static int -helium_session_open_cursor(WT_DATA_SOURCE *wtds, WT_SESSION *session, - const char *uri, WT_CONFIG_ARG *config, WT_CURSOR **new_cursor) -{ - CURSOR *cursor; - DATA_SOURCE *ds; - WT_CONFIG_ITEM v; - WT_CONFIG_PARSER *config_parser; - WT_CURSOR *wtcursor; - WT_EXTENSION_API *wt_api; - WT_SOURCE *ws; - int locked, own, ret, tret; - char *value; - - *new_cursor = NULL; - - config_parser = NULL; - cursor = NULL; - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - ws = NULL; - locked = 0; - ret = tret = 0; - value = NULL; - - /* Allocate and initialize a cursor. */ - if ((cursor = calloc(1, sizeof(CURSOR))) == NULL) - return (os_errno()); - - if ((ret = wt_api->config_get( /* Parse configuration */ - wt_api, session, config, "append", &v)) != 0) - EMSG_ERR(wt_api, session, ret, - "append configuration: %s", - wt_api->strerror(wt_api, session, ret)); - cursor->config_append = v.val != 0; - - if ((ret = wt_api->config_get( - wt_api, session, config, "overwrite", &v)) != 0) - EMSG_ERR(wt_api, session, ret, - "overwrite configuration: %s", - wt_api->strerror(wt_api, session, ret)); - cursor->config_overwrite = v.val != 0; - - if ((ret = wt_api->collator_config( - wt_api, session, uri, config, NULL, &own)) != 0) - EMSG_ERR(wt_api, session, ret, - "collator configuration: %s", - wt_api->strerror(wt_api, session, ret)); - - /* Finish initializing the cursor. */ - cursor->wtcursor.close = helium_cursor_close; - cursor->wtcursor.insert = helium_cursor_insert; - cursor->wtcursor.next = helium_cursor_next; - cursor->wtcursor.prev = helium_cursor_prev; - cursor->wtcursor.remove = helium_cursor_remove; - cursor->wtcursor.reserve = helium_cursor_reserve; - cursor->wtcursor.reset = helium_cursor_reset; - cursor->wtcursor.search = helium_cursor_search; - cursor->wtcursor.search_near = helium_cursor_search_near; - cursor->wtcursor.update = helium_cursor_update; - - cursor->wt_api = wt_api; - cursor->record.key = cursor->__key; - if ((cursor->v = malloc(128)) == NULL) - goto err; - cursor->mem_len = 128; - - /* Get a locked reference to the WiredTiger source. */ - WT_ERR(ws_source_open(wtds, session, uri, config, 0, &ws)); - locked = 1; - cursor->ws = ws; - - /* - * If this is the first access to the URI, we have to configure it - * using information stored in the master record. - */ - if (!ws->configured) { - WT_ERR(master_uri_get(wtds, session, uri, &value)); - - if ((ret = wt_api->config_parser_open(wt_api, - session, value, strlen(value), &config_parser)) != 0) - EMSG_ERR(wt_api, session, ret, - "Configuration string parser: %s", - wt_api->strerror(wt_api, session, ret)); - if ((ret = config_parser->get( - config_parser, "key_format", &v)) != 0) - EMSG_ERR(wt_api, session, ret, - "key_format configuration: %s", - wt_api->strerror(wt_api, session, ret)); - ws->config_recno = v.len == 1 && v.str[0] == 'r'; - - if ((ret = config_parser->get( - config_parser, "value_format", &v)) != 0) - EMSG_ERR(wt_api, session, ret, - "value_format configuration: %s", - wt_api->strerror(wt_api, session, ret)); - ws->config_bitfield = v.len == 2 && - isdigit((u_char)v.str[0]) && v.str[1] == 't'; - - /* - * If it's a record-number key, read the last record from the - * object and set the allocation record value. - */ - if (ws->config_recno) { - wtcursor = (WT_CURSOR *)cursor; - WT_ERR(helium_cursor_reset(wtcursor)); - - if ((ret = helium_cursor_prev(wtcursor)) == 0) - ws->append_recno = wtcursor->recno; - else if (ret != WT_NOTFOUND) - goto err; - - WT_ERR(helium_cursor_reset(wtcursor)); - } - - ws->configured = true; - } - - /* Increment the open reference count to pin the URI and unlock it. */ - ++ws->ref; - WT_ERR(unlock(wt_api, session, &ws->lock)); - - *new_cursor = (WT_CURSOR *)cursor; - - if (0) { -err: if (ws != NULL && locked) - ESET(unlock(wt_api, session, &ws->lock)); - cursor_destroy(cursor); - } - if (config_parser != NULL && - (tret = config_parser->close(config_parser)) != 0) - EMSG(wt_api, session, tret, - "WT_CONFIG_PARSER.close: %s", - wt_api->strerror(wt_api, session, tret)); - - free((void *)value); - return (ret); -} - -/* - * helium_session_create -- - * WT_SESSION.create method. - */ -static int -helium_session_create(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - WT_SOURCE *ws; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* - * Get a locked reference to the WiredTiger source, then immediately - * unlock it, we aren't doing anything else. - */ - WT_RET(ws_source_open(wtds, session, uri, config, 0, &ws)); - WT_RET(unlock(wt_api, session, &ws->lock)); - - /* - * Create the URI master record if it doesn't already exist. - * - * We've discarded the lock, but that's OK, creates are single-threaded - * at the WiredTiger level, it's not our problem to solve. - * - * If unable to enter a WiredTiger record, leave the Helium store alone. - * A subsequent create should do the right thing, we aren't leaving - * anything in an inconsistent state. - */ - return (master_uri_set(wtds, session, uri, config)); -} - -/* - * helium_session_drop -- - * WT_SESSION.drop method. - */ -static int -helium_session_drop(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) -{ - DATA_SOURCE *ds; - HELIUM_SOURCE *hs; - WT_EXTENSION_API *wt_api; - WT_SOURCE **p, *ws; - int ret = 0; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* - * Get a locked reference to the data source: hold the global lock, - * we're changing the HELIUM_SOURCE's list of WT_SOURCE objects. - * - * Remove the entry from the WT_SOURCE list -- it's a singly-linked - * list, find the reference to it. - */ - WT_RET(ws_source_open(wtds, session, uri, config, - WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)); - hs = ws->hs; - for (p = &hs->ws_head; *p != NULL; p = &(*p)->next) - if (*p == ws) { - *p = (*p)->next; - break; - } - - /* Drop the underlying Helium objects. */ - ESET(he_remove(ws->he)); - ws->he = NULL; /* The handle is dead. */ - ESET(he_remove(ws->he_cache)); - ws->he_cache = NULL; /* The handle is dead. */ - - /* Close the source, discarding the structure. */ - ESET(ws_source_close(wt_api, session, ws)); - ws = NULL; - - /* Discard the metadata entry. */ - ESET(master_uri_drop(wtds, session, uri)); - - /* - * If we have an error at this point, panic -- there's an inconsistency - * in what WiredTiger knows about and the underlying store. - */ - if (ret != 0) - ret = WT_PANIC; - - ESET(unlock(wt_api, session, &ds->global_lock)); - return (ret); -} - -/* - * helium_session_rename -- - * WT_SESSION.rename method. - */ -static int -helium_session_rename(WT_DATA_SOURCE *wtds, WT_SESSION *session, - const char *uri, const char *newuri, WT_CONFIG_ARG *config) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - WT_SOURCE *ws; - int ret = 0; - char *p; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* - * Get a locked reference to the data source; hold the global lock, - * we are going to change the object's name, and we can't allow - * other threads walking the list and comparing against the name. - */ - WT_RET(ws_source_open(wtds, session, uri, config, - WS_SOURCE_OPEN_BUSY | WS_SOURCE_OPEN_GLOBAL, &ws)); - - /* Get a copy of the new name for the WT_SOURCE structure. */ - if ((p = strdup(newuri)) == NULL) { - ret = os_errno(); - goto err; - } - free(ws->uri); - ws->uri = p; - - /* Rename the underlying Helium objects. */ - ESET(ws_source_name(wtds, session, newuri, NULL, &p)); - if (ret == 0) { - ESET(he_rename(ws->he, p)); - free(p); - } - ESET(ws_source_name(wtds, session, newuri, WT_NAME_CACHE, &p)); - if (ret == 0) { - ESET(he_rename(ws->he_cache, p)); - free(p); - } - - /* Update the metadata record. */ - ESET(master_uri_rename(wtds, session, uri, newuri)); - - /* - * If we have an error at this point, panic -- there's an inconsistency - * in what WiredTiger knows about and the underlying store. - */ - if (ret != 0) - ret = WT_PANIC; - -err: ESET(unlock(wt_api, session, &ds->global_lock)); - - return (ret); -} - -/* - * helium_session_truncate -- - * WT_SESSION.truncate method. - */ -static int -helium_session_truncate(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) -{ - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - int ret = 0; - - (void)config; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* - * XXX - * Fail URI truncation for now. (Truncation based on a cursor range is - * handled by the upper-levels of WiredTiger, this is just support for - * URI truncation.) The problem is there's no way to truncate an open - * object in Helium without closing handles, and we can't close/re-open - * handles because we don't have the configuration information from the - * open. - */ - ERET(wt_api, session, ENOTSUP, "WT_SESSION.truncate: %s", uri); -} - -/* - * helium_session_verify -- - * WT_SESSION.verify method. - */ -static int -helium_session_verify(WT_DATA_SOURCE *wtds, - WT_SESSION *session, const char *uri, WT_CONFIG_ARG *config) -{ - (void)wtds; - (void)session; - (void)uri; - (void)config; - return (0); -} - -/* - * helium_session_checkpoint -- - * WT_SESSION.checkpoint method. - */ -static int -helium_session_checkpoint( - WT_DATA_SOURCE *wtds, WT_SESSION *session, WT_CONFIG_ARG *config) -{ - DATA_SOURCE *ds; - HELIUM_SOURCE *hs; - WT_EXTENSION_API *wt_api; - int ret = 0; - - (void)config; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* Flush all volumes. */ - if ((hs = ds->hs_head) != NULL && - (ret = he_commit(hs->he_volume)) != 0) - ERET(wt_api, session, ret, - "he_commit: %s: %s", hs->device, he_strerror(ret)); - - return (0); -} - -/* - * helium_source_close -- - * Discard a HELIUM_SOURCE. - */ -static int -helium_source_close( - WT_EXTENSION_API *wt_api, WT_SESSION *session, HELIUM_SOURCE *hs) -{ - WT_SOURCE *ws; - int ret = 0, tret; - - /* Resolve the cache into the primary one last time and quit. */ - if (hs->cleaner_id != 0) { - hs->cleaner_stop = 1; - - if ((tret = pthread_join(hs->cleaner_id, NULL)) != 0) - EMSG(wt_api, session, tret, - "pthread_join: %s", strerror(tret)); - hs->cleaner_id = 0; - } - - /* Close the underlying WiredTiger sources. */ - while ((ws = hs->ws_head) != NULL) { - hs->ws_head = ws->next; - ESET(ws_source_close(wt_api, session, ws)); - } - - /* If the owner, close the database transaction store. */ - if (hs->he_txn != NULL && hs->he_owner) { - if ((tret = he_close(hs->he_txn)) != 0) - EMSG(wt_api, session, tret, - "he_close: %s: %s: %s", - hs->name, WT_NAME_TXN, he_strerror(tret)); - hs->he_txn = NULL; - hs->he_owner = false; - } - - /* Flush and close the Helium source. */ - if (hs->he_volume != NULL) { - if ((tret = he_commit(hs->he_volume)) != 0) - EMSG(wt_api, session, tret, - "he_commit: %s: %s", - hs->device, he_strerror(tret)); - - if ((tret = he_close(hs->he_volume)) != 0) - EMSG(wt_api, session, tret, - "he_close: %s: %s: %s", - hs->name, WT_NAME_INIT, he_strerror(tret)); - hs->he_volume = NULL; - } - - free(hs->name); - free(hs->device); - OVERWRITE_AND_FREE(hs); - - return (ret); -} - -/* - * cache_cleaner -- - * Migrate information from the cache to the primary store. - */ -static int -cache_cleaner(WT_EXTENSION_API *wt_api, - WT_CURSOR *wtcursor, uint64_t oldest, uint64_t *txnminp) -{ - CACHE_RECORD *cp; - CURSOR *cursor; - HE_ITEM *r; - WT_SOURCE *ws; - uint64_t txnid; - int locked, pushed, recovery, ret = 0; - - /* - * Called in two ways: in normal processing mode where we're supplied a - * value for the oldest transaction ID not yet visible to a running - * transaction, and we're tracking the smallest transaction ID - * referenced by any cache entry, and in recovery mode where neither of - * those are true. - */ - if (txnminp == NULL) - recovery = 1; - else { - recovery = 0; - *txnminp = UINT64_MAX; - } - - cursor = (CURSOR *)wtcursor; - ws = cursor->ws; - r = &cursor->record; - locked = pushed = 0; - - /* - * For every cache key where all updates are globally visible: - * Migrate the most recent update value to the primary store. - */ - for (r->key_len = 0; (ret = - helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) { - /* - * Unmarshall the value, and if all of the updates are globally - * visible, update the primary with the last committed update. - * In normal processing, the last committed update test is for - * a globally visible update that's not explicitly aborted. In - * recovery processing, the last committed update test is for - * an explicitly committed update. See the underlying functions - * for more information. - */ - if ((ret = cache_value_unmarshall(wtcursor)) != 0) - goto err; - if (!recovery && !cache_value_visible_all(wtcursor, oldest)) - continue; - if (recovery) - cache_value_last_committed(wtcursor, &cp); - else - cache_value_last_not_aborted(wtcursor, &cp); - if (cp == NULL) - continue; - - pushed = 1; - if (cp->remove) { - if ((ret = he_delete(ws->he, r)) == 0) - continue; - - /* - * Updates confined to the cache may not appear in the - * primary at all, that is, an insert and remove pair - * may be confined to the cache. - */ - if (ret == HE_ERR_ITEM_NOT_FOUND) { - ret = 0; - continue; - } - ERET(wt_api, NULL, ret, - "he_delete: %s", he_strerror(ret)); - } else { - r->val = cp->v; - r->val_len = cp->len; - ret = he_update(ws->he, r); - if (ret == 0) - continue; - - ERET(wt_api, NULL, ret, - "he_update: %s", he_strerror(ret)); - } - } - - if (ret == WT_NOTFOUND) - ret = 0; - if (ret != 0) - ERET(wt_api, NULL, ret, "he_next: %s", he_strerror(ret)); - - /* - * If we didn't move any keys from the cache to the primary, quit. It's - * possible we could still remove values from the cache, but not likely, - * and another pass would probably be wasted effort (especially locked). - */ - if (!pushed) - return (0); - - /* - * Push the store to stable storage for correctness. (It doesn't matter - * what Helium handle we commit, so we just commit one of them.) - */ - if ((ret = he_commit(ws->he)) != 0) - ERET(wt_api, NULL, ret, "he_commit: %s", he_strerror(ret)); - - /* - * If we're performing recovery, that's all we need to do, we're going - * to simply discard the cache, there's no reason to remove entries one - * at a time. - */ - if (recovery) - return (0); - - /* - * For every cache key where all updates are globally visible: - * Remove the cache key. - * - * We're updating the cache, which requires a lock during normal - * cleaning. - */ - WT_ERR(writelock(wt_api, NULL, &ws->lock)); - locked = 1; - - for (r->key_len = 0; (ret = - helium_call(wtcursor, "he_next", ws->he_cache, he_next)) == 0;) { - /* - * Unmarshall the value, and if all of the updates are globally - * visible, remove the cache entry. - */ - WT_ERR(cache_value_unmarshall(wtcursor)); - if (cache_value_visible_all(wtcursor, oldest)) { - if ((ret = he_delete(ws->he_cache, r)) != 0) - EMSG_ERR(wt_api, NULL, ret, - "he_delete: %s", he_strerror(ret)); - continue; - } - - /* - * If the entry will remain in the cache, figure out the oldest - * transaction for which it contains an update (which might be - * different from the oldest transaction in the system). We - * need the oldest transaction ID that appears anywhere in any - * cache, it limits the records we can discard from the - * transaction store. - */ - cache_value_txnmin(wtcursor, &txnid); - if (txnid < *txnminp) - *txnminp = txnid; - } - - locked = 0; - WT_ERR(unlock(wt_api, NULL, &ws->lock)); - if (ret == WT_NOTFOUND) - ret = 0; - if (ret != 0) - EMSG_ERR(wt_api, NULL, ret, "he_next: %s", he_strerror(ret)); - -err: if (locked) - ESET(unlock(wt_api, NULL, &ws->lock)); - - return (ret); -} - -/* - * txn_cleaner -- - * Discard no longer needed entries from the transaction store. - */ -static int -txn_cleaner(WT_CURSOR *wtcursor, he_t he_txn, uint64_t txnmin) -{ - CURSOR *cursor; - HE_ITEM *r; - WT_EXTENSION_API *wt_api; - uint64_t txnid; - int ret = 0; - - cursor = (CURSOR *)wtcursor; - wt_api = cursor->wt_api; - r = &cursor->record; - - /* - * Remove all entries from the transaction store that are before the - * oldest transaction ID that appears anywhere in any cache. - */ - for (r->key_len = 0; - (ret = helium_call(wtcursor, "he_next", he_txn, he_next)) == 0;) { - memcpy(&txnid, r->key, sizeof(txnid)); - if (txnid < txnmin && (ret = he_delete(he_txn, r)) != 0) - ERET(wt_api, NULL, ret, - "he_delete: %s", he_strerror(ret)); - } - if (ret == WT_NOTFOUND) - ret = 0; - if (ret != 0) - ERET(wt_api, NULL, ret, "he_next: %s", he_strerror(ret)); - - return (0); -} - -/* - * fake_cursor -- - * Fake up enough of a cursor to do Helium operations. - */ -static int -fake_cursor(WT_EXTENSION_API *wt_api, WT_CURSOR **wtcursorp) -{ - CURSOR *cursor; - WT_CURSOR *wtcursor; - - /* - * Fake a cursor. - */ - if ((cursor = calloc(1, sizeof(CURSOR))) == NULL) - return (os_errno()); - cursor->wt_api = wt_api; - cursor->record.key = cursor->__key; - if ((cursor->v = malloc(128)) == NULL) { - free(cursor); - return (os_errno()); - } - cursor->mem_len = 128; - - /* - * !!! - * Fake cursors don't have WT_SESSION handles. - */ - wtcursor = (WT_CURSOR *)cursor; - wtcursor->session = NULL; - - *wtcursorp = wtcursor; - return (0); -} - -/* - * cache_cleaner_worker -- - * Thread to migrate data from the cache to the primary. - */ -static void * -cache_cleaner_worker(void *arg) -{ - struct timeval t; - CURSOR *cursor; - HELIUM_SOURCE *hs; - WT_CURSOR *wtcursor; - WT_EXTENSION_API *wt_api; - WT_SOURCE *ws; - uint64_t oldest, txnmin, txntmp; - int cleaner_stop, delay, ret = 0; - - hs = (HELIUM_SOURCE *)arg; - - cursor = NULL; - wt_api = hs->wt_api; - - if ((ret = fake_cursor(wt_api, &wtcursor)) != 0) - EMSG_ERR(wt_api, NULL, ret, "cleaner: %s", strerror(ret)); - cursor = (CURSOR *)wtcursor; - - for (cleaner_stop = delay = 0; !cleaner_stop;) { - /* - * Check if this will be the final run; cleaner_stop is declared - * volatile, and so the read will happen. We don't much care if - * there's extra loops, it's enough if a read eventually happens - * and finds the variable set. Store the read locally, reading - * the variable twice might race. - */ - cleaner_stop = hs->cleaner_stop; - - /* - * Delay if this isn't the final run and the last pass didn't - * find any work to do. - */ - if (!cleaner_stop && delay != 0) { - t.tv_sec = delay; - t.tv_usec = 0; - (void)select(0, NULL, NULL, NULL, &t); - } - - /* Run at least every 5 seconds. */ - if (delay < 5) - ++delay; - - /* - * Clean the datastore caches. It's both more and less expensive - * to return values from the cache: more because we have to - * marshall/unmarshall the values, less because there's only a - * single lookup to the cache store rather than a lookup into - * the cache and then the primary. I have no tuning information, - * for now, just clean if there have been 1K operations. - */ -#undef CACHE_SIZE_TRIGGER -#define CACHE_SIZE_TRIGGER (1000) - for (ws = hs->ws_head; ws != NULL; ws = ws->next) - if (ws->he_cache_ops > CACHE_SIZE_TRIGGER) - break; - if (!cleaner_stop && ws == NULL) - continue; - - /* There was work to do, don't delay before checking again. */ - delay = 0; - - /* - * Get the oldest transaction ID not yet visible to a running - * transaction. Do this before doing anything else, avoiding - * any race with creating new WT_SOURCE handles. - */ - oldest = wt_api->transaction_oldest(wt_api); - - /* - * If any cache needs cleaning, clean them all, because we have - * to know the minimum transaction ID referenced by any cache. - * - * For each cache/primary pair, migrate whatever records we can, - * tracking the lowest transaction ID of any entry in any cache. - */ - txnmin = UINT64_MAX; - for (ws = hs->ws_head; ws != NULL; ws = ws->next) { - /* Reset the operations counter. */ - ws->he_cache_ops = 0; - - cursor->ws = ws; - WT_ERR(cache_cleaner( - wt_api, wtcursor, oldest, &txntmp)); - if (txntmp < txnmin) - txnmin = txntmp; - } - - /* - * Discard any transactions less than the minimum transaction ID - * referenced in any cache. - * - * !!! - * I'm playing fast-and-loose with whether or not the cursor - * references an underlying WT_SOURCE, there's a structural - * problem here. - */ - cursor->ws = NULL; - WT_ERR(txn_cleaner(wtcursor, hs->he_txn, txnmin)); - } - -err: cursor_destroy(cursor); - return (NULL); -} - -/* - * helium_config_read -- - * Parse the Helium configuration. - */ -static int -helium_config_read(WT_EXTENSION_API *wt_api, WT_CONFIG_ITEM *config, - char **devicep, HE_ENV *envp, int *env_setp, int *flagsp) -{ - WT_CONFIG_ITEM k, v; - WT_CONFIG_PARSER *config_parser; - int ret = 0, tret; - - *env_setp = 0; - *flagsp = 0; - - /* Traverse the configuration arguments list. */ - if ((ret = wt_api->config_parser_open( - wt_api, NULL, config->str, config->len, &config_parser)) != 0) - ERET(wt_api, NULL, ret, - "WT_EXTENSION_API.config_parser_open: %s", - wt_api->strerror(wt_api, NULL, ret)); - while ((ret = config_parser->next(config_parser, &k, &v)) == 0) { - if (string_match("helium_devices", k.str, k.len)) { - if ((*devicep = calloc(1, v.len + 1)) == NULL) - return (os_errno()); - memcpy(*devicep, v.str, v.len); - continue; - } - if (string_match("helium_read_cache", k.str, k.len)) { - envp->read_cache = (uint64_t)v.val; - *env_setp = 1; - continue; - } - if (string_match("helium_write_cache", k.str, k.len)) { - envp->write_cache = (uint64_t)v.val; - *env_setp = 1; - continue; - } - if (string_match("helium_o_volume_truncate", k.str, k.len)) { - if (v.val != 0) - *flagsp |= HE_O_VOLUME_TRUNCATE; - continue; - } - EMSG_ERR(wt_api, NULL, EINVAL, - "unknown configuration key value pair %.*s=%.*s", - (int)k.len, k.str, (int)v.len, v.str); - } - if (ret == WT_NOTFOUND) - ret = 0; - if (ret != 0) - EMSG_ERR(wt_api, NULL, ret, - "WT_CONFIG_PARSER.next: %s", - wt_api->strerror(wt_api, NULL, ret)); - -err: if ((tret = config_parser->close(config_parser)) != 0) - EMSG(wt_api, NULL, tret, - "WT_CONFIG_PARSER.close: %s", - wt_api->strerror(wt_api, NULL, tret)); - - return (ret); -} - -/* - * helium_source_open -- - * Allocate and open a Helium source. - */ -static int -helium_source_open(DATA_SOURCE *ds, WT_CONFIG_ITEM *k, WT_CONFIG_ITEM *v) -{ - struct he_env env; - HELIUM_SOURCE *hs; - WT_EXTENSION_API *wt_api; - int env_set, flags, ret = 0; - - wt_api = ds->wt_api; - hs = NULL; - - VMSG(wt_api, NULL, VERBOSE_L1, "volume %.*s=%.*s", - (int)k->len, k->str, (int)v->len, v->str); - - /* - * Check for a Helium source we've already opened: we don't check the - * value (which implies you can open the same underlying stores using - * more than one name, but I don't know of any problems that causes), - * we only check the key, that is, the top-level WiredTiger name. - */ - for (hs = ds->hs_head; hs != NULL; hs = hs->next) - if (string_match(hs->name, k->str, k->len)) - ERET(wt_api, NULL, - EINVAL, "%s: device already open", hs->name); - - /* Allocate and initialize a new underlying Helium source object. */ - if ((hs = calloc(1, sizeof(*hs))) == NULL || - (hs->name = calloc(1, k->len + 1)) == NULL) { - free(hs); - return (os_errno()); - } - memcpy(hs->name, k->str, k->len); - hs->txn_notify.notify = txn_notify; - hs->wt_api = wt_api; - - /* Read the configuration, require a device naming the Helium store. */ - memset(&env, 0, sizeof(env)); - WT_ERR(helium_config_read( - wt_api, v, &hs->device, &env, &env_set, &flags)); - if (hs->device == NULL) - EMSG_ERR(wt_api, NULL, - EINVAL, "%s: no Helium volumes specified", hs->name); - - /* - * Open the Helium volume, creating it if necessary. We have to open - * an object at the same time, that's why we have object flags as well - * as volume flags. - */ - flags |= HE_O_CREATE | HE_O_TRUNCATE | HE_O_CLEAN | HE_O_VOLUME_CREATE; - if ((hs->he_volume = he_open( - hs->device, WT_NAME_INIT, flags, env_set ? &env : NULL)) == NULL) { - ret = os_errno(); - EMSG_ERR(wt_api, NULL, ret, - "he_open: %s: %s: %s", - hs->name, WT_NAME_INIT, he_strerror(ret)); - } - - /* Insert the new entry at the head of the list. */ - hs->next = ds->hs_head; - ds->hs_head = hs; - - if (0) { -err: if (hs != NULL) - ESET(helium_source_close(wt_api, NULL, hs)); - } - return (ret); -} - -/* - * helium_source_txn_open -- - * Open the database-wide transaction store. - */ -static int -helium_source_txn_open(DATA_SOURCE *ds) -{ - HELIUM_SOURCE *hs, *hs_txn; - WT_EXTENSION_API *wt_api; - he_t he_txn, t; - int ret = 0; - - wt_api = ds->wt_api; - - /* - * The global txn namespace is per connection, it spans multiple Helium - * sources. - * - * We've opened the Helium sources: check to see if any of them already - * have a transaction store, and make sure we only find one. - */ - hs_txn = NULL; - he_txn = NULL; - for (hs = ds->hs_head; hs != NULL; hs = hs->next) - if ((t = he_open(hs->device, WT_NAME_TXN, 0, NULL)) != NULL) { - if (hs_txn != NULL) { - (void)he_close(t); - (void)he_close(hs_txn); - ERET(wt_api, NULL, WT_PANIC, - "found multiple transaction stores, " - "unable to proceed"); - } - he_txn = t; - hs_txn = hs; - } - - /* - * If we didn't find a transaction store, open a transaction store in - * the first Helium source we loaded. (It could just as easily be the - * last one we loaded, we're just picking one, but picking the first - * seems slightly less likely to make people wonder.) - */ - if ((hs = hs_txn) == NULL) { - for (hs = ds->hs_head; hs->next != NULL; hs = hs->next) - ; - if ((he_txn = he_open( - hs->device, WT_NAME_TXN, HE_O_CREATE, NULL)) == NULL) { - ret = os_errno(); - ERET(wt_api, NULL, ret, - "he_open: %s: %s: %s", - hs->name, WT_NAME_TXN, he_strerror(ret)); - } - - /* Push the change. */ - if ((ret = he_commit(he_txn)) != 0) - ERET(wt_api, NULL, ret, - "he_commit: %s", he_strerror(ret)); - } - VMSG(wt_api, NULL, VERBOSE_L1, "%s" "transactional store on %s", - hs_txn == NULL ? "creating " : "", hs->name); - - /* Set the owner field, this Helium source has to be closed last. */ - hs->he_owner = true; - - /* Add a reference to the transaction store in each Helium source. */ - for (hs = ds->hs_head; hs != NULL; hs = hs->next) - hs->he_txn = he_txn; - - return (0); -} - -/* - * helium_source_txn_truncate -- - * Truncate the database-wide transaction store. - */ -static int -helium_source_txn_truncate(DATA_SOURCE *ds) -{ - HELIUM_SOURCE *hs; - WT_EXTENSION_API *wt_api; - int ret = 0; - - wt_api = ds->wt_api; - - /* - * We want to truncate the transaction store after recovery, but there - * isn't a Helium truncate operation. Remove/re-open the store instead. - */ - hs = ds->hs_head; - if (hs->he_txn != NULL && (ret = he_remove(hs->he_txn)) != 0) - ERET(wt_api, NULL, ret, - "he_remove: %s: %s: %s", - hs->name, WT_NAME_TXN, he_strerror(ret)); - - /* The handle is dead, clear any references. */ - for (hs = ds->hs_head; hs != NULL; hs = hs->next) { - hs->he_txn = NULL; - hs->he_owner = false; - } - - return (helium_source_txn_open(ds)); -} - -/* - * helium_source_recover_namespace -- - * Recover a single cache/primary pair in a Helium namespace. - */ -static int -helium_source_recover_namespace(WT_DATA_SOURCE *wtds, - HELIUM_SOURCE *hs, const char *name, WT_CONFIG_ARG *config) -{ - CURSOR *cursor; - DATA_SOURCE *ds; - WT_CURSOR *wtcursor; - WT_EXTENSION_API *wt_api; - WT_SOURCE *ws; - size_t len; - int ret = 0; - const char *p; - char *uri; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - cursor = NULL; - ws = NULL; - uri = NULL; - - /* - * The name we store on the Helium device is a translation of the - * WiredTiger name: do the reverse process here so we can use the - * standard source-open function. - */ - p = name + strlen(WT_NAME_PREFIX); - len = strlen("helium:") + strlen(hs->name) + strlen(p) + 10; - if ((uri = malloc(len)) == NULL) { - ret = os_errno(); - goto err; - } - (void)snprintf(uri, len, "helium:%s/%s", hs->name, p); - - /* - * Open the cache/primary pair by going through the full open process, - * instantiating the underlying WT_SOURCE object. - */ - WT_ERR(ws_source_open(wtds, NULL, uri, config, 0, &ws)); - WT_ERR(unlock(wt_api, NULL, &ws->lock)); - - /* Fake up a cursor. */ - if ((ret = fake_cursor(wt_api, &wtcursor)) != 0) - EMSG_ERR(wt_api, NULL, ret, "recovery: %s", strerror(ret)); - cursor = (CURSOR *)wtcursor; - cursor->ws = ws; - - /* Process, then remove, the cache. */ - WT_ERR(cache_cleaner(wt_api, wtcursor, 0, NULL)); - - if ((ret = he_remove(ws->he_cache)) != 0) - EMSG(wt_api, NULL, ret, - "he_remove: %s(cache): %s", ws->uri, he_strerror(ret)); - ws->he_cache = NULL; /* The handle is dead. */ - - /* Close the underlying WiredTiger sources. */ -err: while ((ws = hs->ws_head) != NULL) { - hs->ws_head = ws->next; - ESET(ws_source_close(wt_api, NULL, ws)); - } - - cursor_destroy(cursor); - free(uri); - - return (ret); -} - -struct helium_namespace_cookie { - char **list; - u_int list_cnt; - u_int list_max; -}; - -/* - * helium_namespace_list -- - * Get a list of the objects we're going to recover. - */ -static int -helium_namespace_list(void *cookie, const char *name) -{ - struct helium_namespace_cookie *names; - void *allocp; - - names = cookie; - - /* - * Ignore any files without a WiredTiger prefix. - * Ignore the metadata and cache files. - */ - if (!prefix_match(name, WT_NAME_PREFIX)) - return (0); - if (strcmp(name, WT_NAME_INIT) == 0) - return (0); - if (strcmp(name, WT_NAME_TXN) == 0) - return (0); - if (string_match( - strrchr(name, '.'), WT_NAME_CACHE, strlen(WT_NAME_CACHE))) - return (0); - - if (names->list_cnt + 1 >= names->list_max) { - if ((allocp = realloc(names->list, - (names->list_max + 20) * sizeof(names->list[0]))) == NULL) - return (os_errno()); - names->list = allocp; - names->list_max += 20; - } - if ((names->list[names->list_cnt] = strdup(name)) == NULL) - return (os_errno()); - ++names->list_cnt; - names->list[names->list_cnt] = NULL; - return (0); -} - -/* - * helium_source_recover -- - * Recover the HELIUM_SOURCE. - */ -static int -helium_source_recover( - WT_DATA_SOURCE *wtds, HELIUM_SOURCE *hs, WT_CONFIG_ARG *config) -{ - struct helium_namespace_cookie names; - DATA_SOURCE *ds; - WT_EXTENSION_API *wt_api; - u_int i; - int ret = 0; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - memset(&names, 0, sizeof(names)); - - VMSG(wt_api, NULL, VERBOSE_L1, "recover %s", hs->name); - - /* Get a list of the cache/primary object pairs in the Helium source. */ - if ((ret = he_enumerate( - hs->device, helium_namespace_list, &names)) != 0) - ERET(wt_api, NULL, ret, - "he_enumerate: %s: %s", hs->name, he_strerror(ret)); - - /* Recover the objects. */ - for (i = 0; i < names.list_cnt; ++i) - WT_ERR(helium_source_recover_namespace( - wtds, hs, names.list[i], config)); - -err: for (i = 0; i < names.list_cnt; ++i) - free(names.list[i]); - free(names.list); - - return (ret); -} - -/* - * helium_terminate -- - * Unload the data-source. - */ -static int -helium_terminate(WT_DATA_SOURCE *wtds, WT_SESSION *session) -{ - DATA_SOURCE *ds; - HELIUM_SOURCE *hs, *last; - WT_EXTENSION_API *wt_api; - int ret = 0; - - ds = (DATA_SOURCE *)wtds; - wt_api = ds->wt_api; - - /* Lock the system down. */ - if (ds->lockinit) - ret = writelock(wt_api, session, &ds->global_lock); - - /* - * Close the Helium sources, close the Helium source that "owns" the - * database transaction store last. - */ - last = NULL; - while ((hs = ds->hs_head) != NULL) { - ds->hs_head = hs->next; - if (hs->he_owner) { - last = hs; - continue; - } - ESET(helium_source_close(wt_api, session, hs)); - } - if (last != NULL) - ESET(helium_source_close(wt_api, session, last)); - - /* Unlock and destroy the system. */ - if (ds->lockinit) { - ESET(unlock(wt_api, session, &ds->global_lock)); - ESET(lock_destroy(wt_api, NULL, &ds->global_lock)); - } - - OVERWRITE_AND_FREE(ds); - - return (ret); -} - -/* - * wiredtiger_extension_init -- - * Initialize the Helium connector code. - */ -int -wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) -{ - /* - * List of the WT_DATA_SOURCE methods -- it's static so it breaks at - * compile-time should the structure change underneath us. - */ - static const WT_DATA_SOURCE wtds = { - NULL, /* No session.alter */ - helium_session_create, /* session.create */ - NULL, /* No session.compaction */ - helium_session_drop, /* session.drop */ - helium_session_open_cursor, /* session.open_cursor */ - helium_session_rename, /* session.rename */ - NULL, /* No session.salvage */ - helium_session_truncate, /* session.truncate */ - NULL, /* No session.range_truncate */ - helium_session_verify, /* session.verify */ - helium_session_checkpoint, /* session.checkpoint */ - helium_terminate /* termination */ - }; - static const char *session_create_opts[] = { - "helium_o_compress=0", /* HE_O_COMPRESS */ - "helium_o_truncate=0", /* HE_O_TRUNCATE */ - NULL - }; - DATA_SOURCE *ds; - HELIUM_SOURCE *hs; - WT_CONFIG_ITEM k, v; - WT_CONFIG_PARSER *config_parser; - WT_EXTENSION_API *wt_api; - int vmajor, vminor, vpatch, ret = 0; - const char **p; - - config_parser = NULL; - ds = NULL; - - wt_api = connection->get_extension_api(connection); - - /* Check the library version */ -#if HE_VERSION_MAJOR != 2 || HE_VERSION_MINOR != 12 - ERET(wt_api, NULL, EINVAL, - "unsupported Helium header file %d.%d, expected version 2.12", - HE_VERSION_MAJOR, HE_VERSION_MINOR); -#endif - (void)he_version(&vmajor, &vminor, &vpatch); - if (vmajor != 2 || vminor != 12) - ERET(wt_api, NULL, EINVAL, - "unsupported Helium library version %d.%d, expected " - "version 2.12", vmajor, vminor); - - /* Allocate and initialize the local data-source structure. */ - if ((ds = calloc(1, sizeof(DATA_SOURCE))) == NULL) - return (os_errno()); - ds->wtds = wtds; - ds->wt_api = wt_api; - WT_ERR(lock_init(wt_api, NULL, &ds->global_lock)); - ds->lockinit = true; - - /* Step through the list of Helium sources, opening each one. */ - if ((ret = wt_api->config_parser_open_arg( - wt_api, NULL, config, &config_parser)) != 0) - EMSG_ERR(wt_api, NULL, ret, - "WT_EXTENSION_API.config_parser_open: config: %s", - wt_api->strerror(wt_api, NULL, ret)); - while ((ret = config_parser->next(config_parser, &k, &v)) == 0) { - if (string_match("helium_verbose", k.str, k.len)) { - verbose = v.val == 0 ? 0 : 1; - continue; - } - if ((ret = helium_source_open(ds, &k, &v)) != 0) - goto err; - } - if (ret != WT_NOTFOUND) - EMSG_ERR(wt_api, NULL, ret, - "WT_CONFIG_PARSER.next: config: %s", - wt_api->strerror(wt_api, NULL, ret)); - if ((ret = config_parser->close(config_parser)) != 0) - EMSG_ERR(wt_api, NULL, ret, - "WT_CONFIG_PARSER.close: config: %s", - wt_api->strerror(wt_api, NULL, ret)); - config_parser = NULL; - - /* - * Find and open the database transaction store, recover each Helium - * source, then discard the transaction store's contents. - */ - WT_ERR(helium_source_txn_open(ds)); - for (hs = ds->hs_head; hs != NULL; hs = hs->next) - WT_ERR(helium_source_recover(&ds->wtds, hs, config)); - WT_ERR(helium_source_txn_truncate(ds)); - - /* Start each Helium source cleaner thread. */ - for (hs = ds->hs_head; hs != NULL; hs = hs->next) - if ((ret = pthread_create( - &hs->cleaner_id, NULL, cache_cleaner_worker, hs)) != 0) - EMSG_ERR(wt_api, NULL, ret, - "%s: pthread_create: cleaner thread: %s", - hs->name, strerror(ret)); - - /* Add Helium-specific WT_SESSION.create configuration options. */ - for (p = session_create_opts; *p != NULL; ++p) - if ((ret = connection->configure_method(connection, - "WT_SESSION.create", "helium:", *p, "boolean", NULL)) != 0) - EMSG_ERR(wt_api, NULL, ret, - "WT_CONNECTION.configure_method: session.create: " - "%s: %s", - *p, wt_api->strerror(wt_api, NULL, ret)); - - /* Add the data source */ - if ((ret = connection->add_data_source( - connection, "helium:", (WT_DATA_SOURCE *)ds, NULL)) != 0) - EMSG_ERR(wt_api, NULL, ret, - "WT_CONNECTION.add_data_source: %s", - wt_api->strerror(wt_api, NULL, ret)); - return (0); - -err: if (ds != NULL) - ESET(helium_terminate((WT_DATA_SOURCE *)ds, NULL)); - if (config_parser != NULL) - (void)config_parser->close(config_parser); - return (ret); -} - -/* - * wiredtiger_extension_terminate -- - * Shutdown the Helium connector code. - */ -int -wiredtiger_extension_terminate(WT_CONNECTION *connection) -{ - (void)connection; /* Unused parameters */ - - return (0); -} diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 280259ae8f2..77c0a6a895c 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -1,5 +1,5 @@ { - "commit": "d9ec69f9111b036ee0b19b47368e15bff8d4818d", + "commit": "315563f28850026673ebb146b6f3d727178e58bc", "github": "wiredtiger/wiredtiger.git", "vendor": "wiredtiger", "branch": "mongodb-4.2" diff --git a/src/third_party/wiredtiger/lang/python/Makefile.am b/src/third_party/wiredtiger/lang/python/Makefile.am index 3f29ea6aeea..ace11dfa5b9 100644 --- a/src/third_party/wiredtiger/lang/python/Makefile.am +++ b/src/third_party/wiredtiger/lang/python/Makefile.am @@ -2,7 +2,7 @@ PYSRC = $(top_srcdir)/lang/python PYDIRS = -t $(abs_builddir) -I $(abs_top_srcdir):$(abs_top_builddir) -L $(abs_top_builddir)/.libs PYDST = $(abs_builddir)/wiredtiger PYFILES = $(PYDST)/fpacking.py $(PYDST)/intpacking.py $(PYDST)/packing.py \ - $(PYDST)/__init__.py + $(PYDST)/packutil.py $(PYDST)/__init__.py PY_MAJOR_VERSION := $$($(PYTHON) -c \ 'import sys; print(int(sys.version_info.major))') diff --git a/src/third_party/wiredtiger/lang/python/setup_pip.py b/src/third_party/wiredtiger/lang/python/setup_pip.py index 9edd0b24a41..cafa05a9732 100644..100755 --- a/src/third_party/wiredtiger/lang/python/setup_pip.py +++ b/src/third_party/wiredtiger/lang/python/setup_pip.py @@ -98,7 +98,7 @@ def check_needed_dependencies(builtins, inc_paths, lib_paths): # Locate an executable in the PATH. def find_executable(exename, path): p = subprocess.Popen(['which', exename ], stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, universal_newlines=True) out, err = p.communicate('') out = str(out) # needed for Python3 if out == '': @@ -146,7 +146,8 @@ def get_sources_curdir(): DEVNULL = open(os.devnull, 'w') gitproc = subprocess.Popen( ['git', 'ls-tree', '-r', '--name-only', 'HEAD^{tree}'], - stdin=DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdin=DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) sources = [line.rstrip() for line in gitproc.stdout.readlines()] err = gitproc.stderr.read() gitproc.wait() @@ -176,6 +177,7 @@ def get_library_dirs(): dirs.append("/usr/local/lib") dirs.append("/usr/local/lib64") dirs.append("/lib/x86_64-linux-gnu") + dirs.append("/usr/lib/x86_64-linux-gnu") dirs.append("/opt/local/lib") dirs.append("/usr/lib") dirs.append("/usr/lib64") @@ -231,10 +233,6 @@ elif os.path.isfile(os.path.join(this_dir, 'LICENSE')): else: die('running from an unknown directory') -python3 = (sys.version_info[0] > 2) -if python3: - die('Python3 is not yet supported') - # Ensure that Extensions won't be built for 32 bit, # that won't work with WiredTiger. if sys.maxsize < 2**32: diff --git a/src/third_party/wiredtiger/lang/python/wiredtiger/intpacking.py b/src/third_party/wiredtiger/lang/python/wiredtiger/intpacking.py index ce28ea8ae2d..b20aee919a0 100755 --- a/src/third_party/wiredtiger/lang/python/wiredtiger/intpacking.py +++ b/src/third_party/wiredtiger/lang/python/wiredtiger/intpacking.py @@ -28,8 +28,12 @@ # from __future__ import print_function -from wiredtiger.packing import _chr, _ord import math, struct, sys +try: + from wiredtiger.packutil import _chr, _ord, x00_entry, xff_entry +except ImportError: + # When WiredTiger is installed as a package, python2 needs this + from .packutil import _chr, _ord, x00_entry, xff_entry # Variable-length integer packing # need: up to 64 bits, both signed and unsigned @@ -64,14 +68,6 @@ POS_2BYTE_MAX = 2**13 + POS_1BYTE_MAX MINUS_BIT = -1 << 64 UINT64_MASK = 0xffffffffffffffff -_python3 = (sys.version_info >= (3, 0, 0)) -if _python3: - xff_entry = 0xff - x00_entry = 0x00 -else: - xff_entry = '\xff' - x00_entry = '\x00' - def getbits(x, start, end=0): '''return the least significant bits of x, from start to end''' return (x & ((1 << start) - 1)) >> (end) diff --git a/src/third_party/wiredtiger/lang/python/wiredtiger/packing.py b/src/third_party/wiredtiger/lang/python/wiredtiger/packing.py index 31b12a58126..47b6af1c786 100755 --- a/src/third_party/wiredtiger/lang/python/wiredtiger/packing.py +++ b/src/third_party/wiredtiger/lang/python/wiredtiger/packing.py @@ -49,50 +49,15 @@ Format Python Notes u str raw byte array """ -import sys - -# In the Python3 world, we pack into the bytes type, which like a list of ints. -# In Python2, we pack into a string. Create a set of constants and methods -# to hide the differences from the main code. -if sys.version_info[0] >= 3: - x00 = b'\x00' - xff = b'\xff' - empty_pack = b'' - def _ord(b): - return b - - def _chr(x, y=None): - a = [x] - if y != None: - a.append(y) - return bytes(a) - - def _is_string(s): - return type(s) is str - - def _string_result(s): - return s.decode() - -else: - x00 = '\x00' - xff = '\xff' - empty_pack = '' - def _ord(b): - return ord(b) - - def _chr(x, y=None): - s = chr(x) - if y != None: - s += chr(y) - return s - - def _is_string(s): - return type(s) is unicode - - def _string_result(s): - return s - -from wiredtiger.intpacking import pack_int, unpack_int +try: + from wiredtiger.packutil import _chr, _is_string, _ord, _string_result, \ + empty_pack, x00 + from wiredtiger.intpacking import pack_int, unpack_int +except ImportError: + # When WiredTiger is installed as a package, python2 needs this + from .packutil import _chr, _is_string, _ord, _string_result, \ + empty_pack, x00 + from intpacking import pack_int, unpack_int def __get_type(fmt): if not fmt: diff --git a/src/third_party/wiredtiger/lang/python/wiredtiger/packutil.py b/src/third_party/wiredtiger/lang/python/wiredtiger/packutil.py new file mode 100755 index 00000000000..cbff2148361 --- /dev/null +++ b/src/third_party/wiredtiger/lang/python/wiredtiger/packutil.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2019 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# WiredTiger packing and unpacking utility functions and constants + +import sys + +# In the Python3 world, we pack into the bytes type, which like a list of ints. +# In Python2, we pack into a string. Create a set of constants and methods +# to hide the differences from the main code. + +# all bits on or off, expressed as a bytes type +x00 = b'\x00' +xff = b'\xff' +x00_entry = x00[0] +xff_entry = xff[0] +empty_pack = b'' + +_python3 = (sys.version_info >= (3, 0, 0)) +if _python3: + def _ord(b): + return b + + def _chr(x, y=None): + a = [x] + if y != None: + a.append(y) + return bytes(a) + + def _is_string(s): + return type(s) is str + + def _string_result(s): + return s.decode() + +else: + def _ord(b): + return ord(b) + + def _chr(x, y=None): + s = chr(x) + if y != None: + s += chr(y) + return s + + def _is_string(s): + return type(s) is unicode + + def _string_result(s): + return s diff --git a/src/third_party/wiredtiger/lang/python/wiredtiger/pip_init.py b/src/third_party/wiredtiger/lang/python/wiredtiger/pip_init.py index 98f15bf88f4..187a21443b7 100644..100755 --- a/src/third_party/wiredtiger/lang/python/wiredtiger/pip_init.py +++ b/src/third_party/wiredtiger/lang/python/wiredtiger/pip_init.py @@ -41,8 +41,12 @@ if fname != '__init__.py' and fname != '__init__.pyc': # After importing the SWIG-generated file, copy all symbols from from it # to this module so they will appear in the wiredtiger namespace. me = sys.modules[__name__] -sys.path.append(os.path.dirname(__file__)) # needed for Python3 -import wiredtiger -for name in dir(wiredtiger): - value = getattr(wiredtiger, name) +sys.path.append(os.path.dirname(__file__)) +try: + import wiredtiger.wiredtiger as swig_wiredtiger +except ImportError: + # for Python2 + import wiredtiger as swig_wiredtiger +for name in dir(swig_wiredtiger): + value = getattr(swig_wiredtiger, name) setattr(me, name, value) diff --git a/src/third_party/wiredtiger/src/btree/bt_handle.c b/src/third_party/wiredtiger/src/btree/bt_handle.c index 3a920a3a26b..72cb2d259bd 100644 --- a/src/third_party/wiredtiger/src/btree/bt_handle.c +++ b/src/third_party/wiredtiger/src/btree/bt_handle.c @@ -143,6 +143,12 @@ __wt_btree_open(WT_SESSION_IMPL *session, const char *op_cfg[]) /* Initialize and configure the WT_BTREE structure. */ WT_ERR(__btree_conf(session, &ckpt)); + /* + * We could be a re-open of a table that was put in the lookaside + * dropped list. Remove our id from that list. + */ + __wt_las_remove_dropped(session); + /* Connect to the underlying block manager. */ filename = dhandle->name; if (!WT_PREFIX_SKIP(filename, "file:")) diff --git a/src/third_party/wiredtiger/src/cache/cache_las.c b/src/third_party/wiredtiger/src/cache/cache_las.c index 5f6e4c198ff..0d2e9520961 100644 --- a/src/third_party/wiredtiger/src/cache/cache_las.c +++ b/src/third_party/wiredtiger/src/cache/cache_las.c @@ -877,6 +877,33 @@ __wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid) } /* + * __wt_las_remove_dropped -- + * Remove an opened btree ID if it is in the dropped table. + */ +void +__wt_las_remove_dropped(WT_SESSION_IMPL *session) +{ + WT_BTREE *btree; + WT_CACHE *cache; + u_int i, j; + + btree = S2BT(session); + cache = S2C(session)->cache; + + __wt_spin_lock(session, &cache->las_sweep_lock); + for (i = 0; i < cache->las_dropped_next && + cache->las_dropped[i] != btree->id; i++) + ; + + if (i < cache->las_dropped_next) { + cache->las_dropped_next--; + for (j = i; j < cache->las_dropped_next; j++) + cache->las_dropped[j] = cache->las_dropped[j + 1]; + } + __wt_spin_unlock(session, &cache->las_sweep_lock); +} + +/* * __wt_las_save_dropped -- * Save a dropped btree ID to be swept from the lookaside table. */ @@ -953,6 +980,19 @@ __las_sweep_init(WT_SESSION_IMPL *session) goto err; } + /* + * Record the current page ID: sweep will stop after this point. + * + * Since the btree IDs we're scanning are closed, any eviction must + * have already completed, so we won't miss anything with this + * approach. + * + * Also, if a tree is reopened and there is lookaside activity before + * this sweep completes, it will have a higher page ID and should not + * be removed. + */ + cache->las_sweep_max_pageid = cache->las_pageid; + /* Scan the btree IDs to find min/max. */ cache->las_sweep_dropmin = UINT32_MAX; cache->las_sweep_dropmax = 0; @@ -1049,7 +1089,7 @@ __wt_las_sweep(WT_SESSION_IMPL *session) * table. Searching for the same key could leave us stuck at * the end of the table, repeatedly checking the same rows. */ - sweep_key->size = 0; + __wt_buf_free(session, sweep_key); } else ret = __las_sweep_init(session); if (ret != 0) @@ -1079,6 +1119,17 @@ __wt_las_sweep(WT_SESSION_IMPL *session) cnt = 0; /* + * Don't go past the end of lookaside from when sweep started. + * If a file is reopened, its ID may be reused past this point + * so the bitmap we're using is not valid. + */ + if (las_pageid > cache->las_sweep_max_pageid) { + __wt_buf_free(session, sweep_key); + ret = WT_NOTFOUND; + break; + } + + /* * We only want to break between key blocks. Stop if we've * processed enough entries either all we wanted or enough * and there is a reader waiting and we're on a key boundary. diff --git a/src/third_party/wiredtiger/src/docs/helium.dox b/src/third_party/wiredtiger/src/docs/helium.dox deleted file mode 100644 index aef57bba710..00000000000 --- a/src/third_party/wiredtiger/src/docs/helium.dox +++ /dev/null @@ -1,125 +0,0 @@ -/*! @page helium WiredTiger Helium support - -WiredTiger supports Levyx Inc., Helium Data Store volumes as a data-source. - -To configure one or more Helium volumes as WiredTiger data sources, take -the following steps. - -@section helium_build Building the WiredTiger Helium Support - -To build the Helium support, use the configuration option \c --with-helium=DIR. -For example: - -@code -% cd wiredtiger -% ls /usr/local/lib/Helium -Helium Programmer's Reference.pdf libhe.a -README.TXT libhe.so -he.h -% ./configure --with-helium=/usr/local/lib/Helium && make -@endcode - -@section helium_load Loading the WiredTiger Helium Support - -Next, add code to your application to load the Helium shared library. - -The following example loads the Helium shared library, configuring and -naming two separate Helium volumes. The first volume is named \c dev1, -the second volume is named \c dev2. Volume \c dev1 has two underlying -physical Helium devices, \c /dev/disk3s1 and \c /dev/disk4s1. Volume -\c dev2 has a single underlying physical Helium device, \c /dev/disk5s1. - -@code -#define HELIUM_LIBRARY_PATH "test/helium/.libs/libwiredtiger_helium.so"" -ret = connection->load_extension(connection, HELIUM_LIBRARY_PATH, - "config=[" - "dev1=[helium_devices=[\"he://.//dev/disk3s1,/dev/disk4s1\"]," - "helium_o_volume_truncate=1]," - "dev2=[helium_devices=[\"he://.//dev/disk5s1\"]," - "helium_o_volume_truncate=1]]"); -@endcode - -The \c helium_devices configuration string takes a WiredTiger string -which is a comma-separated list of Helium devices. (Note the quoting -required for that to be possible.) - -In this example, both Helium volumes are configured to be truncated when -first opened, and all previously existing contents discarded. - -When configuring a Helium volume, the following non-standard configuration -strings are supported: - -<table> -@hrow{String, Type, Meaning} -@row{helium_devices, list, WiredTiger URI to Helium volume mapping} -@row{helium_o_volume_truncate, boolean, HE_O_VOLUME_TRUNCATE flag} -@row{helium_read_cache, int, struct he_env read_cache value} -@row{helium_write_cache, int, struct he_env write_cache value} -</table> - -With the exception of the configuration string \c helium_devices (which -is WiredTiger specific), see the Helium documentation for details on -their use. - -@section helium_objects Creating WiredTiger objects on Helium volumes - -When creating WiredTiger objects on Helium volumes, the volume names are -used as part of the URI specified to WiredTiger methods such as -WT_SESSION::create or WT_SESSION::rename, separated from the object name -by a single slash character. - -Additionally, the \c helium \c type configuration string must be included. - -The following example creates a table named \c access on the Helium -volume \c dev1, and then opens a cursor on the table: - -@code -WT_CURSOR *cursor; -WT_SESSION *session; - -/* Create the access table. */ -ret = session->create( - session, "table:dev1/access", "key_format=S,value_format=S,type=helium"); - -/* Open a cursor on the access table. */ -ret = session->open_cursor(session, "table:dev1/access", NULL, NULL, &cursor); -@endcode - -When calling WT_SESSION::create to create an object on a Helium volume, -the following additional configuration strings are supported: - -<table> -@hrow{String, Type, Meaning} -@row{helium_o_compress, boolean, HE_I_COMPRESS flag} -@row{helium_o_truncate, boolean, HE_O_TRUNCATE flag} -</table> - -See the Helium device documentation for details on their use. - -For example, creating and truncating a table could be done as follows: - -@code -WT_SESSION *session; - -/* Create and truncate the access table. */ -ret = session->create(session, "table:dev1/access", - "key_format=S,value_format=S,type=helium,helium_o_truncate=1"); -@endcode - -@section helium_notes Helium notes - -- Helium volumes do not support database backups. -- Helium volumes do not support named checkpoints. -- Helium volumes do not support compression of any kind. -- Helium volumes do not support bulk load as a special case, and configuring -cursors for bulk load has no effect. -- Inserting a new record after the current maximum record in a fixed-length -bit field column-store (that is, a store with an 'r' type key and 't' type -value) does not implicitly create the missing records. - -@section helium_limitations Helium limitations - -- WiredTiger transactions cannot include operations on both Helium volumes -and other stores; this will be corrected in a future release. - -*/ diff --git a/src/third_party/wiredtiger/src/docs/programming.dox b/src/third_party/wiredtiger/src/docs/programming.dox index 960babfc146..722b67fbebf 100644 --- a/src/third_party/wiredtiger/src/docs/programming.dox +++ b/src/third_party/wiredtiger/src/docs/programming.dox @@ -58,7 +58,6 @@ each of which is ordered by one or more columns. - @subpage custom_extractors - @subpage custom_data_sources - @subpage custom_file_systems -- @subpage helium @m_endif <h2>Performance monitoring and tuning</h2> diff --git a/src/third_party/wiredtiger/src/docs/top/main.dox b/src/third_party/wiredtiger/src/docs/top/main.dox index d802443a9d8..4c1e4fe5d85 100644 --- a/src/third_party/wiredtiger/src/docs/top/main.dox +++ b/src/third_party/wiredtiger/src/docs/top/main.dox @@ -6,12 +6,12 @@ WiredTiger is an high performance, scalable, production quality, NoSQL, @section releases Releases <table> -@row{<b>WiredTiger 3.1.0</b> (current), +@row{<b>WiredTiger 3.2.0</b> (current), + <a href="releases/wiredtiger-3.2.0.tar.bz2"><b>[Release package]</b></a>, + <a href="3.2.0/index.html"><b>[Documentation]</b></a>} +@row{<b>WiredTiger 3.1.0</b> (previous), <a href="releases/wiredtiger-3.1.0.tar.bz2"><b>[Release package]</b></a>, <a href="3.1.0/index.html"><b>[Documentation]</b></a>} -@row{<b>WiredTiger 3.0.0</b> (previous), - <a href="releases/wiredtiger-3.0.0.tar.bz2"><b>[Release package]</b></a>, - <a href="3.0.0/index.html"><b>[Documentation]</b></a>} @row{<b>Development branch</b>, <a href="https://github.com/wiredtiger/wiredtiger"><b>[Source code]</b></a>, <a href="develop/index.html"><b>[Documentation]</b></a>} diff --git a/src/third_party/wiredtiger/src/docs/upgrading.dox b/src/third_party/wiredtiger/src/docs/upgrading.dox index dd9f8fe5be7..b4c7f5b7d2b 100644 --- a/src/third_party/wiredtiger/src/docs/upgrading.dox +++ b/src/third_party/wiredtiger/src/docs/upgrading.dox @@ -22,6 +22,12 @@ related Basho Technologies Riak, HyperLevelDB HyperDex and Facebook RocksDB compatibility layers), has been removed in the 3.2.0 release. </dd> +<dt>Helium Data Store volume support</dt> +<dd> +Support for the Levyx Inc., Helium Data Store volumes as a data-source has +been removed in the 3.2.0 release. +</dd> + <dt>WiredTiger timestamps</dt> <dd> In previous releases of WiredTiger, it was possible to disable timestamp diff --git a/src/third_party/wiredtiger/src/include/cache.h b/src/third_party/wiredtiger/src/include/cache.h index 7923e965e75..9c485b5e693 100644 --- a/src/third_party/wiredtiger/src/include/cache.h +++ b/src/third_party/wiredtiger/src/include/cache.h @@ -209,6 +209,7 @@ struct __wt_cache { uint32_t las_sweep_dropmin; /* Minimum btree ID in current set. */ uint8_t *las_sweep_dropmap; /* Bitmap of dropped btree IDs. */ uint32_t las_sweep_dropmax; /* Maximum btree ID in current set. */ + uint64_t las_sweep_max_pageid; /* Maximum page ID for sweep. */ uint32_t *las_dropped; /* List of dropped btree IDs. */ size_t las_dropped_next; /* Next index into drop list. */ diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index b036fb30616..13354d4996b 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -215,6 +215,7 @@ extern bool __wt_las_page_skip(WT_SESSION_IMPL *session, WT_REF *ref) WT_GCC_FUN extern int __wt_las_insert_block(WT_CURSOR *cursor, WT_BTREE *btree, WT_PAGE *page, WT_MULTI *multi, WT_ITEM *key) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_cursor_position(WT_CURSOR *cursor, uint64_t pageid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_remove_block(WT_SESSION_IMPL *session, uint64_t pageid) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern void __wt_las_remove_dropped(WT_SESSION_IMPL *session); extern int __wt_las_save_dropped(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_las_sweep(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern uint32_t __wt_checksum_sw(const void *chunk, size_t len); diff --git a/src/third_party/wiredtiger/src/include/optrack.h b/src/third_party/wiredtiger/src/include/optrack.h index 1144dfc2ef5..e5b97e1b5d7 100644 --- a/src/third_party/wiredtiger/src/include/optrack.h +++ b/src/third_party/wiredtiger/src/include/optrack.h @@ -8,7 +8,7 @@ #define WT_OPTRACK_MAXRECS (16384) #define WT_OPTRACK_BUFSIZE (WT_OPTRACK_MAXRECS * sizeof(WT_OPTRACK_RECORD)) -#define WT_OPTRACK_VERSION 2 +#define WT_OPTRACK_VERSION 3 /* * WT_OPTRACK_HEADER -- @@ -19,6 +19,8 @@ struct __wt_optrack_header { uint32_t optrack_version; uint32_t optrack_session_internal; uint32_t optrack_tsc_nsec_ratio; + uint32_t padding; + uint64_t optrack_seconds_epoch; }; /* diff --git a/src/third_party/wiredtiger/src/optrack/optrack.c b/src/third_party/wiredtiger/src/optrack/optrack.c index 2aeede1f4df..737293d30bf 100644 --- a/src/third_party/wiredtiger/src/optrack/optrack.c +++ b/src/third_party/wiredtiger/src/optrack/optrack.c @@ -57,11 +57,12 @@ err: WT_PANIC_MSG(session, ret, static int __optrack_open_file(WT_SESSION_IMPL *session) { + struct timespec ts; WT_CONNECTION_IMPL *conn; WT_DECL_ITEM(buf); WT_DECL_RET; WT_OPTRACK_HEADER optrack_header = { WT_OPTRACK_VERSION, 0, - (uint32_t)WT_TSC_DEFAULT_RATIO * WT_THOUSAND }; + (uint32_t)WT_TSC_DEFAULT_RATIO * WT_THOUSAND, 0,0}; conn = S2C(session); @@ -86,6 +87,10 @@ __optrack_open_file(WT_SESSION_IMPL *session) optrack_header.optrack_tsc_nsec_ratio = (uint32_t)(__wt_process.tsc_nsec_ratio * WT_THOUSAND); + /* Record the time in seconds since the Epoch. */ + __wt_epoch(session, &ts); + optrack_header.optrack_seconds_epoch = (uint64_t)ts.tv_sec; + /* Write the header into the operation-tracking file. */ WT_ERR(session->optrack_fh->handle->fh_write( session->optrack_fh->handle, (WT_SESSION *)session, diff --git a/src/third_party/wiredtiger/src/txn/txn_ckpt.c b/src/third_party/wiredtiger/src/txn/txn_ckpt.c index 580e5b1190f..143bec4e445 100644 --- a/src/third_party/wiredtiger/src/txn/txn_ckpt.c +++ b/src/third_party/wiredtiger/src/txn/txn_ckpt.c @@ -56,14 +56,13 @@ __checkpoint_name_check(WT_SESSION_IMPL *session, const char *uri) /* * This function exists as a place for this comment: named checkpoints - * are only supported on file objects, and not on LSM trees or Helium - * devices. If a target list is configured for the checkpoint, this - * function is called with each target list entry; check the entry to - * make sure it's backed by a file. If no target list is configured, - * confirm the metadata file contains no non-file objects. Skip any - * internal system objects. We don't want spurious error messages, - * other code will skip over them and the user has no control over - * their existence. + * are only supported on file objects, and not on LSM trees. If a target + * list is configured for the checkpoint, this function is called with + * each target list entry; check the entry to make sure it's backed by + * a file. If no target list is configured, confirm the metadata file + * contains no non-file objects. Skip any internal system objects. We + * don't want spurious error messages, other code will skip over them + * and the user has no control over their existence. */ if (uri == NULL) { WT_RET(__wt_metadata_cursor(session, &cursor)); @@ -234,12 +233,12 @@ __checkpoint_data_source(WT_SESSION_IMPL *session, const char *cfg[]) WT_NAMED_DATA_SOURCE *ndsrc; /* - * A place-holder, to support Helium devices: we assume calling the + * A place-holder, to support data sources: we assume calling the * underlying data-source session checkpoint function is sufficient to * checkpoint all objects in the data source, open or closed, and we * don't attempt to optimize the checkpoint of individual targets. - * Those assumptions is correct for the Helium device, but it's not - * necessarily going to be true for other data sources. + * Those assumptions are not necessarily going to be true for all + * data sources. * * It's not difficult to support data-source checkpoints of individual * targets (__wt_schema_worker is the underlying function that will do diff --git a/src/third_party/wiredtiger/test/format/backup.c b/src/third_party/wiredtiger/test/format/backup.c index fc44e7010c8..1bddb2e1a13 100644 --- a/src/third_party/wiredtiger/test/format/backup.c +++ b/src/third_party/wiredtiger/test/format/backup.c @@ -105,7 +105,7 @@ backup(void *arg) conn = g.wts_conn; /* Backups aren't supported for non-standard data sources. */ - if (DATASOURCE("helium") || DATASOURCE("kvsbdb")) + if (DATASOURCE("kvsbdb")) return (WT_THREAD_RET_VALUE); /* Open a session. */ diff --git a/src/third_party/wiredtiger/test/format/bulk.c b/src/third_party/wiredtiger/test/format/bulk.c index f794fb7a499..196cdb6b7ac 100644 --- a/src/third_party/wiredtiger/test/format/bulk.c +++ b/src/third_party/wiredtiger/test/format/bulk.c @@ -53,7 +53,7 @@ wts_load(void) * match the collation order. */ is_bulk = true; - if (DATASOURCE("kvsbdb") && DATASOURCE("helium")) + if (DATASOURCE("kvsbdb")) is_bulk = false; if (g.c_reverse) is_bulk = false; diff --git a/src/third_party/wiredtiger/test/format/compact.c b/src/third_party/wiredtiger/test/format/compact.c index da4b1fae252..95160dc1595 100644 --- a/src/third_party/wiredtiger/test/format/compact.c +++ b/src/third_party/wiredtiger/test/format/compact.c @@ -43,7 +43,7 @@ compact(void *arg) (void)(arg); /* Compaction isn't supported for all data sources. */ - if (DATASOURCE("helium") || DATASOURCE("kvsbdb")) + if (DATASOURCE("kvsbdb")) return (WT_THREAD_RET_VALUE); /* Open a session. */ diff --git a/src/third_party/wiredtiger/test/format/config.c b/src/third_party/wiredtiger/test/format/config.c index 2d8ec6ef465..f20ef7a762d 100644 --- a/src/third_party/wiredtiger/test/format/config.c +++ b/src/third_party/wiredtiger/test/format/config.c @@ -36,7 +36,6 @@ static void config_compression(const char *); static void config_encryption(void); static const char *config_file_type(u_int); static bool config_fix(void); -static void config_helium_reset(void); static void config_in_memory(void); static void config_in_memory_reset(void); static int config_is_perm(const char *); @@ -140,8 +139,6 @@ config_setup(void) */ g.uri = dmalloc(256); strcpy(g.uri, DATASOURCE("file") ? "file:" : "table:"); - if (DATASOURCE("helium")) - strcat(g.uri, "dev1/"); strcat(g.uri, WT_NAME); /* Fill in random values for the rest of the run. */ @@ -167,8 +164,6 @@ config_setup(void) } /* Required shared libraries. */ - if (DATASOURCE("helium") && access(HELIUM_PATH, R_OK) != 0) - testutil_die(errno, "Helium shared library: %s", HELIUM_PATH); if (DATASOURCE("kvsbdb") && access(KVS_BDB_PATH, R_OK) != 0) testutil_die(errno, "kvsbdb shared library: %s", KVS_BDB_PATH); @@ -196,9 +191,7 @@ config_setup(void) config_pct(); config_cache(); - /* Give Helium, in-memory and LSM configurations a final review. */ - if (DATASOURCE("helium")) - config_helium_reset(); + /* Give in-memory and LSM configurations a final review. */ if (g.c_in_memory != 0) config_in_memory_reset(); if (DATASOURCE("lsm")) @@ -466,36 +459,6 @@ config_fix(void) } /* - * config_helium_reset -- - * Helium configuration review. - */ -static void -config_helium_reset(void) -{ - /* Turn off a lot of stuff. */ - if (!config_is_perm("alter")) - config_single("alter=off", 0); - if (!config_is_perm("backups")) - config_single("backups=off", 0); - if (!config_is_perm("checkpoints")) - config_single("checkpoints=off", 0); - if (!config_is_perm("compression")) - config_single("compression=none", 0); - if (!config_is_perm("in_memory")) - config_single("in_memory=off", 0); - if (!config_is_perm("logging")) - config_single("logging=off", 0); - if (!config_is_perm("rebalance")) - config_single("rebalance=off", 0); - if (!config_is_perm("reverse")) - config_single("reverse=off", 0); - if (!config_is_perm("salvage")) - config_single("salvage=off", 0); - if (!config_is_perm("transaction_timestamps")) - config_single("transaction_timestamps=off", 0); -} - -/* * config_in_memory -- * Periodically set up an in-memory configuration. */ @@ -1005,7 +968,6 @@ config_single(const char *s, int perm) } else if (strncmp(s, "data_source", strlen("data_source")) == 0 && strncmp("file", ep, strlen("file")) != 0 && - strncmp("helium", ep, strlen("helium")) != 0 && strncmp("kvsbdb", ep, strlen("kvsbdb")) != 0 && strncmp("lsm", ep, strlen("lsm")) != 0 && strncmp("table", ep, strlen("table")) != 0) { diff --git a/src/third_party/wiredtiger/test/format/config.h b/src/third_party/wiredtiger/test/format/config.h index cd187aa3c84..1df810c0702 100644 --- a/src/third_party/wiredtiger/test/format/config.h +++ b/src/third_party/wiredtiger/test/format/config.h @@ -138,7 +138,7 @@ static CONFIG c[] = { C_BOOL, 5, 0, 0, &g.c_data_extend, NULL }, { "data_source", - "data source (file | helium | kvsbdb | lsm | table)", + "data source (file | kvsbdb | lsm | table)", C_IGNORE|C_STRING, 0, 0, 0, NULL, &g.c_data_source }, { "delete_pct", diff --git a/src/third_party/wiredtiger/test/format/format.h b/src/third_party/wiredtiger/test/format/format.h index 7e95e696550..4806f883f72 100644 --- a/src/third_party/wiredtiger/test/format/format.h +++ b/src/third_party/wiredtiger/test/format/format.h @@ -61,8 +61,6 @@ #define KVS_BDB_PATH \ EXTPATH "test/kvs_bdb/.libs/libwiredtiger_kvs_bdb.so" -#define HELIUM_PATH \ - EXTPATH "datasources/helium/.libs/libwiredtiger_helium.so" #undef M #define M(v) ((v) * WT_MILLION) /* Million */ @@ -92,8 +90,6 @@ typedef struct { char *home_salvage_copy; /* Salvage copy command */ char *home_stats; /* Statistics file path */ - char *helium_mount; /* Helium volume */ - char wiredtiger_open_config[8 * 1024]; /* Database open config */ #ifdef HAVE_BERKELEY_DB diff --git a/src/third_party/wiredtiger/test/format/ops.c b/src/third_party/wiredtiger/test/format/ops.c index 33a5cae580b..7d0cbd00b0b 100644 --- a/src/third_party/wiredtiger/test/format/ops.c +++ b/src/third_party/wiredtiger/test/format/ops.c @@ -808,7 +808,7 @@ ops(void *arg) * Skip if we are using data-sources or LSM, they don't * support reading from checkpoints. */ - if (!SINGLETHREADED && !DATASOURCE("helium") && + if (!SINGLETHREADED && !DATASOURCE("kvsbdb") && !DATASOURCE("lsm") && mmrand(&tinfo->rnd, 1, 10) == 1) { /* diff --git a/src/third_party/wiredtiger/test/format/salvage.c b/src/third_party/wiredtiger/test/format/salvage.c index 75450fffb96..b7dc9d43201 100644 --- a/src/third_party/wiredtiger/test/format/salvage.c +++ b/src/third_party/wiredtiger/test/format/salvage.c @@ -145,7 +145,7 @@ wts_salvage(void) WT_DECL_RET; /* Some data-sources don't support salvage. */ - if (DATASOURCE("helium") || DATASOURCE("kvsbdb")) + if (DATASOURCE("kvsbdb")) return; if (g.c_salvage == 0) diff --git a/src/third_party/wiredtiger/test/format/t.c b/src/third_party/wiredtiger/test/format/t.c index c6deec34fa6..7cdd9e38a2f 100644 --- a/src/third_party/wiredtiger/test/format/t.c +++ b/src/third_party/wiredtiger/test/format/t.c @@ -94,7 +94,7 @@ main(int argc, char *argv[]) home = NULL; onerun = 0; while ((ch = __wt_getopt( - progname, argc, argv, "1C:c:H:h:Llqrt:")) != EOF) + progname, argc, argv, "1C:c:h:Llqrt:")) != EOF) switch (ch) { case '1': /* One run */ onerun = 1; @@ -105,9 +105,6 @@ main(int argc, char *argv[]) case 'c': /* Configuration from a file */ config = __wt_optarg; break; - case 'H': - g.helium_mount = __wt_optarg; - break; case 'h': home = __wt_optarg; break; @@ -364,14 +361,12 @@ usage(void) { fprintf(stderr, "usage: %s [-1Llqr] [-C wiredtiger-config]\n " - "[-c config-file] [-H mount] [-h home] " - "[name=value ...]\n", + "[-c config-file] [-h home] [name=value ...]\n", progname); fprintf(stderr, "%s", "\t-1 run once\n" "\t-C specify wiredtiger_open configuration arguments\n" "\t-c read test program configuration from a file\n" - "\t-H mount Helium volume mount point\n" "\t-h home (default 'RUNDIR')\n" "\t-L output to a log file\n" "\t-l log operations (implies -L)\n" diff --git a/src/third_party/wiredtiger/test/format/util.c b/src/third_party/wiredtiger/test/format/util.c index 87acc06bc6b..6214f604378 100644 --- a/src/third_party/wiredtiger/test/format/util.c +++ b/src/third_party/wiredtiger/test/format/util.c @@ -535,8 +535,7 @@ checkpoint(void *arg) */ ckpt_config = NULL; backup_locked = false; - if (!DATASOURCE("helium") && !DATASOURCE("kvsbdb") && - !DATASOURCE("lsm")) + if (!DATASOURCE("kvsbdb") && !DATASOURCE("lsm")) switch (mmrand(NULL, 1, 20)) { case 1: /* diff --git a/src/third_party/wiredtiger/test/format/wts.c b/src/third_party/wiredtiger/test/format/wts.c index fc12c381a23..75f43c6922b 100644 --- a/src/third_party/wiredtiger/test/format/wts.c +++ b/src/third_party/wiredtiger/test/format/wts.c @@ -147,9 +147,8 @@ void wts_open(const char *home, bool set_api, WT_CONNECTION **connp) { WT_CONNECTION *conn; - WT_DECL_RET; size_t max; - char *config, *p, helium_config[1024]; + char *config, *p; *connp = NULL; @@ -300,28 +299,6 @@ wts_open(const char *home, bool set_api, WT_CONNECTION **connp) if (set_api) g.wt_api = conn->get_extension_api(conn); - /* - * Load the Helium shared library: it would be possible to do this as - * part of the extensions configured for wiredtiger_open, there's no - * difference, I am doing it here because it's easier to work with the - * configuration strings. - */ - if (DATASOURCE("helium")) { - if (g.helium_mount == NULL) - testutil_die(EINVAL, "no Helium mount point specified"); - testutil_check( - __wt_snprintf(helium_config, sizeof(helium_config), - "entry=wiredtiger_extension_init,config=[" - "helium_verbose=0," - "dev1=[helium_devices=\"he://./%s\"," - "helium_o_volume_truncate=1]]", - g.helium_mount)); - if ((ret = conn->load_extension( - conn, HELIUM_PATH, helium_config)) != 0) - testutil_die(ret, - "WT_CONNECTION.load_extension: %s:%s", - HELIUM_PATH, helium_config); - } *connp = conn; } @@ -437,11 +414,6 @@ wts_init(void) CONFIG_APPEND(p, ",split_pct=%" PRIu32, g.c_split_pct); /* Configure LSM and data-sources. */ - if (DATASOURCE("helium")) - CONFIG_APPEND(p, - ",type=helium,helium_o_compress=%d,helium_o_truncate=1", - g.c_compression_flag == COMPRESS_NONE ? 0 : 1); - if (DATASOURCE("kvsbdb")) CONFIG_APPEND(p, ",type=kvsbdb"); @@ -507,7 +479,7 @@ wts_dump(const char *tag, int dump_bdb) */ if (g.c_in_memory != 0) return; - if (DATASOURCE("helium") || DATASOURCE("kvsbdb")) + if (DATASOURCE("kvsbdb")) return; track("dump files and compare", 0ULL, NULL); @@ -586,7 +558,7 @@ wts_stats(void) return; /* Some data-sources don't support statistics. */ - if (DATASOURCE("helium") || DATASOURCE("kvsbdb")) + if (DATASOURCE("kvsbdb")) return; conn = g.wts_conn; diff --git a/src/third_party/wiredtiger/test/suite/test_compat02.py b/src/third_party/wiredtiger/test/suite/test_compat02.py index 759677f699a..bcdff19b6c1 100644 --- a/src/third_party/wiredtiger/test/suite/test_compat02.py +++ b/src/third_party/wiredtiger/test/suite/test_compat02.py @@ -59,13 +59,13 @@ class test_compat02(wttest.WiredTigerTestCase, suite_subprocess): # compat_create = [ ('def', dict(create_rel='none', log_create=3)), - ('31', dict(create_rel="3.1", log_create=3)), + ('32', dict(create_rel="3.2", log_create=3)), ('30', dict(create_rel="3.0", log_create=2)), ('26', dict(create_rel="2.6", log_create=1)), ] compat_release = [ ('def_rel', dict(rel='none', log_rel=3)), - ('31_rel', dict(rel="3.1", log_rel=3)), + ('32_rel', dict(rel="3.2", log_rel=3)), ('30_rel', dict(rel="3.0", log_rel=2)), ('26_rel', dict(rel="2.6", log_rel=1)), ('26_patch_rel', dict(rel="2.6.1", log_rel=1)), @@ -73,7 +73,7 @@ class test_compat02(wttest.WiredTigerTestCase, suite_subprocess): compat_max = [ ('future_max', dict(max_req=future_rel, log_max=future_logv)), ('def_max', dict(max_req='none', log_max=3)), - ('31_max', dict(max_req="3.1", log_max=3)), + ('32_max', dict(max_req="3.2", log_max=3)), ('30_max', dict(max_req="3.0", log_max=2)), ('26_max', dict(max_req="2.6", log_max=1)), ('26_patch_max', dict(max_req="2.6.1", log_max=1)), @@ -81,7 +81,7 @@ class test_compat02(wttest.WiredTigerTestCase, suite_subprocess): compat_min = [ ('future_min', dict(min_req=future_rel, log_min=future_logv)), ('def_min', dict(min_req='none', log_min=3)), - ('31_min', dict(min_req="3.1", log_min=3)), + ('32_min', dict(min_req="3.2", log_min=3)), ('30_min', dict(min_req="3.0", log_min=2)), ('26_min', dict(min_req="2.6", log_min=1)), ('26_patch_min', dict(min_req="2.6.1", log_min=1)), diff --git a/src/third_party/wiredtiger/test/suite/test_compat03.py b/src/third_party/wiredtiger/test/suite/test_compat03.py index de0511c5b94..064fe5ab7ed 100644 --- a/src/third_party/wiredtiger/test/suite/test_compat03.py +++ b/src/third_party/wiredtiger/test/suite/test_compat03.py @@ -60,7 +60,7 @@ class test_compat03(wttest.WiredTigerTestCase, suite_subprocess): compat_release = [ ('def_rel', dict(rel='none', log_rel=3)), ('future_rel', dict(rel=future_rel, log_rel=future_logv)), - ('31_rel', dict(rel="3.1", log_rel=3)), + ('32_rel', dict(rel="3.2", log_rel=3)), ('30_rel', dict(rel="3.0", log_rel=2)), ('26_rel', dict(rel="2.6", log_rel=1)), ('26_patch_rel', dict(rel="2.6.1", log_rel=1)), @@ -68,7 +68,7 @@ class test_compat03(wttest.WiredTigerTestCase, suite_subprocess): compat_max = [ ('def_max', dict(max_req='none', log_max=3)), ('future_max', dict(max_req=future_rel, log_max=future_logv)), - ('31_max', dict(max_req="3.1", log_max=3)), + ('32_max', dict(max_req="3.2", log_max=3)), ('30_max', dict(max_req="3.0", log_max=2)), ('26_max', dict(max_req="2.6", log_max=1)), ('26_patch_max', dict(max_req="2.6.1", log_max=1)), @@ -76,7 +76,7 @@ class test_compat03(wttest.WiredTigerTestCase, suite_subprocess): compat_min = [ ('def_min', dict(min_req='none', log_min=3)), ('future_min', dict(min_req=future_rel, log_min=future_logv)), - ('31_min', dict(min_req="3.1", log_min=3)), + ('32_min', dict(min_req="3.2", log_min=3)), ('30_min', dict(min_req="3.0", log_min=2)), ('26_min', dict(min_req="2.6", log_min=1)), ('26_patch_min', dict(min_req="2.6.1", log_min=1)), diff --git a/src/third_party/wiredtiger/tools/optrack/find-latency-spikes.py b/src/third_party/wiredtiger/tools/optrack/find-latency-spikes.py index 8a0dfb00e88..93eed8fea8b 100755 --- a/src/third_party/wiredtiger/tools/optrack/find-latency-spikes.py +++ b/src/third_party/wiredtiger/tools/optrack/find-latency-spikes.py @@ -97,6 +97,9 @@ perFuncDF = {}; perFileDataFrame = {}; perFileLargestStackDepth = {}; +# Each file has a timestamp indicating when the logging began +perFileTimeStamps = {}; + plotWidth = 1200; pixelsForTitle = 30; pixelsPerHeightUnit = 30; @@ -899,17 +902,40 @@ def dumpCleanData(fname, df): newDF.to_csv(newfname, sep=' ', index=False, header=False, columns = ['enterExit', 'function', 'timestamp']); +def checkForTimestampAndGetRowSkip(fname): + + global perFileTimeStamps; + + with open(fname) as f: + firstLine = f.readline(); + + firstLine = firstLine.strip(); + words = firstLine.split(" "); + + if (len(words) == 1): + try: + perFileTimeStamps[fname] = long(words[0]); + except ValueError: + print(color.BOLD + color.RED + + "Could not parse seconds since Epoch on first line" + + + color.END); + return 1; + else: + return 0; + def processFile(fname, dumpCleanDataBool): global perFileDataFrame; global perFuncDF; + skipRows = checkForTimestampAndGetRowSkip(fname); + rawData = pd.read_csv(fname, - header=None, delimiter=" ", - index_col=2, - names=["Event", "Function", "Timestamp"], - dtype={"Event": np.int32, "Timestamp": np.int64}, - thousands=","); + header=None, delimiter=" ", + index_col=2, + names=["Event", "Function", "Timestamp"], + dtype={"Event": np.int32, "Timestamp": np.int64}, + thousands=",", skiprows = skipRows); print(color.BOLD + color.BLUE + "Processing file " + str(fname) + color.END); diff --git a/src/third_party/wiredtiger/tools/optrack/optrack_to_t2.py b/src/third_party/wiredtiger/tools/optrack/optrack_to_t2.py new file mode 100755 index 00000000000..80d004376f2 --- /dev/null +++ b/src/third_party/wiredtiger/tools/optrack/optrack_to_t2.py @@ -0,0 +1,525 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2019 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +#!/usr/bin/env python + +import argparse +import multiprocessing +from multiprocessing import Process +import numpy as np +import os +import pandas as pd +import sys +import time + +# The time units used in the input files is nanoseconds. Presently the +# operation tracking code does not produce data using any other time +# units. +# +unitsPerSecond = 1000000000; + +# We aggregate data for intervals with the duration specified by +# the following variable. +intervalLength = 1; + +# Each file has a timestamp indicating when the logging began +perFileTimeStamps = {}; + +# Codes for various colors for printing of informational and error messages. +# +class color: + PURPLE = '\033[95m' + CYAN = '\033[96m' + DARKCYAN = '\033[36m' + BLUE = '\033[94m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + END = '\033[0m' + +# +# Go over all operation records in the dataframe and assign stack depths. +# +def assignStackDepths(dataframe): + + stack = []; + + df = dataframe.sort_values(by=['start']); + df = df.reset_index(drop = True); + + for i in range(len(df.index)): + + myEndTime = df.at[i, 'end']; + + # Pop all items off stack whose end time is earlier than my + # end time. They are not the callers on my stack, so I don't want to + # count them. + # + while (len(stack) > 0 and stack[-1] < myEndTime): + stack.pop(); + + df.at[i, 'stackdepth'] = len(stack); + stack.append(df.at[i, 'end']); + + return df; + +def reportDataError(logfile, logfilename): + + if (logfile is not sys.stdout): + print(color.BOLD + color.RED + "Your data may have errors. " + + "Check the file " + logfilename + " for details." + color.END); + return True; + +# +# An intervalEnd is a tuple of three items. +# item #0 is the timestamp, +# item #1 is the event type, +# item #2 is the function name. +# +def getIntervalData(intervalBeginningsStack, intervalEnd, logfile): + + errorOccurred = False; + matchFound = False; + + beginTimestamp = 0; + beginFunctionName = ""; + + endTimestamp = intervalEnd[0]; + eventType = intervalEnd[1]; + endFunctionName = intervalEnd[2]; + + if (eventType != 1): + logfile.write( + "getIntervaldata: only rows with event type 1 can be used.\n"); + logfile.write(str(intervalEnd) + "\n"); + return None; + + if (len(intervalBeginningsStack) < 1): + logfile.write("Nothing on the intervalBeginningsStack. " + + "I cannot find the beginning for this interval.\n"); + logfile.write(str(intervalEnd) + "\n"); + return None; + + while (not matchFound): + intervalBegin = intervalBeginningsStack.pop(); + if (intervalBegin is None): + logfile.write("Could not find the matching operation begin record" + + " for the following operation end record: \n"); + logfile.write(str(intervalEnd) + "\n"); + return None; + + beginTimestamp = intervalBegin[0]; + beginFunctionName = intervalBegin[2]; + if (beginFunctionName != endFunctionName): + logfile.write("Operation end record does not match the available " + + "operation begin record. " + + "Your log file may be incomplete.\n" + + "Skipping the begin record.\n"); + logfile.write("Begin: " + str(intervalBegin) + "\n"); + logfile.write("End: " + str(intervalEnd) + "\n"); + errorOccurred = True; + else: + matchFound = True; + + return beginTimestamp, endTimestamp, endFunctionName, errorOccurred; + +def createCallstackSeries(data, logfilename): + + beginIntervals = []; + dataFrame = None; + endIntervals = []; + errorReported = False; + functionNames = []; + intervalBeginningsStack = []; + largestStackDepth = 0; + logfile = None; + thisIsFirstRow = True; + + # Let's open the log file. + try: + logfile = open(logfilename, "w"); + except: + logfile = sys.stdout; + + for row in data.itertuples(): + + timestamp = row[0]; + eventType = row[1]; + function = row[2]; + + if (eventType == 0): + intervalBeginningsStack.append(row); + elif (eventType == 1): + try: + intervalBegin, intervalEnd, function, error\ + = getIntervalData(intervalBeginningsStack, row, logfile); + if (error and (not errorReported)): + errorReported = reportDataError(logfile, logfilename); + except: + if (not errorReported): + errorReported = reportDataError(logfile, logfilename); + continue; + + beginIntervals.append(intervalBegin); + endIntervals.append(intervalEnd); + functionNames.append(function); + + else: + print("Invalid event in this line:"); + print(str(timestamp) + " " + str(eventType) + " " + str(function)); + continue; + + if (len(intervalBeginningsStack) > 0): + logfile.write(str(len(intervalBeginningsStack)) + " operations had a " + + "begin record, but no matching end records. " + + "Please check that your operation tracking macros " + + "are properly inserted.\n"); + if (not errorReported): + errorReported = reportDataError(logfile, logfilename); + intervalBeginningsStack = []; + + dataDict = {}; + dataDict['start'] = beginIntervals; + dataDict['end'] = endIntervals; + dataDict['function'] = functionNames; + dataDict['stackdepth'] = [0] * len(beginIntervals); + + dataframe = pd.DataFrame(data=dataDict); + dataframe = assignStackDepths(dataframe); + + dataframe['durations'] = dataframe['end'] - dataframe['start']; + dataframe['stackdepthNext'] = dataframe['stackdepth'] + 1; + + return dataframe; + +def checkForTimestampAndGetRowSkip(fname): + + firstTimeStamp = 0; + + with open(fname) as f: + firstLine = f.readline(); + + firstLine = firstLine.strip(); + words = firstLine.split(" "); + + if (len(words) == 1): + try: + firstTimeStamp = long(words[0]); + except ValueError: + print(color.BOLD + color.RED + + "Could not parse seconds since Epoch on first line" + + color.END); + firstTimeStamp = 0; + return firstTimeStamp, 1; + else: + return firstTimeStamp, 0; + +# +# Find the session ID in the file name. The format of the input file name is +# optrack.<PID>.<session-id>-<internal/external>.txt +# +def getSessionFromFileName(fname): + + words = fname.split("."); + + if (len(words) < 4): + return 0; + + words = words[2].split("-"); + if (len(words) > 1): + try: + sid = int(words[0]); + return sid; + except: + return 0; + else: + return 0; + +def makeCSVFname(fname): + + words = fname.split("."); + + if (len(words) > 0): + words[len(words)-1] = "csv"; + + return ".".join(words); +# +# The input is the dataframe, where each record has a function name, its +# begin timestamp, its end timestamp and its stackdepth. This funciton will +# aggregate this data to determine the percentage of time we spent in each +# function in each interval. +# +def parseIntervals(df, firstTimeStamp, fname): + + global intervalLength; + global unitsPerSecond; + + # The output dataframe has a time column and then a column for + # each unique function in this file. Then there is one row + # per interval. + # + outputDict = {}; + outputDict['time'] = []; + + sessionID = getSessionFromFileName(fname); + columnNamePrefix = "#units=%;section=Session " + str(sessionID) + ";name="; + + # Get a list of all functions that we have in the input data frame. + # Each function will be a column in the output dataframe. + + allFuncs = df['function'].unique(); + for i in range (0, len(allFuncs)): + outputDict[columnNamePrefix + allFuncs[i]] = []; + + # We have two time formats. The data in the file is using fine-granular + # time units, mostly likely from the CPU's cycle counter. The output + # format will use coarse-granular time intervals in seconds. So we need + # to convert the units of the input data to seconds. + # + firstTimestampUnits = df['start'].iloc[0]; + lastTimestampUnits = df['end'].iloc[-1]; + + firstIntervalTimestampSeconds = firstTimeStamp; + lastIntervalTimestampSeconds = firstIntervalTimestampSeconds + \ + (lastTimestampUnits - firstTimestampUnits) \ + / unitsPerSecond; + + if (lastIntervalTimestampSeconds < firstIntervalTimestampSeconds): + print(color.BOLD + color.RED + + "The first timestamp in seconds is " + + str(firstIntervalTimestampSeconds) + ", but the last one " + + "appears to be smaller: " + str(lastIntervalTimestampSeconds) + + ". Skipping this file." + color.END); + return; + + currentIntervalSeconds = firstIntervalTimestampSeconds; + currentIntBeginUnits = firstTimestampUnits; + + # For each function in the current interval compute the aggregate + # duration that it executed in the current interval. + while currentIntervalSeconds <= lastIntervalTimestampSeconds: + + thisIntDict = {}; + + outputDict['time'].append(currentIntervalSeconds); + + currentIntEndUnits = currentIntBeginUnits + \ + intervalLength * unitsPerSecond; + + # Select all functions, whose begin and end time fall within the + # current interval. + # Entire function duration gets added for functions that begin and + # end during this interval. + + beginAndEndInInterval = df.loc[(df['start'] >= currentIntBeginUnits) + & (df['start'] <= currentIntEndUnits) + & (df['end'] >= currentIntBeginUnits) + & (df['end'] <= currentIntEndUnits)]; + + for index, row in beginAndEndInInterval.iterrows(): + func = row['function']; + duration = row['end'] - row['start']; + if (func not in thisIntDict): + thisIntDict[func] = duration; + else: + thisIntDict[func] += duration; + + # Select all functions, whose begin timestamp is within this + # interval, but the end timestamp is outside of it. + # Only the duration up to the end of the interval gets added + # for functions that begin during this interval, but end + # outside of it. + + beginInInterval = df.loc[(df['start'] >= currentIntBeginUnits) + & (df['start'] <= currentIntEndUnits) + & (df['end'] > currentIntEndUnits)]; + + for index, row in beginInInterval.iterrows(): + func = row['function']; + duration = currentIntEndUnits - row['start']; + if (func not in thisIntDict): + thisIntDict[func] = duration; + else: + thisIntDict[func] += duration; + + # Select all functions, whose end timestamp is within this + # interval, but the begin timestamp is in an earlier interval. + # For functions that end in the interval, but begin outside it + # we add the portion of the duration from the beginning of the + # interval and until the function end time. + + endInInterval = df.loc[(df['start'] < currentIntBeginUnits) + & (df['end'] >= currentIntBeginUnits) + & (df['end'] <= currentIntEndUnits)]; + + for index, row in endInInterval.iterrows(): + func = row['function']; + duration = row['end'] - currentIntBeginUnits; + if (func not in thisIntDict): + thisIntDict[func] = duration; + else: + thisIntDict[func] += duration; + + # Select all functions, whose begin timestamp is in an earlier + # interval and end timestamp is in a later interval. + # For functions that last during the entire interval the duration + # equivalent to the interval's length gets added. + + beginEndOutsideInterval = df.loc[(df['start'] < currentIntBeginUnits) + & (df['end'] > currentIntEndUnits)]; + + for index, row in beginEndOutsideInterval.iterrows(): + func = row['function']; + duration = intervalLength * unitsPerSecond; + if (func not in thisIntDict): + thisIntDict[func] = duration; + else: + thisIntDict[func] += duration; + + # Convert the durations to percentages and record them + # in the output dictionary + for func, duration in thisIntDict.iteritems(): + outputDictKey = columnNamePrefix + func; + percentDuration = float(duration) / \ + float(intervalLength * unitsPerSecond) * 100; + outputDict[outputDictKey].append(percentDuration); + + # In the output dictionary find all functions that did not + # execute during this interval and append zero. + # The list at each function's key should be as long as the list + # at key 'time'. + targetLen = len(outputDict['time']); + for key, theList in outputDict.iteritems(): + if len(theList) < targetLen: + theList.append(0); + + currentIntervalSeconds += intervalLength; + currentIntBeginUnits = currentIntEndUnits + 1; + + # Make the dataframe from the dictionary. Arrange the columns + # such that 'time' is first. + # + targetColumns = ['time']; + + for key, value in outputDict.iteritems(): + if key != 'time': + targetColumns.append(key); + + outputDF = pd.DataFrame(data=outputDict, columns = targetColumns); + + # Write the data to file + outputCSV = makeCSVFname(fname); + outputDF.to_csv(path_or_buf=outputCSV, index=False, header=True); + + +def processFile(fname): + + firstTimeStamp, skipRows = checkForTimestampAndGetRowSkip(fname); + + rawData = pd.read_csv(fname, + header=None, delimiter=" ", + index_col=2, + names=["Event", "Function", "Timestamp"], + dtype={"Event": np.int32, "Timestamp": np.int64}, + thousands=",", skiprows = skipRows); + + print(color.BOLD + color.BLUE + + "Processing file " + str(fname) + color.END); + + iDF = createCallstackSeries(rawData, "." + fname + ".log"); + + if not iDF.empty: + parseIntervals(iDF, firstTimeStamp, fname); + +def waitOnOneProcess(runningProcesses): + + i = 0; + success = False; + while i < len(runningProcesses): + p = runningProcesses[i]; + if (not p.is_alive()): + del runningProcesses[i]; + success = True; + else: + i+=1; + + # If we have not found a terminated process, sleep for a while + if (not success): + time.sleep(1); + +def main(): + + runnableProcesses = []; + runningProcesses = []; + + # Set up the argument parser + # + parser = argparse.ArgumentParser(description= + 'Convert operation tracking log files \ + to the csv for visualization with t2.'); + parser.add_argument('files', type=str, nargs='*', + help='log files to process'); + parser.add_argument('-j', dest='jobParallelism', type=int, + default='0'); + + args = parser.parse_args(); + + if (len(args.files) == 0): + parser.print_help(); + sys.exit(1); + + # Determine the target job parallelism + if (args.jobParallelism > 0): + targetParallelism = args.jobParallelism; + else: + targetParallelism = multiprocessing.cpu_count() * 2; + + # Process all files in parallel + for fname in args.files: + p = Process(target=processFile, + args=(fname,)); + runnableProcesses.append(p); + + while (len(runnableProcesses) > 0): + while (len(runningProcesses) < targetParallelism + and len(runnableProcesses) > 0): + + p = runnableProcesses.pop(); + p.start(); + runningProcesses.append(p); + + # Find at least one terminated process + waitOnOneProcess(runningProcesses); + + # Wait for all processes to terminate + while (len(runningProcesses) > 0): + waitOnOneProcess(runningProcesses); + +if __name__ == '__main__': + main() diff --git a/src/third_party/wiredtiger/tools/optrack/wt_optrack_decode.py b/src/third_party/wiredtiger/tools/optrack/wt_optrack_decode.py index 971f3729981..a00ab0d2c7b 100755 --- a/src/third_party/wiredtiger/tools/optrack/wt_optrack_decode.py +++ b/src/third_party/wiredtiger/tools/optrack/wt_optrack_decode.py @@ -142,20 +142,38 @@ def validateHeader(file): global currentLogVersion; bytesRead = ""; - HEADER_SIZE = 12; + MIN_HEADER_SIZE = 12; try: - bytesRead = file.read(HEADER_SIZE); + bytesRead = file.read(MIN_HEADER_SIZE); except: return False, -1; - if (len(bytesRead) < HEADER_SIZE): + if (len(bytesRead) < MIN_HEADER_SIZE): return False, -1; - version, threadType, tsc_nsec = struct.unpack('III', bytesRead); + version, threadType, tsc_nsec = struct.unpack('=III', bytesRead); + print("VERSION IS " + str(version)); - if (version == currentLogVersion): - return True, threadType, tsc_nsec; + # If the version number is 2, the header contains three fields: + # version, thread type, and clock ticks per nanosecond). + # If the version number is 3 or greater, the header also contains + # field: an 8-byte timestamp in seconds since the Epoch, as + # would be returned by a call to time() on Unix. + # + if (version == 2): + return True, threadType, tsc_nsec, 0; + elif(version >= 3): + ADDITIONAL_HEADER_SIZE = 12; + try: + bytesRead = file.read(ADDITIONAL_HEADER_SIZE); + if (len(bytesRead) < ADDITIONAL_HEADER_SIZE): + return False, -1; + + padding, sec_from_epoch = struct.unpack('=IQ', bytesRead); + return True, threadType, tsc_nsec, sec_from_epoch; + except: + return False, -1; else: return False, -1, 1; @@ -192,7 +210,8 @@ def parseFile(fileName): return; # Read and validate log header - validVersion, threadType, tsc_nsec_ratio = validateHeader(file); + validVersion, threadType, tsc_nsec_ratio, sec_from_epoch = \ + validateHeader(file); if (not validVersion): return; @@ -222,6 +241,9 @@ def parseFile(fileName): print(color.BOLD + color.PURPLE + "Writing to output file " + outputFileName + "." + color.END); + # The first line of the output file contains the seconds from Epoch + outputFile.write(str(sec_from_epoch) + "\n"); + while (not done): record = parseOneRecord(file); diff --git a/src/third_party/wiredtiger/tools/wt_ckpt_decode.py b/src/third_party/wiredtiger/tools/wt_ckpt_decode.py index e611766e3c7..65f46de9a80 100755 --- a/src/third_party/wiredtiger/tools/wt_ckpt_decode.py +++ b/src/third_party/wiredtiger/tools/wt_ckpt_decode.py @@ -81,7 +81,7 @@ def decode_arg(arg, allocsize): print(arg + ': ') if version != 1: print('**** ERROR: unknown version ' + str(version)) - addr = addr[1:] + addr = bytes(addr[1:]) result = unpack('iiiiiiiiiiiiii',addr) if len(result) != 14: print('**** ERROR: result len unexpected: ' + str(len(result))) |