summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--SConstruct63
-rw-r--r--src/mongo/SConscript6
-rw-r--r--src/mongo/db/storage/wiredtiger/SConscript76
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp87
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h59
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp769
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.h204
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp92
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp68
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp372
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h138
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp66
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp66
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp1006
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h246
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp410
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp268
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h137
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp81
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h51
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp184
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h108
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp212
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h81
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp137
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_util.h139
-rw-r--r--src/third_party/SConscript31
-rw-r--r--src/third_party/shim_wiredtiger.cpp3
28 files changed, 5138 insertions, 22 deletions
diff --git a/SConstruct b/SConstruct
index 9976740d74c..274d716eef4 100644
--- a/SConstruct
+++ b/SConstruct
@@ -230,6 +230,8 @@ add_option( "mm", "use main memory instead of memory mapped files" , 0 , True )
add_option( "ssl" , "Enable SSL" , 0 , True )
add_option( "ssl-fips-capability", "Enable the ability to activate FIPS 140-2 mode", 0, True );
add_option( "rocksdb" , "Enable RocksDB" , 0 , False )
+add_option( "wiredtiger", "Enable wiredtiger", "?", True, "wiredtiger",
+ type="choice", choices=["on", "off"], const="on", default="on")
add_option( "replication-implementation",
"Controls what implementation is used for the replication system", "?", False,
type="choice", choices=["impl", "legacy"], const="impl", default="impl" )
@@ -290,6 +292,8 @@ add_option( "use-system-tcmalloc", "use system version of tcmalloc library", 0,
add_option( "use-system-pcre", "use system version of pcre library", 0, True )
+add_option( "use-system-wiredtiger", "use system version of wiredtiger library", 0, True)
+
# library choices
boost_choices = ['1.49', '1.56']
add_option( "internal-boost", "Specify internal boost version to use", 1, True,
@@ -472,7 +476,7 @@ envDict = dict(BUILD_ROOT=buildDir,
ARCHIVE_ADDITIONS=[],
PYTHON=utils.find_python(),
SERVER_ARCHIVE='${SERVER_DIST_BASENAME}${DIST_ARCHIVE_SUFFIX}',
- tools=["default", "gch", "jsheader", "mergelib", "mongo_unittest"],
+ tools=["default", "gch", "jsheader", "mergelib", "mongo_unittest", "textfile"],
UNITTEST_ALIAS='unittests',
# TODO: Move unittests.txt to $BUILD_DIR, but that requires
# changes to MCI.
@@ -987,6 +991,7 @@ else:
env.Append( MONGO_CRYPTO=["tom"] )
env['MONGO_REPL_IMPL'] = get_option('replication-implementation')
+wiredtiger = (get_option('wiredtiger') == 'on')
try:
umask = os.umask(022)
@@ -1014,6 +1019,7 @@ env['MONGO_MODULES'] = [m.name for m in mongo_modules]
# --- check system ---
def doConfigure(myenv):
+ global wiredtiger
# Check that the compilers work.
#
@@ -1154,7 +1160,25 @@ def doConfigure(myenv):
env.Append( CPPDEFINES=[("_WIN32_WINNT", "0x" + win_version_min[0])] )
env.Append( CPPDEFINES=[("NTDDI_VERSION", "0x" + win_version_min[0] + win_version_min[1])] )
- if using_gcc() or using_clang():
+ def CheckForx86(context):
+ # See http://nadeausoftware.com/articles/2012/02/c_c_tip_how_detect_processor_type_using_compiler_predefined_macros
+ test_body = """
+ #if defined(__i386) || defined(_M_IX86)
+ /* x86 32-bit */
+ #else
+ #error not 32-bit x86
+ #endif
+ """
+ context.Message('Checking if target architecture is 32-bit x86...')
+ ret = context.TryCompile(textwrap.dedent(test_body), ".c")
+ context.Result(ret)
+ return ret
+
+ conf = Configure(myenv, help=False, custom_tests = {
+ 'CheckForx86' : CheckForx86,
+ })
+
+ if conf.CheckForx86():
# If we are using GCC or clang to target 32 or x86, set the ISA minimum to 'nocona',
# and the tuning to 'generic'. The choice of 'nocona' is selected because it
@@ -1167,27 +1191,15 @@ def doConfigure(myenv):
# contemporaries, the generic scheduling should be appropriate for a wide range of
# deployed hardware.
- def CheckForx86(context):
- # See http://nadeausoftware.com/articles/2012/02/c_c_tip_how_detect_processor_type_using_compiler_predefined_macros
- test_body = """
- #if defined(__i386) || defined(_M_IX86)
- /* x86 32-bit */
- #else
- #error not 32-bit x86
- #endif
- """
- context.Message('Checking if target architecture is 32-bit x86...')
- ret = context.TryCompile(textwrap.dedent(test_body), ".c")
- context.Result(ret)
- return ret
-
- conf = Configure(myenv, help=False, custom_tests = {
- 'CheckForx86' : CheckForx86,
- })
+ if using_gcc() or using_clang():
+ myenv.Append( CCFLAGS=['-march=nocona', '-mtune=generic'] )
- if conf.CheckForx86():
- myenv.Append( CCFLAGS=['-march=nocona', '-mtune=generic'] )
- conf.Finish()
+ # Wiredtiger only supports 64-bit architecture, and will fail to compile on 32-bit
+ # so disable WiredTiger automatically on 32-bit since wiredtiger is on by default
+ if wiredtiger == True:
+ print "WARNING: WiredTiger is not supported on 32-bit platforms, disabling support"
+ wiredtiger = False
+ conf.Finish()
# Enable PCH if we are on using gcc or clang and the 'Gch' tool is enabled. Otherwise,
# remove any pre-compiled header since the compiler may try to use it if it exists.
@@ -1887,6 +1899,12 @@ def doConfigure(myenv):
if use_system_version_of_library("yaml"):
conf.FindSysLibDep("yaml", ["yaml-cpp"])
+ if wiredtiger and use_system_version_of_library("wiredtiger"):
+ if not conf.CheckCXXHeader( "wiredtiger.h" ):
+ print( "Cannot find wiredtiger headers" )
+ Exit(1)
+ conf.FindSysLibDep("wiredtiger", ["wiredtiger"])
+
if use_system_version_of_library("boost"):
if not conf.CheckCXXHeader( "boost/filesystem/operations.hpp" ):
print( "can't find boost headers" )
@@ -2192,6 +2210,7 @@ Export("debugBuild optBuild")
Export("enforce_glibc")
Export("s3push")
Export("use_clang")
+Export("wiredtiger")
def injectMongoIncludePaths(thisEnv):
thisEnv.AppendUnique(CPPPATH=['$BUILD_DIR'])
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 7ee4c58ca91..021cf28b652 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -13,6 +13,7 @@ Import("usev8")
Import("v8suffix")
Import("enforce_glibc")
Import("darwin windows solaris linux nix")
+Import("wiredtiger")
# Boost we need everywhere. 's2' is spammed in all over the place by
# db/geo unfortunately. pcre is also used many places.
@@ -40,6 +41,7 @@ env.SConscript(['base/SConscript',
'db/storage/kv/SConscript',
'db/storage/mmap_v1/SConscript',
'db/storage/rocks/SConscript',
+ 'db/storage/wiredtiger/SConscript',
'db/SConscript',
'installer/msi/SConscript',
'logger/SConscript',
@@ -955,6 +957,10 @@ serveronlyLibdeps = ["coreshard",
if has_option("rocksdb" ):
serveronlyLibdeps.append( 'db/storage/rocks/storage_rocks' )
+if wiredtiger:
+ serveronlyLibdeps.append( 'db/storage/wiredtiger/storage_wiredtiger' )
+ serveronlyLibdeps.append( '$BUILD_DIR/third_party/shim_wiredtiger')
+
serveronlyEnv.Library("serveronly", serverOnlyFiles,
LIBDEPS=serveronlyLibdeps )
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
new file mode 100644
index 00000000000..7c6116eae4c
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -0,0 +1,76 @@
+# SCons build description for the WiredTiger storage engine wrapper.
+# 'env' is the shared construction environment; 'wiredtiger' is the flag
+# exported by the top-level SConstruct.
+Import("env")
+Import("wiredtiger")
+
+# Nothing below is declared unless the 'wiredtiger' flag is truthy.
+if wiredtiger:
+ # Work on a clone so the include-path injection does not touch 'env'.
+ wtEnv = env.Clone()
+ wtEnv.InjectThirdPartyIncludePaths(libraries=['wiredtiger'])
+
+ # This is the smallest possible set of files that wraps WT
+ wtEnv.Library(
+ target= 'storage_wiredtiger_core',
+ source= [
+ 'wiredtiger_index.cpp',
+ 'wiredtiger_kv_engine.cpp',
+ 'wiredtiger_record_store.cpp',
+ 'wiredtiger_recovery_unit.cpp',
+ 'wiredtiger_session_cache.cpp',
+ 'wiredtiger_size_storer.cpp',
+ 'wiredtiger_util.cpp',
+ ],
+ LIBDEPS= [
+ '$BUILD_DIR/mongo/bson',
+ '$BUILD_DIR/mongo/db/catalog/collection_options',
+ '$BUILD_DIR/mongo/db/index/index_descriptor',
+ '$BUILD_DIR/mongo/db/storage/index_entry_comparison',
+ '$BUILD_DIR/mongo/db/storage/oplog_hack',
+ '$BUILD_DIR/mongo/elapsed_tracker',
+ '$BUILD_DIR/mongo/foundation',
+ '$BUILD_DIR/mongo/processinfo',
+ '$BUILD_DIR/third_party/shim_wiredtiger',
+ '$BUILD_DIR/third_party/shim_snappy',
+ ],
+ )
+
+ # Option handling, initialisation and server-status sources, layered on
+ # top of the core library and the generic kv_engine library.
+ wtEnv.Library(
+ target='storage_wiredtiger',
+ source=[
+ 'wiredtiger_global_options.cpp',
+ 'wiredtiger_init.cpp',
+ 'wiredtiger_options_init.cpp',
+ 'wiredtiger_server_status.cpp',
+ ],
+ LIBDEPS=['storage_wiredtiger_core',
+ '$BUILD_DIR/mongo/db/storage/kv/kv_engine',
+ ]
+ )
+
+ # Unit tests: each links the core library with the matching test harness.
+ wtEnv.CppUnitTest(
+ target='storage_wiredtiger_record_store_test',
+ source=['wiredtiger_record_store_test.cpp',
+ ],
+ LIBDEPS=[
+ 'storage_wiredtiger_core',
+ '$BUILD_DIR/mongo/db/storage/record_store_test_harness',
+ ],
+ )
+
+ wtEnv.CppUnitTest(
+ target='storage_wiredtiger_index_test',
+ source=['wiredtiger_index_test.cpp',
+ ],
+ LIBDEPS=[
+ 'storage_wiredtiger_core',
+ '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness',
+ ],
+ )
+
+ wtEnv.CppUnitTest(
+ target='storage_wiredtiger_kv_engine_test',
+ source=['wiredtiger_kv_engine_test.cpp',
+ ],
+ LIBDEPS=[
+ 'storage_wiredtiger_core',
+ '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness',
+ ],
+ )
+
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp
new file mode 100644
index 00000000000..345bc8cbd10
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp
@@ -0,0 +1,87 @@
+// wiredtiger_global_options.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/base/status.h"
+#include "mongo/util/log.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h"
+
+namespace mongo {
+
+    /**
+     * Registers the WiredTiger storage engine startup options.
+     *
+     * Builds a "WiredTiger options" section holding three string options
+     * (database, collection and index configuration) and adds that section
+     * to 'options'.
+     *
+     * @param options  section that receives the WiredTiger sub-section.
+     * @return the Status returned by addSection().
+     */
+    Status WiredTigerGlobalOptions::add(moe::OptionSection* options) {
+        moe::OptionSection section("WiredTiger options");
+
+        section.addOptionChaining("storage.wiredtiger.databaseConfig",
+                                  "wiredTigerDatabaseConfig",
+                                  moe::String,
+                                  "WiredTiger database configuration settings");
+
+        section.addOptionChaining("storage.wiredtiger.collectionConfig",
+                                  "wiredTigerCollectionConfig",
+                                  moe::String,
+                                  "WiredTiger collection configuration settings");
+
+        section.addOptionChaining("storage.wiredtiger.indexConfig",
+                                  "wiredTigerIndexConfig",
+                                  moe::String,
+                                  "WiredTiger index configuration settings");
+
+        return options->addSection(section);
+    }
+
+    /**
+     * Writes the current value of each WiredTiger option held by this object
+     * to '*out', one per line, then flushes the stream.
+     */
+    void WiredTigerGlobalOptions::printHelp(std::ostream* out) {
+        std::ostream& os = *out;
+
+        os << "storage.wiredtiger.databaseConfig : " << databaseConfig << std::endl;
+        os << "storage.wiredtiger.collectionConfig : " << collectionConfig << std::endl;
+        os << "storage.wiredtiger.indexConfig : " << indexConfig << std::endl;
+        os << std::flush;
+    }
+
+    /**
+     * Prints the WiredTiger option values to std::cout when 'params' contains
+     * a "help" entry.
+     *
+     * @return always true, whether or not help was printed.
+     */
+    bool WiredTigerGlobalOptions::handlePreValidation(const moe::Environment& params) {
+        if (params.count("help")) {
+            printHelp(&std::cout);
+        }
+
+        // NOTE(review): both paths return true, so a help request does not
+        // tell the caller to stop -- confirm this is intended.
+        return true;
+    }
+
+    /**
+     * Copies the parsed WiredTiger options out of 'params'.
+     *
+     * Each option present in 'params' is assigned to the corresponding field
+     * of the global 'wiredTigerGlobalOptions' object (not to '*this') and
+     * echoed to std::cerr. Options that are absent are left untouched.
+     *
+     * @param params  parsed option environment.
+     * @param args    not used by this function.
+     * @return always Status::OK().
+     */
+    Status WiredTigerGlobalOptions::store(const moe::Environment& params,
+                                          const std::vector<std::string>& args) {
+        const std::string databaseKey("storage.wiredtiger.databaseConfig");
+        const std::string collectionKey("storage.wiredtiger.collectionConfig");
+        const std::string indexKey("storage.wiredtiger.indexConfig");
+
+        // NOTE(review): values are echoed straight to std::cerr rather than
+        // through the logging facility this file is set up for.
+        if (params.count(databaseKey)) {
+            wiredTigerGlobalOptions.databaseConfig = params[databaseKey].as<string>();
+            std::cerr << "DB option: " << wiredTigerGlobalOptions.databaseConfig << std::endl;
+        }
+
+        if (params.count(collectionKey)) {
+            wiredTigerGlobalOptions.collectionConfig = params[collectionKey].as<string>();
+            std::cerr << "Collection option: " << wiredTigerGlobalOptions.collectionConfig << std::endl;
+        }
+
+        if (params.count(indexKey)) {
+            wiredTigerGlobalOptions.indexConfig = params[indexKey].as<string>();
+            std::cerr << "Index option: " << wiredTigerGlobalOptions.indexConfig << std::endl;
+        }
+
+        return Status::OK();
+    }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h
new file mode 100644
index 00000000000..7709e62d6c7
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h
@@ -0,0 +1,59 @@
+// wiredtiger_global_options.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/util/options_parser/startup_option_init.h"
+#include "mongo/util/options_parser/startup_options.h"
+
+namespace mongo {
+
+ namespace moe = mongo::optionenvironment;
+
+    /**
+     * Holds the WiredTiger startup option values together with the hooks
+     * that register, pre-validate and store them.
+     */
+    class WiredTigerGlobalOptions {
+    public:
+        WiredTigerGlobalOptions() {}
+
+        // Adds the WiredTiger option section to 'options'.
+        Status add(moe::OptionSection* options);
+
+        // Pre-validation hook; receives the parsed option environment.
+        bool handlePreValidation(const moe::Environment& params);
+
+        // Copies option values out of the parsed environment.
+        Status store(const moe::Environment& params, const std::vector<std::string>& args);
+
+        // Option values; default-constructed (empty) until assigned.
+        std::string databaseConfig;
+        std::string collectionConfig;
+        std::string indexConfig;
+
+    private:
+        // Writes the three option values to '*out'.
+        void printHelp(std::ostream* out);
+    };
+
+ extern WiredTigerGlobalOptions wiredTigerGlobalOptions;
+
+}
+
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
new file mode 100644
index 00000000000..1422c8c2d5b
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -0,0 +1,769 @@
+// wiredtiger_index.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_index.h"
+
+#include <set>
+
+#include "mongo/db/json.h"
+#include "mongo/db/catalog/index_catalog_entry.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/db/storage_options.h"
+#include "mongo/util/log.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace {
+ static const int TempKeyMaxSize = 1024; // this goes away with SERVER-3372
+
+ static const WiredTigerItem emptyItem(NULL, 0);
+
+    /**
+     * Returns true if at least one element of 'obj' has a non-empty field
+     * name, false otherwise (including when 'obj' has no elements).
+     */
+    bool hasFieldNames(const BSONObj& obj) {
+        BSONForEach(elem, obj) {
+            const char firstChar = elem.fieldName()[0];
+            if (firstChar != '\0') {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns 'query' with every element's field name replaced by an empty
+     * name. When no element has a field name, 'query' is returned as is.
+     */
+    BSONObj stripFieldNames(const BSONObj& query) {
+        if (hasFieldNames(query)) {
+            BSONObjBuilder stripped;
+            BSONForEach(elem, query) {
+                stripped.appendAs(elem, StringData());
+            }
+            return stripped.obj();
+        }
+
+        // Nothing to strip.
+        return query;
+    }
+
+    /**
+     * Constructs an IndexKeyEntry from a WT_ITEM whose bytes are a BSONObj,
+     * optionally followed by the bytes of a DiskLoc.
+     *
+     * When the item is exactly as long as the BSONObj, the entry is returned
+     * with a default-constructed DiskLoc. Otherwise the item must be exactly
+     * the BSONObj plus one DiskLoc (enforced with an invariant).
+     */
+    static IndexKeyEntry makeIndexKeyEntry(const WT_ITEM *keyCols) {
+        const char* const bytes = static_cast<const char*>( keyCols->data );
+        BSONObj key( bytes );
+
+        const bool keyOnly = ( keyCols->size == static_cast<size_t>( key.objsize() ) );
+        if ( keyOnly ) {
+            // in unique mode
+            return IndexKeyEntry( key, DiskLoc() );
+        }
+
+        invariant( keyCols->size == key.objsize() + sizeof(DiskLoc) );
+
+        // Copy the trailing DiskLoc out of the buffer, as _toDiskLoc() does.
+        DiskLoc loc;
+        memcpy( &loc, bytes + key.objsize(), sizeof(DiskLoc) );
+        return IndexKeyEntry( key, loc );
+    }
+
+    /**
+     * Serialises 'key' followed by 'loc' into a freshly allocated buffer.
+     *
+     * The buffer is owned by '*out'; the returned WiredTigerItem refers to
+     * that buffer and its total length.
+     */
+    WiredTigerItem _toItem( const BSONObj& key, const DiskLoc& loc,
+                            boost::scoped_array<char>*out ) {
+        const size_t objLen = key.objsize();
+        const size_t totalLen = objLen + sizeof(DiskLoc);
+
+        out->reset( new char[totalLen] );
+        char* const buf = out->get();
+
+        memcpy( buf, key.objdata(), objLen );
+        memcpy( buf + objLen, reinterpret_cast<const char*>(&loc), sizeof(DiskLoc) );
+
+        return WiredTigerItem( buf, totalLen );
+    }
+
+    /**
+     * Reads a DiskLoc from the first sizeof(DiskLoc) bytes of 'item'.
+     * The size of 'item' is not checked here.
+     */
+    DiskLoc _toDiskLoc( const WT_ITEM& item ) {
+        DiskLoc result;
+        memcpy( &result, item.data, sizeof(DiskLoc) );
+        return result;
+    }
+
+    /**
+     * Custom WT_COLLATOR that orders index entries by BSONObj and DiskLoc
+     * using an IndexEntryComparison built from the supplied Ordering.
+     *
+     * The constructor installs the static '_compare' and '_terminate'
+     * functions in the inherited callback slots; '_terminate' deletes the
+     * collator object.
+     */
+    struct WiredTigerIndexCollator : public WT_COLLATOR {
+    public:
+        WiredTigerIndexCollator(const Ordering& order)
+            : WT_COLLATOR(),
+              _indexComparator(order) {
+            compare = _compare;
+            terminate = _terminate;
+        }
+
+        /**
+         * Decodes both items as index entries and compares them.
+         * The result is normalised to -1, 0 or 1.
+         */
+        int Compare(WT_SESSION *s, const WT_ITEM *a, const WT_ITEM *b) const {
+            const IndexKeyEntry left = makeIndexKeyEntry(a);
+            const IndexKeyEntry right = makeIndexKeyEntry(b);
+            const int raw = _indexComparator.compare( left, right );
+            return (raw > 0) - (raw < 0);
+        }
+
+        // WT_COLLATOR::compare callback: forwards to Compare() and reports success.
+        static int _compare(WT_COLLATOR *coll,
+                            WT_SESSION *s,
+                            const WT_ITEM *a,
+                            const WT_ITEM *b,
+                            int *cmp) {
+            const WiredTigerIndexCollator* self = static_cast<WiredTigerIndexCollator *>(coll);
+            *cmp = self->Compare(s, a, b);
+            return 0;
+        }
+
+        // WT_COLLATOR::terminate callback: destroys the collator.
+        static int _terminate(WT_COLLATOR *coll, WT_SESSION *s) {
+            delete static_cast<WiredTigerIndexCollator *>(coll);
+            return 0;
+        }
+
+    private:
+        const IndexEntryComparison _indexComparator;
+    };
+
+    /**
+     * WT_COLLATOR::customize callback.
+     *
+     * Parses the metadata string as JSON, builds an IndexDescriptor from it
+     * and stores in '*collp' a new WiredTigerIndexCollator ordered by that
+     * descriptor's key pattern. Always returns 0.
+     */
+    extern "C" int index_collator_customize(WT_COLLATOR *coll,
+                                            WT_SESSION *s,
+                                            const char *uri,
+                                            WT_CONFIG_ITEM *metadata,
+                                            WT_COLLATOR **collp) {
+        const std::string infoJson(metadata->str, metadata->len);
+        IndexDescriptor desc(0, "unknown", fromjson(infoJson));
+
+        const Ordering order = Ordering::make(desc.keyPattern());
+        *collp = new WiredTigerIndexCollator(order);
+        return 0;
+    }
+
+    /**
+     * Extension entry point: registers a collator named "mongo_index" on
+     * 'conn'. The registered object only sets 'customize', which produces
+     * the per-index collator. Returns the result of add_collator().
+     */
+    extern "C" MONGO_COMPILER_API_EXPORT int index_collator_extension(WT_CONNECTION *conn,
+                                                                      WT_CONFIG_ARG *cfg) {
+        // Function-local static: the address handed to add_collator() stays valid.
+        static WT_COLLATOR baseCollator;
+        baseCollator.customize = index_collator_customize;
+
+        const int rc = conn->add_collator(conn, "mongo_index", &baseCollator, NULL);
+        return rc;
+    }
+
+    /**
+     * Builds the DuplicateKey Status reported for 'key'.
+     * (taken from btree_logic.cpp)
+     */
+    Status dupKeyError(const BSONObj& key) {
+        StringBuilder message;
+        message << "E11000 duplicate key error " << "dup key: " << key;
+        return Status(ErrorCodes::DuplicateKey, message.str());
+    }
+} // namespace
+
+    /**
+     * Creates the WiredTiger object backing an index.
+     *
+     * The configuration string is assembled as
+     *   <default prefix> + extraConfig + <default suffix> + <index info as JSON>
+     * User configuration will override values in the prefix, but not values
+     * in the suffix. The index info is stored as app_metadata because
+     * indexes need the metadata for collation to work as expected.
+     *
+     * @return the return code of WT_SESSION::create().
+     */
+    int WiredTigerIndex::Create(OperationContext* txn,
+                                const std::string& uri,
+                                const std::string& extraConfig,
+                                const IndexDescriptor* desc ) {
+        WT_SESSION* session = WiredTigerRecoveryUnit::get( txn )->getSession()->getSession();
+
+        std::string config( "type=file,leaf_page_max=16k," );
+        config += extraConfig;
+        config += ",key_format=u,value_format=u,collator=mongo_index,app_metadata=";
+        config += desc->infoObj().jsonString();
+
+        LOG(1) << "create uri: " << uri << " config: " << config;
+        return session->create(session, uri.c_str(), config.c_str());
+    }
+
+    /**
+     * Records the index URI and obtains an instance id from
+     * WiredTigerSession::genCursorId().
+     */
+    WiredTigerIndex::WiredTigerIndex(const std::string &uri )
+        : _uri( uri )
+        , _instanceId( WiredTigerSession::genCursorId() )
+    {}
+
+    /**
+     * Inserts the (key, loc) pair into the index.
+     *
+     * 'loc' must be non-null and valid and 'key' must carry no field names
+     * (all enforced with invariants). Keys whose BSON size is at least
+     * TempKeyMaxSize are rejected with ErrorCodes::KeyTooLong. Otherwise the
+     * work is delegated to _insert() on a cursor opened for this operation.
+     */
+    Status WiredTigerIndex::insert(OperationContext* txn,
+                                   const BSONObj& key,
+                                   const DiskLoc& loc,
+                                   bool dupsAllowed) {
+        invariant(!loc.isNull());
+        invariant(loc.isValid());
+        invariant(!hasFieldNames(key));
+
+        const int keySize = key.objsize();
+        if ( keySize >= TempKeyMaxSize ) {
+            std::string reason = mongoutils::str::stream()
+                << "WiredTigerIndex::insert: key too large to index, failing "
+                << ' ' << keySize << ' ' << key;
+            return Status(ErrorCodes::KeyTooLong, reason);
+        }
+
+        WiredTigerCursor curwrap(_uri, _instanceId, txn);
+        return _insert( curwrap.get(), key, loc, dupsAllowed );
+    }
+
+    /**
+     * Removes the (key, loc) pair from the index by delegating to _unindex().
+     *
+     * 'loc' must be non-null and valid, 'key' must carry no field names and
+     * the cursor must be available (all enforced with invariants).
+     */
+    void WiredTigerIndex::unindex(OperationContext* txn,
+                                  const BSONObj& key,
+                                  const DiskLoc& loc,
+                                  bool dupsAllowed ) {
+        invariant(!loc.isNull());
+        invariant(loc.isValid());
+        invariant(!hasFieldNames(key));
+
+        WiredTigerCursor curwrap(_uri, _instanceId, txn);
+        WT_CURSOR* const c = curwrap.get();
+        invariant( c );
+
+        _unindex( c, key, loc, dupsAllowed );
+    }
+
+    /**
+     * Counts the index entries and, for a full validation, appends table
+     * statistics to 'output'.
+     *
+     * The count is obtained by walking a forward cursor from
+     * (minKey, minDiskLoc) to EOF and is written to '*numKeysOut' when that
+     * pointer is non-null. With 'full' set, 'output' must be non-null; if
+     * the statistics export fails, "error", "code" and "reason" fields are
+     * appended instead.
+     */
+    void WiredTigerIndex::fullValidate(OperationContext* txn, bool full, long long *numKeysOut,
+                                       BSONObjBuilder* output) const {
+        IndexCursor cursor(*this, txn, true );
+        cursor.locate( minKey, minDiskLoc );
+
+        long long numKeys = 0;
+        for ( ; !cursor.isEOF(); cursor.advance() ) {
+            ++numKeys;
+        }
+
+        if ( numKeysOut != NULL ) {
+            *numKeysOut = numKeys;
+        }
+
+        // Nothing further to do if 'full' validation is not requested.
+        if (!full) {
+            return;
+        }
+
+        invariant(output);
+        WT_SESSION* s = WiredTigerRecoveryUnit::get(txn)->getSession()->getSession();
+        const Status status = WiredTigerUtil::exportTableToBSON(s,
+                                                                "statistics:" + uri(),
+                                                                "statistics=(fast)",
+                                                                output);
+        if (status.isOK()) {
+            return;
+        }
+
+        output->append("error", "unable to retrieve statistics");
+        output->append("code", static_cast<int>(status.code()));
+        output->append("reason", status.reason());
+    }
+
+    /**
+     * Returns a DuplicateKey error if isDup() reports that 'key' is already
+     * present with a DiskLoc other than 'loc'; otherwise returns OK.
+     *
+     * Only valid on unique indexes and for keys without field names (both
+     * enforced with invariants).
+     */
+    Status WiredTigerIndex::dupKeyCheck( OperationContext* txn,
+                                         const BSONObj& key,
+                                         const DiskLoc& loc) {
+        invariant(!hasFieldNames(key));
+        invariant(unique());
+
+        WiredTigerCursor curwrap(_uri, _instanceId, txn);
+        WT_CURSOR* const cursor = curwrap.get();
+
+        return isDup(cursor, key, loc) ? dupKeyError(key) : Status::OK();
+    }
+
+    /**
+     * Returns true when the index holds no entries.
+     *
+     * A null cursor is treated as empty. Otherwise the cursor is stepped
+     * once: WT_NOTFOUND means empty, and any other return code is checked
+     * with invariantWTOK before reporting non-empty.
+     */
+    bool WiredTigerIndex::isEmpty(OperationContext* txn) {
+        WiredTigerCursor curwrap(_uri, _instanceId, txn);
+        WT_CURSOR* const c = curwrap.get();
+        if (c == NULL) {
+            return true;
+        }
+
+        const int ret = c->next(c);
+        if (ret != WT_NOTFOUND) {
+            invariantWTOK(ret);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * No-op: performs no work and always returns Status::OK().
+     */
+    Status WiredTigerIndex::touch(OperationContext* txn) const
+    {
+        // already in memory...
+        return Status::OK();
+    }
+
+    /**
+     * Returns the value of WiredTigerUtil::getIdentSize() for this index's
+     * URI, converted to long long.
+     */
+    long long WiredTigerIndex::getSpaceUsedBytes( OperationContext* txn ) const {
+        WiredTigerSession* const session = WiredTigerRecoveryUnit::get(txn)->getSession();
+        WT_SESSION* const wtSession = session->getSession();
+        return static_cast<long long>( WiredTigerUtil::getIdentSize( wtSession, _uri ) );
+    }
+
+    /**
+     * Reports whether 'key' is already stored with a DiskLoc other than 'loc'.
+     *
+     * Searches 'c' for the bare key. If it is absent the result is false;
+     * otherwise the first DiskLoc of the stored value is compared with
+     * 'loc'. Must only be called on unique indexes (invariant).
+     */
+    bool WiredTigerIndex::isDup(WT_CURSOR *c, const BSONObj& key, const DiskLoc& loc ) {
+        invariant( unique() );
+
+        // First check whether the key exists.
+        WiredTigerItem searchKey( key.objdata(), key.objsize() );
+        c->set_key( c, searchKey.Get() );
+
+        const int ret = c->search(c);
+        if ( ret == WT_NOTFOUND ) {
+            return false;
+        }
+        invariantWTOK( ret );
+
+        // The key exists: it is a duplicate only if it belongs to another DiskLoc.
+        WT_ITEM value;
+        invariantWTOK( c->get_value(c,&value) );
+        const DiskLoc existing = _toDiskLoc( value );
+        return existing != loc;
+    }
+
+    /**
+     * Allocates a new IndexCursor over this index.
+     *
+     * @param direction  1 for a forward cursor, -1 for a reverse cursor; any
+     *                   other value trips an invariant.
+     * @return a cursor allocated with 'new'.
+     */
+    SortedDataInterface::Cursor* WiredTigerIndex::newCursor(OperationContext* txn,
+                                                            int direction) const {
+        invariant((direction == 1) || (direction == -1));
+
+        const bool forward = (direction == 1);
+        return new IndexCursor(*this, txn, forward);
+    }
+
+    /**
+     * No-op: performs no work and always returns Status::OK().
+     */
+    Status WiredTigerIndex::initAsEmpty(OperationContext* txn)
+    {
+        // No-op
+        return Status::OK();
+    }
+
+    /**
+     * SortedDataBuilderInterface implementation that forwards every key to
+     * WiredTigerIndex::insert() and counts the successful insertions.
+     */
+    class WiredTigerBuilderImpl : public SortedDataBuilderInterface {
+    public:
+        WiredTigerBuilderImpl(WiredTigerIndex* idx,
+                              OperationContext *txn,
+                              bool dupsAllowed)
+            : _idx(idx),
+              _txn(txn),
+              _dupsAllowed(dupsAllowed),
+              _count(0) {}
+
+        ~WiredTigerBuilderImpl() {}
+
+        // Inserts (key, loc) into the index; the counter only moves on success.
+        Status addKey(const BSONObj& key, const DiskLoc& loc) {
+            const Status status = _idx->insert(_txn, key, loc, _dupsAllowed);
+            if (status.isOK()) {
+                ++_count;
+            }
+            return status;
+        }
+
+        // Opens and immediately commits a WriteUnitOfWork on the stored context.
+        void commit(bool mayInterrupt) {
+            // this is bizarre, but required as part of the contract
+            WriteUnitOfWork unit( _txn );
+            unit.commit();
+        }
+
+    private:
+        WiredTigerIndex* _idx;
+        OperationContext* _txn;
+        bool _dupsAllowed;
+        unsigned long long _count;  // number of successful addKey() calls
+    };
+
+    /**
+     * Allocates a bulk builder for this index.
+     *
+     * When duplicates are not allowed the index must be unique (invariant).
+     *
+     * @return a WiredTigerBuilderImpl allocated with 'new'.
+     */
+    SortedDataBuilderInterface* WiredTigerIndex::getBulkBuilder( OperationContext* txn,
+                                                                 bool dupsAllowed ) {
+        // if we don't allow dups, we better be unique
+        if ( !dupsAllowed )
+            invariant( unique() );
+
+        SortedDataBuilderInterface* builder = new WiredTigerBuilderImpl(this, txn, dupsAllowed);
+        return builder;
+    }
+
+
+
+ // ----------------------
+
+    /**
+     * Creates a cursor over 'idx' for the given operation context.
+     *
+     * The cursor starts at EOF, and '_uniqueLen' starts at -1, the value the
+     * other cursor methods test to decide the value has not been examined.
+     *
+     * @param forward  true for a forward cursor, false for a reverse cursor.
+     */
+    WiredTigerIndex::IndexCursor::IndexCursor(const WiredTigerIndex &idx,
+                                              OperationContext *txn,
+                                              bool forward)
+        : _txn(txn)
+        , _cursor(idx.uri(), idx.instanceId(), txn )
+        , _idx(idx)
+        , _forward(forward)
+        , _eof(true)
+        , _uniqueLen( -1 )
+    {}
+
+    /**
+     * Returns true when this cursor and 'genother' are at the same position.
+     *
+     * Two cursors at EOF are equal; a cursor at EOF never equals one that is
+     * not. Otherwise both the DiskLoc and the key must match. 'genother' is
+     * converted with a reference dynamic_cast to IndexCursor.
+     */
+    bool WiredTigerIndex::IndexCursor::pointsToSamePlaceAs(
+            const SortedDataInterface::Cursor &genother) const {
+        const WiredTigerIndex::IndexCursor &other =
+            dynamic_cast<const WiredTigerIndex::IndexCursor &>(genother);
+
+        if ( _eof || other._eof ) {
+            // Equal only if both are exhausted.
+            return _eof && other._eof;
+        }
+
+        if ( getDiskLoc() != other.getDiskLoc() ) {
+            return false;
+        }
+        return getKey() == other.getKey();
+    }
+
+    /**
+     * Must never be called on this cursor type: unconditionally trips an
+     * invariant.
+     */
+    void WiredTigerIndex::IndexCursor::aboutToDeleteBucket(const DiskLoc& bucket)
+    {
+        invariant(!"aboutToDeleteBucket should not be called");
+    }
+
+ /**
+ * Positions the cursor at or next to (key, loc) in the scan direction.
+ *
+ * 'key' is used as given; locate() strips field names before calling.
+ * Sets _eof, and for unique indexes may set _uniqueLen / _uniquePos.
+ *
+ * Returns false when nothing was found, when the cursor ran off the end,
+ * or when the key under the cursor differs from 'key'; true otherwise.
+ */
+ bool WiredTigerIndex::IndexCursor::_locate(const BSONObj &key, const DiskLoc& loc) {
+ // Forget any per-key position left over from a previous location.
+ _uniqueLen = -1;
+ WT_CURSOR *c = _cursor.get();
+
+ DiskLoc searchLoc = loc;
+ // Null cursors should start at the zero key to maintain search ordering in the
+ // collator.
+ // Reverse cursors should start on the last matching key.
+ if (loc.isNull())
+ searchLoc = _forward ? DiskLoc(0, 0) : DiskLoc(INT_MAX, INT_MAX);
+
+ // 'data' owns the buffer that 'myKey' points into.
+ boost::scoped_array<char> data;
+ WiredTigerItem myKey = _toItem( key, searchLoc, &data );
+
+ int cmp = -1;
+ c->set_key(c, myKey.Get() );
+
+
+ int ret = c->search_near(c, &cmp);
+ if ( ret == WT_NOTFOUND ) {
+ _eof = true;
+ return false;
+ }
+ invariantWTOK( ret );
+ // Make sure we land on a matching key
+ // (cmp < 0 on a forward scan, or cmp > 0 on a reverse scan, means one
+ // step in the scan direction is needed).
+ if ( _forward ? cmp < 0 : cmp > 0 )
+ ret = _forward ? c->next(c) : c->prev(c);
+
+ // NOTE(review): any non-zero result of next()/prev() is treated as EOF
+ // here, not only WT_NOTFOUND -- confirm this is intended.
+ _eof = ret != 0;
+
+ if ( _eof ) {
+ return false;
+ }
+
+ if ( key != getKey() ) {
+ return false;
+ }
+
+ if ( !_idx.unique() ) {
+ return true;
+ }
+
+ // now we need to check if we have an array situation
+
+ if ( loc.isNull() ) {
+ // no loc specified means start at the beginning or end of the array as needed
+ // so nothing to do
+ return true;
+ }
+
+ // we're looking for a specific DiskLoc, let's see if we can find it
+
+ // The value is read as a packed array of DiskLoc entries.
+ WT_ITEM item;
+ invariantWTOK( c->get_value(c, &item ) );
+ _uniqueLen = item.size / sizeof(DiskLoc);
+ invariant( _uniqueLen > 0 );
+
+ if ( _forward ) {
+ // Scan from the front; stop at 'loc' or at the first entry greater than it.
+ _uniquePos = 0;
+ for ( ; _uniquePos < _uniqueLen; _uniquePos++ ) {
+ DiskLoc temp;
+ memcpy( &temp,
+ reinterpret_cast<const char*>(item.data) + ( _uniquePos * sizeof(DiskLoc) ),
+ sizeof(DiskLoc) );
+ if ( temp == loc )
+ break;
+
+ if ( loc < temp )
+ break;
+ }
+ }
+ else {
+ // Scan from the back; stop at 'loc' or at the first entry smaller than it.
+ _uniquePos = _uniqueLen-1;
+ for ( ; _uniquePos >= 0; _uniquePos-- ) {
+ DiskLoc temp;
+ memcpy( &temp,
+ reinterpret_cast<const char*>(item.data) + ( _uniquePos * sizeof(DiskLoc) ),
+ sizeof(DiskLoc) );
+ if ( temp == loc )
+ break;
+
+ if ( temp < loc )
+ break;
+ }
+ // Convert the array index into a count from the end, the form
+ // getDiskLoc() expects for reverse cursors.
+ _uniquePos = _uniqueLen - 1 - _uniquePos;
+ }
+
+ if ( _uniquePos == _uniqueLen ) {
+ // we need to move to next slot
+ advance();
+ }
+
+ return true;
+ }
+
+    /**
+     * Positions the cursor for (key, loc) after stripping field names from
+     * 'key'.
+     *
+     * @return false whenever 'loc' is minDiskLoc or maxDiskLoc; otherwise
+     *         the result of _locate().
+     */
+    bool WiredTigerIndex::IndexCursor::locate(const BSONObj &key, const DiskLoc& loc) {
+        const bool found = _locate(stripFieldNames(key), loc);
+
+        // An explicit search at the start of the range should always return false
+        const bool atRangeBoundary = (loc == minDiskLoc || loc == maxDiskLoc);
+        return atRangeBoundary ? false : found;
+    }
+
+    /**
+     * Repositions the cursor using a query object built by
+     * IndexEntryComparison::makeQueryObject() from the supplied bounds and
+     * this cursor's direction. The search uses a null DiskLoc and the result
+     * of _locate() is discarded.
+     */
+    void WiredTigerIndex::IndexCursor::advanceTo(const BSONObj &keyBegin,
+                                                 int keyBeginLen,
+                                                 bool afterKey,
+                                                 const vector<const BSONElement*>& keyEnd,
+                                                 const vector<bool>& keyEndInclusive) {
+        const BSONObj seekKey = IndexEntryComparison::makeQueryObject( keyBegin,
+                                                                       keyBeginLen,
+                                                                       afterKey,
+                                                                       keyEnd,
+                                                                       keyEndInclusive,
+                                                                       getDirection() );
+        _locate(seekKey, DiskLoc());
+    }
+
+    /**
+     * Forwards all arguments unchanged to advanceTo().
+     */
+    void WiredTigerIndex::IndexCursor::customLocate(const BSONObj& keyBegin,
+                                                    int keyBeginLen,
+                                                    bool afterKey,
+                                                    const vector<const BSONElement*>& keyEnd,
+                                                    const vector<bool>& keyEndInclusive)
+    {
+        advanceTo(keyBegin,
+                  keyBeginLen,
+                  afterKey,
+                  keyEnd,
+                  keyEndInclusive);
+    }
+
+ // Returns the key portion of the entry the underlying WiredTiger cursor is
+ // currently positioned on. A failing get_key() trips invariantWTOK.
+ BSONObj WiredTigerIndex::IndexCursor::getKey() const {
+     WT_CURSOR* cursor = _cursor.get();
+
+     WT_ITEM rawKey;
+     invariantWTOK( cursor->get_key(cursor, &rawKey) );
+
+     return makeIndexKeyEntry(&rawKey).key;
+ }
+
+ // Returns the DiskLoc of the entry the cursor currently points at, or a
+ // default-constructed DiskLoc when the cursor is at EOF.
+ //
+ // Non-unique indexes: the DiskLoc is decoded from the WiredTiger key.
+ // Unique indexes: the WiredTiger value is a packed array of DiskLoc. On the
+ // first visit to a key (_uniqueLen == -1) the array length is cached and the
+ // position reset to 0. _uniquePos counts in traversal order, so a reverse
+ // cursor reads the array from its end.
+ DiskLoc WiredTigerIndex::IndexCursor::getDiskLoc() const {
+     if ( _eof )
+         return DiskLoc();
+
+     WT_CURSOR *c = _cursor.get();
+     WT_ITEM item;
+
+     if ( !_idx.unique() ) {
+         invariantWTOK( c->get_key(c, &item) );
+         return makeIndexKeyEntry( &item ).loc;
+     }
+
+     invariantWTOK( c->get_value(c, &item ) );
+     if ( _uniqueLen == -1 ) {
+         // first time at this spot
+         _uniqueLen = item.size / sizeof(DiskLoc);
+         invariant( _uniqueLen > 0 );
+         _uniquePos = 0;
+     }
+
+     int posToUse = _uniquePos;
+     if ( !_forward )
+         posToUse = _uniqueLen - 1 - _uniquePos;
+
+     // Validate the index BEFORE touching the buffer so a bad position fails
+     // the invariant instead of first reading outside the value.
+     invariant( posToUse >= 0 && posToUse < _uniqueLen );
+
+     DiskLoc loc;
+     memcpy( &loc,
+             reinterpret_cast<const char*>(item.data) + ( posToUse * sizeof(DiskLoc) ),
+             sizeof(DiskLoc) );
+
+     return loc;
+ }
+
+ // Moves the cursor one entry in its direction of travel.
+ //
+ // For a unique index the cursor first steps through the DiskLoc array stored
+ // under the current key; only once that array is exhausted does the
+ // underlying WiredTiger cursor move. Reaching the end of the table sets _eof.
+ void WiredTigerIndex::IndexCursor::advance() {
+     // Advance on a cursor at the end is a no-op
+     if ( _eof ) {
+         return;
+     }
+
+     if ( _idx.unique() ) {
+         // getDiskLoc() fills in _uniqueLen/_uniquePos when they are not known yet
+         if ( _uniqueLen == -1 ) {
+             getDiskLoc();
+         }
+
+         ++_uniquePos;
+
+         const bool moreUnderSameKey = _uniquePos < _uniqueLen;
+         if ( moreUnderSameKey ) {
+             return;
+         }
+     }
+
+     // moving to a different key: forget the cached array length
+     _uniqueLen = -1;
+
+     WT_CURSOR* cursor = _cursor.get();
+     int rc;
+     if ( _forward ) {
+         rc = cursor->next(cursor);
+     }
+     else {
+         rc = cursor->prev(cursor);
+     }
+
+     if ( rc != WT_NOTFOUND ) {
+         invariantWTOK(rc);
+         _eof = false;
+         return;
+     }
+
+     _eof = true;
+ }
+
+ // Remembers the recovery unit in use (checked again by restorePosition()) and
+ // detaches from the operation context. Unless wt_keeptxnopen() is true or the
+ // cursor is at EOF, the current key and DiskLoc are copied out and the
+ // WiredTiger cursor is reset.
+ void WiredTigerIndex::IndexCursor::savePosition() {
+     _savedForCheck = _txn->recoveryUnit();
+
+     const bool mustSnapshot = !( wt_keeptxnopen() || _eof );
+     if ( mustSnapshot ) {
+         _savedKey = getKey().getOwned();
+         _savedLoc = getDiskLoc();
+         _cursor.reset();
+     }
+
+     _txn = NULL;
+ }
+
+ // Re-attaches the cursor to an operation context, which must use the same
+ // recovery unit that was active at savePosition(). Unless wt_keeptxnopen() is
+ // true or the cursor is at EOF, the saved key/DiskLoc pair is located again.
+ void WiredTigerIndex::IndexCursor::restorePosition( OperationContext *txn ) {
+     // Update the session handle with our new operation context.
+     _txn = txn;
+     invariant( _savedForCheck == txn->recoveryUnit() );
+
+     if ( wt_keeptxnopen() || _eof ) {
+         return;
+     }
+
+     _locate(_savedKey, _savedLoc);
+ }
+
+ // ------------------------------
+
+ // Constructs the unique-index variant; the uri is handed to the base class.
+ WiredTigerIndexUnique::WiredTigerIndexUnique( const std::string& uri )
+     : WiredTigerIndex( uri )
+ {
+ }
+
+ // Inserts (key, loc) into a unique index.
+ //
+ // The WiredTiger key is the raw BSON key and the value is a packed array of
+ // DiskLoc. The common case is a plain insert of a one-element array. If the
+ // insert reports WT_DUPLICATE_KEY the existing array is read back:
+ //   - loc already present          -> Status::OK(), nothing written
+ //   - loc absent and !dupsAllowed  -> dupKeyError(key)
+ //   - loc absent and dupsAllowed   -> value rewritten with loc included
+ // Any other insert result is converted with wtRCToStatus().
+ Status WiredTigerIndexUnique::_insert( WT_CURSOR* c,
+ const BSONObj& key,
+ const DiskLoc& loc,
+ bool dupsAllowed ) {
+
+ WiredTigerItem keyItem( key.objdata(), key.objsize() );
+ WiredTigerItem valueItem( &loc, sizeof(loc) );
+ c->set_key( c, keyItem.Get() );
+ c->set_value( c, valueItem.Get() );
+ int ret = c->insert( c );
+
+ // success and every failure other than "key already exists" end here
+ if ( ret != WT_DUPLICATE_KEY )
+ return wtRCToStatus( ret );
+
+ // we're in weird mode where there might be multiple values
+ // we put them all in the "list"
+ ret = c->search(c);
+ invariantWTOK( ret );
+
+ WT_ITEM old;
+ invariantWTOK( c->get_value(c, &old ) );
+
+ // collects the existing entries; all of old.data is consumed by the loop
+ // below, before the cursor value is replaced further down
+ std::set<DiskLoc> all;
+
+ // see if its already in the array
+ for ( size_t i = 0; i < (old.size/sizeof(DiskLoc)); i++ ) {
+ DiskLoc temp;
+ memcpy( &temp,
+ reinterpret_cast<const char*>( old.data ) + ( i * sizeof(DiskLoc) ),
+ sizeof(DiskLoc) );
+ if ( loc == temp )
+ return Status::OK();
+ all.insert( temp );
+ }
+
+ if ( !dupsAllowed ) {
+ return dupKeyError(key);
+ }
+
+ all.insert( loc );
+
+ // not in the array: rebuild the whole value from the std::set, so the
+ // entries are written in the set's (sorted) iteration order
+ size_t newSize = all.size() * sizeof(DiskLoc);
+ boost::scoped_array<char> bigger( new char[newSize] );
+
+ size_t offset = 0;
+ for ( std::set<DiskLoc>::const_iterator it = all.begin(); it != all.end(); ++it ) {
+ DiskLoc dl = *it;
+ memcpy( bigger.get() + offset, &dl, sizeof(DiskLoc) );
+ offset += sizeof(DiskLoc);
+ }
+
+ valueItem = WiredTigerItem( bigger.get(), newSize );
+ c->set_value( c, valueItem.Get() );
+ return wtRCToStatus( c->update( c ) );
+ }
+
+ // Removes the (key, loc) pair from a unique index.
+ //
+ // When dupsAllowed is false the whole key is removed. Otherwise the value is
+ // treated as a packed array of DiskLoc: loc is cut out of the array and the
+ // remainder written back, or the key is removed when loc was the only entry.
+ // A key or loc that is not present is silently ignored.
+ void WiredTigerIndexUnique::_unindex( WT_CURSOR* c,
+                                       const BSONObj& key,
+                                       const DiskLoc& loc,
+                                       bool dupsAllowed ) {
+     WiredTigerItem keyItem( key.objdata(), key.objsize() );
+     c->set_key( c, keyItem.Get() );
+
+     if ( !dupsAllowed ) {
+         // nice and clear
+         int ret = c->remove(c);
+         if (ret == WT_NOTFOUND) {
+             return;
+         }
+         invariantWTOK(ret);
+         return;
+     }
+
+     // dups are allowed, so we have to deal with a vector of DiskLoc
+
+     int ret = c->search(c);
+     if ( ret == WT_NOTFOUND )
+         return;
+     invariantWTOK( ret );
+
+     WT_ITEM old;
+     invariantWTOK( c->get_value(c, &old ) );
+
+     // see if its in the array
+     size_t num = old.size / sizeof(DiskLoc);
+     for ( size_t i = 0; i < num; i++ ) {
+         DiskLoc temp;
+         memcpy( &temp,
+                 reinterpret_cast<const char*>( old.data ) + ( i * sizeof(DiskLoc) ),
+                 sizeof(DiskLoc) );
+         if ( loc != temp )
+             continue;
+
+         // we found it, now lets re-save array without it
+         size_t newSize = old.size - sizeof(DiskLoc);
+
+         if ( newSize == 0 ) {
+             // nothing left, just delete entry
+             invariantWTOK( c->remove(c) );
+             return;
+         }
+
+         // copy everything before entry i, then everything after it
+         boost::scoped_array<char> smaller( new char[newSize] );
+         size_t offset = i * sizeof(DiskLoc);
+         memcpy( smaller.get(), old.data, offset );
+         memcpy( smaller.get() + offset,
+                 reinterpret_cast<const char*>( old.data ) + offset + sizeof(DiskLoc),
+                 old.size - sizeof(DiskLoc) - offset );
+         WiredTigerItem valueItem = WiredTigerItem( smaller.get(), newSize );
+         c->set_value( c, valueItem.Get() );
+         invariantWTOK( c->update( c ) );
+
+         // Stop here. 'old' refers to memory handed out by the cursor, which
+         // is not guaranteed to stay valid once the cursor has been given a new
+         // value and updated, so the scan must not continue over it.
+         return;
+     }
+ }
+
+ // ------------------------------
+
+ // Constructs the non-unique index variant; the uri is handed to the base class.
+ WiredTigerIndexStandard::WiredTigerIndexStandard( const std::string& uri )
+     : WiredTigerIndex( uri )
+ {
+ }
+
+ // Inserts an entry into a non-unique index. The WiredTiger key is produced by
+ // _toItem(key, loc, ...) and the value is emptyItem. dupsAllowed must be true.
+ Status WiredTigerIndexStandard::_insert( WT_CURSOR* c,
+                                          const BSONObj& key,
+                                          const DiskLoc& loc,
+                                          bool dupsAllowed ) {
+     invariant( dupsAllowed );
+
+     // backing storage handed to _toItem(); must outlive the insert call
+     boost::scoped_array<char> keyBuffer;
+     WiredTigerItem entry = _toItem( key, loc, &keyBuffer );
+
+     c->set_key( c, entry.Get() );
+     c->set_value( c, &emptyItem );
+
+     const int rc = c->insert(c);
+     return wtRCToStatus( rc );
+ }
+
+ // Removes an entry from a non-unique index. The key to remove is rebuilt with
+ // _toItem(key, loc, ...). An entry that is not found is ignored; any other
+ // failure trips invariantWTOK. dupsAllowed must be true.
+ void WiredTigerIndexStandard::_unindex( WT_CURSOR* c,
+                                         const BSONObj& key,
+                                         const DiskLoc& loc,
+                                         bool dupsAllowed ) {
+     invariant( dupsAllowed );
+
+     boost::scoped_array<char> keyBuffer;
+     WiredTigerItem entry = _toItem( key, loc, &keyBuffer );
+     c->set_key( c, entry.Get() );
+
+     const int rc = c->remove(c);
+     if ( rc == WT_NOTFOUND ) {
+         return;
+     }
+     invariantWTOK(rc);
+ }
+
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.h b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h
new file mode 100644
index 00000000000..c390bb121cf
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+#include <wiredtiger.h>
+
+#include "mongo/db/storage/index_entry_comparison.h"
+#include "mongo/db/storage/sorted_data_interface.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+
+namespace mongo {
+
+ class IndexCatalogEntry;
+ class IndexDescriptor;
+ struct WiredTigerItem;
+
+ /**
+ * SortedDataInterface implementation on top of a WiredTiger table.
+ *
+ * This class is abstract: unique(), _insert() and _unindex() are pure virtual
+ * and are supplied by WiredTigerIndexUnique and WiredTigerIndexStandard.
+ */
+ class WiredTigerIndex : public SortedDataInterface {
+ public:
+
+ static int Create(OperationContext* txn,
+ const std::string& uri,
+ const std::string& extraConfig,
+ const IndexDescriptor* desc);
+
+ /**
+ * @param uri - identifies the table backing this index; callers in this
+ * change pass a string of the form "table:<ident>".
+ * Whether the index is unique is reported by the subclass via unique().
+ * Note: even if unique, it may be allowed to be non-unique at times.
+ */
+ WiredTigerIndex(const std::string &uri );
+
+ virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed);
+
+ virtual Status insert(OperationContext* txn,
+ const BSONObj& key,
+ const DiskLoc& loc,
+ bool dupsAllowed);
+
+ virtual void unindex(OperationContext* txn,
+ const BSONObj& key,
+ const DiskLoc& loc,
+ bool dupsAllowed);
+
+ virtual void fullValidate(OperationContext* txn, bool full, long long *numKeysOut,
+ BSONObjBuilder* output) const;
+
+ virtual Status dupKeyCheck(OperationContext* txn, const BSONObj& key, const DiskLoc& loc);
+
+ virtual bool isEmpty(OperationContext* txn);
+
+ virtual Status touch(OperationContext* txn) const;
+
+ virtual long long getSpaceUsedBytes( OperationContext* txn ) const;
+
+ bool isDup(WT_CURSOR *c, const BSONObj& key, const DiskLoc& loc );
+
+ virtual SortedDataInterface::Cursor* newCursor(
+ OperationContext* txn, int direction) const;
+
+ virtual Status initAsEmpty(OperationContext* txn);
+
+ const std::string& uri() const { return _uri; }
+
+ uint64_t instanceId() const { return _instanceId; }
+
+ // true for WiredTigerIndexUnique, false for WiredTigerIndexStandard
+ virtual bool unique() const = 0;
+
+ protected:
+
+ // Storage-format specific insert, performed on an already opened cursor.
+ virtual Status _insert( WT_CURSOR* c,
+ const BSONObj& key,
+ const DiskLoc& loc,
+ bool dupsAllowed ) = 0;
+
+ // Storage-format specific removal, performed on an already opened cursor.
+ virtual void _unindex( WT_CURSOR* c,
+ const BSONObj& key,
+ const DiskLoc& loc,
+ bool dupsAllowed ) = 0;
+
+ // Cursor over the index, travelling forward or backward.
+ class IndexCursor : public SortedDataInterface::Cursor {
+ public:
+ IndexCursor(const WiredTigerIndex& idx,
+ OperationContext *txn,
+ bool forward);
+
+ virtual ~IndexCursor() { }
+
+ // 1 for a forward cursor, -1 for a reverse cursor
+ virtual int getDirection() const { return _forward ? 1 : -1; }
+
+ virtual bool isEOF() const { return _eof; }
+
+ virtual bool pointsToSamePlaceAs(const SortedDataInterface::Cursor &genother) const;
+
+ virtual void aboutToDeleteBucket(const DiskLoc& bucket);
+
+ virtual bool locate(const BSONObj &key, const DiskLoc& loc);
+
+ // forwards to advanceTo()
+ virtual void customLocate(const BSONObj& keyBegin,
+ int keyBeginLen,
+ bool afterKey,
+ const vector<const BSONElement*>& keyEnd,
+ const vector<bool>& keyEndInclusive);
+
+ void advanceTo(const BSONObj &keyBegin,
+ int keyBeginLen,
+ bool afterKey,
+ const vector<const BSONElement*>& keyEnd,
+ const vector<bool>& keyEndInclusive);
+
+ virtual BSONObj getKey() const;
+
+ virtual DiskLoc getDiskLoc() const;
+
+ virtual void advance();
+
+ virtual void savePosition();
+
+ virtual void restorePosition( OperationContext *txn );
+
+ private:
+ bool _locate(const BSONObj &key, const DiskLoc& loc);
+
+ OperationContext *_txn;
+ WiredTigerCursor _cursor;
+ const WiredTigerIndex& _idx; // not owned
+ bool _forward;
+ bool _eof;
+
+ // Unique indexes only: position within, and length of, the DiskLoc array
+ // stored under the current key. _uniqueLen == -1 means the array has not
+ // been inspected yet. Both are mutable because the const getDiskLoc()
+ // fills them in lazily.
+ mutable int _uniquePos;
+ mutable int _uniqueLen;
+
+ // For save/restorePosition check
+ RecoveryUnit* _savedForCheck;
+ BSONObj _savedKey;
+ DiskLoc _savedLoc;
+ };
+
+ std::string _uri;
+ uint64_t _instanceId;
+ };
+
+
+ // Index variant that reports unique() == true and supplies the matching
+ // _insert()/_unindex() implementations.
+ class WiredTigerIndexUnique : public WiredTigerIndex {
+ public:
+     WiredTigerIndexUnique( const std::string& uri );
+
+     virtual bool unique() const {
+         return true;
+     }
+
+     virtual Status _insert( WT_CURSOR* c, const BSONObj& key,
+                             const DiskLoc& loc, bool dupsAllowed );
+
+     virtual void _unindex( WT_CURSOR* c, const BSONObj& key,
+                            const DiskLoc& loc, bool dupsAllowed );
+ };
+
+ // Index variant that reports unique() == false and supplies the matching
+ // _insert()/_unindex() implementations.
+ class WiredTigerIndexStandard : public WiredTigerIndex {
+ public:
+     WiredTigerIndexStandard( const std::string& uri );
+
+     virtual bool unique() const {
+         return false;
+     }
+
+     virtual Status _insert( WT_CURSOR* c, const BSONObj& key,
+                             const DiskLoc& loc, bool dupsAllowed );
+
+     virtual void _unindex( WT_CURSOR* c, const BSONObj& key,
+                            const DiskLoc& loc, bool dupsAllowed );
+ };
+
+} // namespace
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp
new file mode 100644
index 00000000000..b9455ab88be
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp
@@ -0,0 +1,92 @@
+// wiredtiger_index_test.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/catalog/index_catalog_entry.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/storage/sorted_data_interface_test_harness.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_index.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+ // Test fixture: opens a WiredTiger connection inside a temporary directory
+ // and hands out WiredTiger-backed indexes and recovery units to the shared
+ // sorted-data-interface test harness.
+ class MyHarnessHelper : public HarnessHelper {
+ public:
+     MyHarnessHelper() : _dbpath( "wt_test" ), _conn( NULL ) {
+
+         const char* openConfig =
+             "create,cache_size=1G,extensions=[local=(entry=index_collator_extension)],";
+         const int rc = wiredtiger_open( _dbpath.path().c_str(), NULL, openConfig, &_conn);
+         invariantWTOK( rc );
+
+         _sessionCache = new WiredTigerSessionCache( _conn );
+     }
+
+     // The session cache is released before the connection is closed.
+     ~MyHarnessHelper() {
+         delete _sessionCache;
+         _conn->close(_conn, NULL);
+     }
+
+     // Creates the table "table:test.wt" with an index spec on { a: 1 } and
+     // returns a unique or standard index object over it.
+     virtual SortedDataInterface* newSortedDataInterface( bool unique ) {
+         std::string ns = "test.wt";
+         OperationContextNoop txn( newRecoveryUnit() );
+
+         BSONObj spec = BSON( "key" << BSON( "a" << 1 ) <<
+                              "name" << "testIndex" <<
+                              "ns" << ns );
+
+         IndexDescriptor desc( NULL, "", spec );
+
+         string uri = "table:" + ns;
+         invariantWTOK( WiredTigerIndex::Create( &txn, uri, "", &desc ) );
+
+         SortedDataInterface* index = NULL;
+         if ( unique ) {
+             index = new WiredTigerIndexUnique( uri );
+         }
+         else {
+             index = new WiredTigerIndexStandard( uri );
+         }
+         return index;
+     }
+
+     virtual RecoveryUnit* newRecoveryUnit() {
+         return new WiredTigerRecoveryUnit( _sessionCache );
+     }
+
+ private:
+     unittest::TempDir _dbpath;
+     WT_CONNECTION* _conn;
+     WiredTigerSessionCache* _sessionCache;
+ };
+
+ // Factory hook for the test harness: returns a new WiredTiger-backed helper.
+ HarnessHelper* newHarnessHelper() {
+     HarnessHelper* helper = new MyHarnessHelper();
+     return helper;
+ }
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp
new file mode 100644
index 00000000000..b4681a456a4
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp
@@ -0,0 +1,68 @@
+// wiredtiger_init.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/base/init.h"
+#include "mongo/db/global_environment_d.h"
+#include "mongo/db/global_environment_experiment.h"
+#include "mongo/db/storage/kv/kv_storage_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_server_status.h"
+#include "mongo/db/storage_options.h"
+
+namespace mongo {
+
+ namespace {
+ // Factory that builds a KVStorageEngine backed by a WiredTigerKVEngine,
+ // configured from StorageGlobalParams and wiredTigerGlobalOptions.
+ class WiredTigerFactory : public StorageEngine::Factory {
+ public:
+     virtual ~WiredTigerFactory() { }
+
+     virtual StorageEngine* create( const StorageGlobalParams& params ) const {
+         WiredTigerKVEngine* engine =
+             new WiredTigerKVEngine( params.dbpath,
+                                     wiredTigerGlobalOptions.databaseConfig,
+                                     params.dur );
+
+         // per-collection and per-index extra configuration strings
+         engine->setRecordStoreExtraOptions( wiredTigerGlobalOptions.collectionConfig );
+         engine->setSortedDataInterfaceExtraOptions( wiredTigerGlobalOptions.indexConfig );
+
+         // Intentionally leaked.
+         new WiredTigerServerStatusSection(engine);
+
+         StorageEngine* storageEngine = new KVStorageEngine( engine );
+         return storageEngine;
+     }
+ };
+ } // namespace
+
+ // Initializer, declared with "SetGlobalEnvironment" as its prerequisite, that
+ // registers a WiredTigerFactory with the global environment under the
+ // storage engine name "wiredtiger".
+ MONGO_INITIALIZER_WITH_PREREQUISITES(WiredTigerEngineInit,
+ ("SetGlobalEnvironment"))
+ (InitializerContext* context ) {
+ getGlobalEnvironment()->registerStorageEngine("wiredtiger", new WiredTigerFactory() );
+ return Status::OK();
+ }
+
+}
+
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
new file mode 100644
index 00000000000..cb4ae9925af
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -0,0 +1,372 @@
+// wiredtiger_kv_engine.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+
+#include <boost/filesystem.hpp>
+#include <boost/filesystem/operations.hpp>
+
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_index.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/processinfo.h"
+
+namespace mongo {
+
+ namespace {
+
+     // WT_EVENT_HANDLER callbacks installed by the WiredTigerKVEngine
+     // constructor. Each one returns 0.
+
+     // Writes a WiredTiger error (code and message) to the error log.
+     int mdb_handle_error( WT_EVENT_HANDLER *handler,
+                           WT_SESSION *session,
+                           int errorCode,
+                           const char *message ) {
+         error() << "WiredTiger (" << errorCode << ") " << message;
+         return 0;
+     }
+
+     // Writes an informational WiredTiger message to the log.
+     int mdb_handle_message( WT_EVENT_HANDLER *handler,
+                             WT_SESSION *session,
+                             const char *message ) {
+         log() << "WiredTiger " << message;
+         return 0;
+     }
+
+     // Logs the operation name and the progress value reported with it.
+     int mdb_handle_progress( WT_EVENT_HANDLER *handler,
+                              WT_SESSION *session,
+                              const char *operation,
+                              uint64_t progress ) {
+         log() << "WiredTiger progress " << operation << " " << progress;
+         return 0;
+     }
+
+     // Close notifications need no handling.
+     int mdb_handle_close( WT_EVENT_HANDLER *handler,
+                           WT_SESSION *session,
+                           WT_CURSOR *cursor ) {
+         return 0;
+     }
+
+ }
+
+ // Opens (creating if needed) the WiredTiger database at 'path'.
+ //
+ // Steps, in order:
+ //   1. install the logging event handlers
+ //   2. choose a cache size: one tenth of (numPages * pageSize) as reported by
+ //      ProcessInfo, in whole GB, never below 1 GB
+ //   3. when durable, make sure <path>/journal exists
+ //   4. assemble the wiredtiger_open() config string; extraOpenOptions is
+ //      appended last
+ //   5. open the connection, create the session cache and load the size storer
+ //      from "table:sizeStorer"
+ WiredTigerKVEngine::WiredTigerKVEngine( const std::string& path,
+                                         const std::string& extraOpenOptions,
+                                         bool durable )
+     : _durable( durable ),
+       _epoch( 0 ),
+       _sizeStorerSyncTracker( 100000, 60 * 1000 ) {
+
+     _eventHandler.handle_error = mdb_handle_error;
+     _eventHandler.handle_message = mdb_handle_message;
+     _eventHandler.handle_progress = mdb_handle_progress;
+     _eventHandler.handle_close = mdb_handle_close;
+
+     int cacheSizeGB = 1;
+
+     {
+         ProcessInfo pi;
+         BSONObjBuilder builder;
+         pi.appendSystemDetails( builder );
+         BSONObj details = builder.obj();
+         BSONObj extra = details["extra"].Obj();
+         double pageSize = extra["pageSize"].number();
+         double numPages = extra["numPages"].number();
+         if ( pageSize > 0 && numPages > 0 ) {
+             double totalBytes = numPages * pageSize;
+             double cacheBytes = totalBytes / 10;
+             cacheSizeGB = static_cast<int>( cacheBytes / ( 1024 * 1024 * 1024 ) );
+             if ( cacheSizeGB < 1 ) {
+                 cacheSizeGB = 1;
+             }
+         }
+     }
+
+     if ( _durable ) {
+         boost::filesystem::path journalPath = path;
+         journalPath /= "journal";
+
+         const bool journalDirExists = boost::filesystem::exists( journalPath );
+         if ( !journalDirExists ) {
+             try {
+                 boost::filesystem::create_directory( journalPath );
+             }
+             catch( std::exception& e) {
+                 log() << "error creating journal dir " << journalPath.string() << ' ' << e.what();
+                 throw;
+             }
+         }
+     }
+
+     std::stringstream configBuilder;
+     configBuilder << "create,"
+                   << "cache_size=" << cacheSizeGB << "G,"
+                   << "session_max=20000,"
+                   << "extensions=[local=(entry=index_collator_extension)],"
+                   << "statistics=(all),";
+     if ( _durable ) {
+         configBuilder << "log=(enabled=true,archive=true,path=journal),";
+     }
+     configBuilder << "checkpoint=(wait=60,log_size=2GB),"
+                   << extraOpenOptions;
+
+     string config = configBuilder.str();
+     log() << "wiredtiger_open config: " << config;
+     invariantWTOK(wiredtiger_open(path.c_str(), &_eventHandler, config.c_str(), &_conn));
+     _sessionCache.reset( new WiredTigerSessionCache( this ) );
+
+     _sizeStorerUri = "table:sizeStorer";
+     {
+         WiredTigerSession session( _conn, -1 );
+         WiredTigerSizeStorer* storer = new WiredTigerSizeStorer();
+         storer->loadFrom( &session, _sizeStorerUri );
+         _sizeStorer.reset( storer );
+     }
+ }
+
+
+ // Shutdown: size information is synced first, then the size storer and the
+ // session cache are released, and finally the connection (if any) is closed.
+ WiredTigerKVEngine::~WiredTigerKVEngine() {
+     log() << "WiredTigerKVEngine shutting down";
+
+     syncSizeInfo();
+     _sizeStorer.reset( NULL );
+     _sessionCache.reset( NULL );
+
+     if ( !_conn ) {
+         return;
+     }
+
+     invariantWTOK( _conn->close(_conn, NULL) );
+     _conn = NULL;
+ }
+
+ // Records the record store's current record count and data size in the size
+ // storer under the ident's uri, syncs the size storer, and returns OK.
+ // fromNS and toNS are not consulted.
+ Status WiredTigerKVEngine::okToRename( OperationContext* opCtx,
+                                        const StringData& fromNS,
+                                        const StringData& toNS,
+                                        const StringData& ident,
+                                        const RecordStore* originalRecordStore ) const {
+     _sizeStorer->store( _uri( ident ),
+                         originalRecordStore->numRecords( opCtx ),
+                         originalRecordStore->dataSize( opCtx ) );
+
+     syncSizeInfo();
+
+     return Status::OK();
+ }
+
+ // Returns WiredTigerUtil::getIdentSize() for the ident's table, queried
+ // through the session that belongs to the operation's recovery unit.
+ int64_t WiredTigerKVEngine::getIdentSize( OperationContext* opCtx,
+                                           const StringData& ident ) {
+     WiredTigerSession* wrapper = WiredTigerRecoveryUnit::get(opCtx)->getSession();
+     WT_SESSION* rawSession = wrapper->getSession();
+     return WiredTigerUtil::getIdentSize( rawSession, _uri(ident) );
+ }
+
+ // Runs WT_SESSION::compact on the ident's table using a fresh session and
+ // returns the result converted to a Status. opCtx is not used.
+ Status WiredTigerKVEngine::repairIdent( OperationContext* opCtx,
+                                         const StringData& ident ) {
+     WiredTigerSession session( _conn, -1 );
+     WT_SESSION* wt = session.getSession();
+
+     const string target = _uri(ident);
+     const int rc = wt->compact( wt, target.c_str(), NULL );
+
+     return wtRCToStatus( rc );
+ }
+
+ // Syncs the size storer and then takes a WiredTiger checkpoint on a fresh
+ // session. Always returns 1; the 'sync' argument is not consulted.
+ int WiredTigerKVEngine::flushAllFiles( bool sync ) {
+     LOG(1) << "WiredTigerKVEngine::flushAllFiles";
+
+     syncSizeInfo();
+
+     WiredTigerSession session( _conn, -1 );
+     WT_SESSION* wt = session.getSession();
+     const int rc = wt->checkpoint( wt, NULL );
+     invariantWTOK( rc );
+
+     return 1;
+ }
+
+ // Writes the size storer's contents to its table inside a "sync=true"
+ // transaction on a fresh session. Does nothing when there is no size storer.
+ // A WriteConflictException is swallowed.
+ void WiredTigerKVEngine::syncSizeInfo() const {
+     if ( !_sizeStorer ) {
+         return;
+     }
+
+     try {
+         WiredTigerSession session( _conn, -1 );
+         WT_SESSION* wt = session.getSession();
+
+         invariantWTOK( wt->begin_transaction( wt, "sync=true" ) );
+         _sizeStorer->storeInto( &session, _sizeStorerUri );
+         invariantWTOK( wt->commit_transaction( wt, NULL ) );
+     }
+     catch ( const WriteConflictException& ) {
+         // ignore, it means someone else is doing it
+     }
+ }
+
+ // Returns a new WiredTigerRecoveryUnit bound to this engine's session cache.
+ RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() {
+     WiredTigerSessionCache* cache = _sessionCache.get();
+     return new WiredTigerRecoveryUnit( cache );
+ }
+
+ // Stores the extra configuration string that createRecordStore() passes to
+ // WiredTigerRecordStore::generateCreateString().
+ void WiredTigerKVEngine::setRecordStoreExtraOptions(
+         const std::string& options ) {
+     _rsOptions = options;
+ }
+
+ // Stores the extra configuration string that createSortedDataInterface()
+ // passes to WiredTigerIndex::Create().
+ void WiredTigerKVEngine::setSortedDataInterfaceExtraOptions(
+         const std::string& options ) {
+     _indexOptions = options;
+ }
+
+ // Creates the WiredTiger table for a record store.
+ //
+ // The table configuration comes from
+ // WiredTigerRecordStore::generateCreateString(); if that fails its Status is
+ // returned as is. Otherwise the result of WT_SESSION::create is returned,
+ // converted to a Status.
+ Status WiredTigerKVEngine::createRecordStore( OperationContext* opCtx,
+                                               const StringData& ns,
+                                               const StringData& ident,
+                                               const CollectionOptions& options ) {
+     WiredTigerSession session( _conn, -1 );
+
+     StatusWith<std::string> generated =
+         WiredTigerRecordStore::generateCreateString(ns, options, _rsOptions);
+     if ( !generated.isOK() ) {
+         return generated.getStatus();
+     }
+     std::string config = generated.getValue();
+
+     string uri = _uri( ident );
+     WT_SESSION* wt = session.getSession();
+     LOG(1) << "WiredTigerKVEngine::createRecordStore uri: " << uri << " config: " << config;
+
+     const int rc = wt->create( wt, uri.c_str(), config.c_str() );
+     return wtRCToStatus( rc );
+ }
+
+ // Returns a new WiredTigerRecordStore over the ident's table.
+ //
+ // Capped collections pass their limits through, substituting 4096 for a zero
+ // cappedSize and -1 for a zero cappedMaxDocs. Non-capped collections pass -1
+ // for both limits.
+ RecordStore* WiredTigerKVEngine::getRecordStore( OperationContext* opCtx,
+                                                  const StringData& ns,
+                                                  const StringData& ident,
+                                                  const CollectionOptions& options ) {
+
+     if ( !options.capped ) {
+         return new WiredTigerRecordStore( opCtx, ns, _uri(ident),
+                                           false, -1, -1, NULL, _sizeStorer.get() );
+     }
+
+     return new WiredTigerRecordStore( opCtx, ns, _uri(ident), options.capped,
+                                       options.cappedSize ? options.cappedSize : 4096,
+                                       options.cappedMaxDocs ? options.cappedMaxDocs : -1,
+                                       NULL,
+                                       _sizeStorer.get() );
+ }
+
+ // Drops the ident's table via _drop() and returns OK whatever _drop()
+ // returned; a busy table is left queued by _drop().
+ Status WiredTigerKVEngine::dropRecordStore( OperationContext* opCtx,
+                                             const StringData& ident ) {
+     (void)_drop( ident );
+     return Status::OK();
+ }
+
+ // Maps an ident to its WiredTiger uri: "table:" followed by the ident.
+ string WiredTigerKVEngine::_uri( const StringData& ident ) const {
+     string uri( "table:" );
+     uri += ident.toString();
+     return uri;
+ }
+
+ // Creates the table for an index through WiredTigerIndex::Create(), passing
+ // the configured extra index options, and converts the result to a Status.
+ Status WiredTigerKVEngine::createSortedDataInterface( OperationContext* opCtx,
+                                                       const StringData& ident,
+                                                       const IndexDescriptor* desc ) {
+     const int rc = WiredTigerIndex::Create( opCtx, _uri( ident ), _indexOptions, desc );
+     return wtRCToStatus( rc );
+ }
+
+ // Returns a new index object over the ident's table: WiredTigerIndexUnique
+ // when the descriptor says the index is unique, WiredTigerIndexStandard
+ // otherwise.
+ SortedDataInterface* WiredTigerKVEngine::getSortedDataInterface( OperationContext* opCtx,
+                                                                  const StringData& ident,
+                                                                  const IndexDescriptor* desc ) {
+     SortedDataInterface* index = NULL;
+     if ( desc->unique() ) {
+         index = new WiredTigerIndexUnique( _uri( ident ) );
+     }
+     else {
+         index = new WiredTigerIndexStandard( _uri( ident ) );
+     }
+     return index;
+ }
+
+ // Drops the ident's table via _drop() and returns OK whatever _drop()
+ // returned; a busy table is left queued by _drop().
+ Status WiredTigerKVEngine::dropSortedDataInterface( OperationContext* opCtx,
+                                                     const StringData& ident ) {
+     (void)_drop( ident );
+     return Status::OK();
+ }
+
+ // Attempts a forced drop of the ident's table.
+ //
+ // Returns true when the table was dropped. When WiredTiger reports EBUSY the
+ // uri is added to the queue of pending drops, the epoch is bumped, all cached
+ // sessions are closed and false is returned. Any other error code trips
+ // invariantWTOK.
+ bool WiredTigerKVEngine::_drop( const StringData& ident ) {
+     string uri = _uri( ident );
+
+     WiredTigerSession session( _conn, -1 );
+     WT_SESSION* wt = session.getSession();
+
+     const int rc = wt->drop( wt, uri.c_str(), "force" );
+     LOG(1) << "WT drop of " << uri << " res " << rc;
+
+     if ( rc == 0 ) {
+         // yay, it worked
+         return true;
+     }
+
+     if ( rc != EBUSY ) {
+         invariantWTOK( rc );
+         return false;
+     }
+
+     // this is expected, queue it up
+     {
+         boost::mutex::scoped_lock lk( _identToDropMutex );
+         _identToDrop.insert( uri );
+         _epoch++;
+     }
+     _sessionCache->closeAll();
+     return false;
+ }
+
+ // Reports whether any drops are still queued. As a side job, when the size
+ // storer sync interval has elapsed the tracker is reset and the size
+ // information is synced before the queue is inspected.
+ bool WiredTigerKVEngine::haveDropsQueued() const {
+     const bool syncDue = _sizeStorerSyncTracker.intervalHasElapsed();
+     if ( syncDue ) {
+         _sizeStorerSyncTracker.resetLastTime();
+         syncSizeInfo();
+     }
+
+     boost::mutex::scoped_lock lk( _identToDropMutex );
+     const bool queueIsEmpty = _identToDrop.empty();
+     return !queueIsEmpty;
+ }
+
+ // Retries every queued drop.
+ //
+ // The queue is copied under the mutex, the drops are attempted without the
+ // mutex held, and the uris that were dropped are then erased from the queue
+ // under the mutex again. Tables that are still busy (EBUSY) stay queued; any
+ // other error code trips invariantWTOK.
+ void WiredTigerKVEngine::dropAllQueued() {
+     set<string> pending;
+     {
+         boost::mutex::scoped_lock lk( _identToDropMutex );
+         pending = _identToDrop;
+     }
+
+     set<string> dropped;
+
+     {
+         WiredTigerSession session( _conn, -1 );
+         WT_SESSION* wt = session.getSession();
+
+         set<string>::const_iterator it = pending.begin();
+         while ( it != pending.end() ) {
+             const string& uri = *it;
+             ++it;
+
+             const int rc = wt->drop( wt, uri.c_str(), "force" );
+             LOG(1) << "WT queued drop of " << uri << " res " << rc;
+
+             if ( rc == 0 ) {
+                 dropped.insert( uri );
+             }
+             else if ( rc != EBUSY ) {
+                 invariantWTOK( rc );
+             }
+             // EBUSY: leave in queue
+         }
+     }
+
+     {
+         boost::mutex::scoped_lock lk( _identToDropMutex );
+         for ( set<string>::const_iterator done = dropped.begin();
+               done != dropped.end();
+               ++done ) {
+             _identToDrop.erase( *done );
+         }
+     }
+ }
+
+ // This engine always reports support for document-level locking.
+ bool WiredTigerKVEngine::supportsDocLocking() const {
+     const bool supported = true;
+     return supported;
+ }
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
new file mode 100644
index 00000000000..b635c121e1f
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -0,0 +1,138 @@
+// wiredtiger_kv_engine.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <set>
+#include <string>
+
+#include <boost/thread/mutex.hpp>
+
+#include <wiredtiger.h>
+
+#include "mongo/bson/ordering.h"
+#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/util/elapsed_tracker.h"
+
+namespace mongo {
+
+ class WiredTigerSessionCache;
+ class WiredTigerSizeStorer;
+
+ /**
+  * KVEngine implementation backed by a WiredTiger connection.
+  *
+  * Besides the KVEngine interface it keeps a queue of uris whose drop returned
+  * EBUSY (see _drop / dropAllQueued) and an epoch counter that is bumped each
+  * time such a drop is queued.
+  */
+ class WiredTigerKVEngine : public KVEngine {
+ public:
+     WiredTigerKVEngine( const std::string& path,
+                         const std::string& extraOpenOptions = "",
+                         bool durable = true );
+     virtual ~WiredTigerKVEngine();
+
+     void setRecordStoreExtraOptions( const std::string& options );
+     void setSortedDataInterfaceExtraOptions( const std::string& options );
+
+     virtual bool supportsDocLocking() const;
+
+     virtual bool isDurable() const { return _durable; }
+
+     virtual RecoveryUnit* newRecoveryUnit();
+
+     virtual Status createRecordStore( OperationContext* opCtx,
+                                       const StringData& ns,
+                                       const StringData& ident,
+                                       const CollectionOptions& options );
+
+     virtual RecordStore* getRecordStore( OperationContext* opCtx,
+                                          const StringData& ns,
+                                          const StringData& ident,
+                                          const CollectionOptions& options );
+
+     virtual Status dropRecordStore( OperationContext* opCtx,
+                                     const StringData& ident );
+
+     virtual Status createSortedDataInterface( OperationContext* opCtx,
+                                               const StringData& ident,
+                                               const IndexDescriptor* desc );
+
+     virtual SortedDataInterface* getSortedDataInterface( OperationContext* opCtx,
+                                                          const StringData& ident,
+                                                          const IndexDescriptor* desc );
+
+     virtual Status dropSortedDataInterface( OperationContext* opCtx,
+                                             const StringData& ident );
+
+     virtual Status okToRename( OperationContext* opCtx,
+                                const StringData& fromNS,
+                                const StringData& toNS,
+                                const StringData& ident,
+                                const RecordStore* originalRecordStore ) const;
+
+     virtual int flushAllFiles( bool sync );
+
+     virtual int64_t getIdentSize( OperationContext* opCtx,
+                                   const StringData& ident );
+
+     virtual Status repairIdent( OperationContext* opCtx,
+                                 const StringData& ident );
+
+     // wiredtiger specific
+
+     WT_CONNECTION* getConnection() { return _conn; }
+
+     // Retries every drop that previously returned EBUSY.
+     void dropAllQueued();
+
+     // True while at least one drop is still queued.
+     bool haveDropsQueued() const;
+
+     // NOTE(review): _epoch is written under _identToDropMutex in _drop() but
+     // read here without it -- confirm this is acceptable for callers.
+     int currentEpoch() const { return _epoch; }
+
+     void syncSizeInfo() const;
+
+ private:
+
+     // Names are fully qualified so that this header does not depend on
+     // using-declarations pulled in by other headers.
+     std::string _uri( const StringData& ident ) const;
+     bool _drop( const StringData& ident );
+
+     WT_CONNECTION* _conn;
+     WT_EVENT_HANDLER _eventHandler;
+     boost::scoped_ptr<WiredTigerSessionCache> _sessionCache;
+     bool _durable;
+
+     std::string _rsOptions;
+     std::string _indexOptions;
+
+     // uris whose drop returned EBUSY; guarded by _identToDropMutex
+     std::set<std::string> _identToDrop;
+     mutable boost::mutex _identToDropMutex;
+
+     int _epoch; // this is how we keep track of if a session is too old
+
+     boost::scoped_ptr<WiredTigerSizeStorer> _sizeStorer;
+     std::string _sizeStorerUri;
+     mutable ElapsedTracker _sizeStorerSyncTracker;
+ };
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
new file mode 100644
index 00000000000..409f0452404
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
@@ -0,0 +1,66 @@
+// wiredtiger_kv_engine_test.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/storage/kv/kv_engine_test_harness.h"
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+#include "mongo/unittest/temp_dir.h"
+
+namespace mongo {
+
+ /**
+  * Harness helper that runs the generic KVEngine tests against a
+  * WiredTigerKVEngine living in a temporary directory.
+  */
+ class WiredTigerKVHarnessHelper : public KVHarnessHelper {
+ public:
+     // _dbpath is declared before _engine, so the directory exists before the
+     // engine is opened on it.
+     WiredTigerKVHarnessHelper()
+         : _dbpath( "wt-kv-harness" ),
+           _engine( new WiredTigerKVEngine( _dbpath.path() ) ) {
+     }
+
+     virtual ~WiredTigerKVHarnessHelper() {
+         _engine.reset();
+     }
+
+     // Destroys the current engine, then opens a fresh one on the same path.
+     virtual KVEngine* restartEngine() {
+         _engine.reset();
+         _engine.reset( new WiredTigerKVEngine( _dbpath.path() ) );
+         return getEngine();
+     }
+
+     virtual KVEngine* getEngine() { return _engine.get(); }
+
+ private:
+     unittest::TempDir _dbpath;
+     boost::scoped_ptr<WiredTigerKVEngine> _engine;
+ };
+
+ /** Factory used by the generic KV harness; the caller receives a new helper. */
+ KVHarnessHelper* KVHarnessHelper::create() {
+     KVHarnessHelper* helper = new WiredTigerKVHarnessHelper();
+     return helper;
+ }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp
new file mode 100644
index 00000000000..459d0b6d4f7
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp
@@ -0,0 +1,66 @@
+// wiredtiger_options_init.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/util/options_parser/startup_option_init.h"
+#include "mongo/util/options_parser/startup_options.h"
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h"
+
+namespace mongo {
+
+ // Interface to MongoDB option parsing.
+ WiredTigerGlobalOptions wiredTigerGlobalOptions;
+
+ // Registers the WiredTiger options with the global startup option section.
+ MONGO_GENERAL_STARTUP_OPTIONS_REGISTER(WiredTigerOptions)(InitializerContext* context) {
+     Status registered = wiredTigerGlobalOptions.add(&moe::startupOptions);
+     return registered;
+ }
+
+ // Runs the WiredTiger pre-validation hook, then validates the parsed options.
+ MONGO_STARTUP_OPTIONS_VALIDATE(WiredTigerOptions)(InitializerContext* context) {
+     const bool keepGoing = wiredTigerGlobalOptions.handlePreValidation(moe::startupOptionsParsed);
+     if (!keepGoing) {
+         // pre-validation asked for the process to stop
+         ::_exit(EXIT_SUCCESS);
+     }
+
+     // The validation status is the result, whether OK or not.
+     return moe::startupOptionsParsed.validate();
+ }
+
+ // Copies the parsed options into wiredTigerGlobalOptions; on failure prints
+ // the error plus a usage hint and exits with EXIT_BADOPTIONS.
+ MONGO_STARTUP_OPTIONS_STORE(WiredTigerOptions)(InitializerContext* context) {
+     Status ret = wiredTigerGlobalOptions.store(moe::startupOptionsParsed, context->args());
+     if (ret.isOK()) {
+         return Status::OK();
+     }
+
+     std::cerr << ret.toString() << std::endl;
+     std::cerr << "try '" << context->args()[0] << " --help' for more information"
+               << std::endl;
+     ::_exit(EXIT_BADOPTIONS);
+     return Status::OK();
+ }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
new file mode 100644
index 00000000000..256cb63e429
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -0,0 +1,1006 @@
+// wiredtiger_record_store.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include <wiredtiger.h>
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/oplog_hack.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+//#define RS_ITERATOR_TRACE(x) log() << "WTRS::Iterator " << x
+#define RS_ITERATOR_TRACE(x)
+
+namespace mongo {
+namespace {
+ /**
+  * Decide whether the table at 'uri' uses the oplog key-extraction hack by
+  * reading its app_metadata from the WiredTiger "metadata:" cursor.
+  *
+  * Returns false when the uri has no metadata entry, when there is no
+  * app_metadata item, or when app_metadata is empty. Returns true only for
+  * oplogKeyExtractionVersion=1. Any other app_metadata setting is fatal
+  * (fassert 28548).
+  */
+ bool shouldUseOplogHack(OperationContext* opCtx, const std::string& uri) {
+     WiredTigerCursor curwrap("metadata:", WiredTigerSession::kMetadataCursorId, opCtx);
+     WT_CURSOR* c = curwrap.get();
+     c->set_key(c, uri.c_str());
+     int ret = c->search(c);
+     if (ret == WT_NOTFOUND)
+         return false;
+     invariantWTOK(ret);
+
+     const char* config = NULL;
+     // Check the return code instead of relying only on 'config' being set.
+     invariantWTOK(c->get_value(c, &config));
+     invariant(config);
+
+     WiredTigerConfigParser topParser(config);
+     WT_CONFIG_ITEM metadata;
+     if (topParser.get("app_metadata", &metadata) != 0)
+         return false;
+
+     if (metadata.len == 0)
+         return false;
+
+     WiredTigerConfigParser parser(metadata);
+     WT_CONFIG_ITEM keyItem;
+     WT_CONFIG_ITEM value;
+     while (parser.next(&keyItem, &value) == 0) {
+         const StringData key(keyItem.str, keyItem.len);
+         if (key == "oplogKeyExtractionVersion") {
+             if (value.type == WT_CONFIG_ITEM::WT_CONFIG_ITEM_NUM && value.val == 1)
+                 return true;
+         }
+
+         // This prevents downgrades with unsupported metadata settings.
+         // WT_CONFIG_ITEM::str is not NUL-terminated, so bound it by 'len';
+         // streaming the raw pointer would print past the end of the value.
+         severe() << "Unrecognized WiredTiger metadata setting: " << key << '='
+                  << StringData(value.str, value.len);
+         fassertFailedNoTrace(28548);
+     }
+
+     return false;
+ }
+} // namespace
+
+ /**
+  * Build the WiredTiger configuration string used to create a record store.
+  *
+  * The string is assembled in this order: built-in defaults, 'extraStrings',
+  * the user's storageEngine.wiredtiger.configString (if any), and finally the
+  * settings appended after the user input (app_metadata, key/value formats,
+  * and type=file for the oplog).
+  *
+  * Returns TypeMismatch when configString is not a string and InvalidOptions
+  * for any other field under storageEngine.wiredtiger.
+  */
+ StatusWith<std::string> WiredTigerRecordStore::generateCreateString(const StringData& ns,
+                                                                     const CollectionOptions& options,
+                                                                     const StringData& extraStrings) {
+     // Separate out a prefix and suffix in the default string. User configuration will
+     // override values in the prefix, but not values in the suffix.
+     str::stream ss;
+     ss << "type=file,";
+     ss << "memory_page_max=100m,";
+     ss << "block_compressor=snappy,";
+
+     ss << extraStrings << ",";
+
+     // Validate the configuration object: 'configString' must be a string and no
+     // other field is accepted. Both violations are returned as errors.
+     BSONForEach(elem, options.storageEngine.getObjectField("wiredtiger")) {
+         if (elem.fieldNameStringData() == "configString") {
+             if (elem.type() != String) {
+                 return StatusWith<std::string>(ErrorCodes::TypeMismatch, str::stream()
+                     << "storageEngine.wiredtiger.configString must be a string. "
+                     << "Not adding 'configString' value "
+                     << elem << " to collection configuration");
+             }
+             ss << elem.valueStringData() << ",";
+         }
+         else {
+             // Return error on first unrecognized field.
+             return StatusWith<std::string>(ErrorCodes::InvalidOptions, str::stream()
+                 << '\'' << elem.fieldNameStringData() << '\''
+                 << " is not a supported option in storageEngine.wiredtiger");
+         }
+     }
+
+     if ( NamespaceString::oplog(ns) ) {
+         // force file for oplog
+         ss << "type=file,";
+         ss << "app_metadata=(oplogKeyExtractionVersion=1),";
+     }
+     else {
+         // Force this to be empty since users shouldn't be allowed to change it.
+         ss << "app_metadata=(),";
+     }
+
+     ss << "key_format=q,value_format=u";
+     return StatusWith<std::string>(ss);
+ }
+
+ /**
+  * Open the record store stored at 'uri'.
+  *
+  * Positions a backward iterator to find the highest key in use (which seeds
+  * _nextIdNum and, for the oplog hack, _highestLocForOplogHack) and initialises
+  * the record count / data size either from the size storer or by scanning.
+  */
+ WiredTigerRecordStore::WiredTigerRecordStore( OperationContext* ctx,
+                                               const StringData& ns,
+                                               const StringData& uri,
+                                               bool isCapped,
+                                               int64_t cappedMaxSize,
+                                               int64_t cappedMaxDocs,
+                                               CappedDocumentDeleteCallback* cappedDeleteCallback,
+                                               WiredTigerSizeStorer* sizeStorer )
+     : RecordStore( ns ),
+       _uri( uri.toString() ),
+       _instanceId( WiredTigerSession::genCursorId() ),
+       _isCapped( isCapped ),
+       _isOplog( NamespaceString::oplog( ns ) ),
+       _cappedMaxSize( cappedMaxSize ),
+       _cappedMaxDocs( cappedMaxDocs ),
+       _cappedDeleteCallback( cappedDeleteCallback ),
+       _useOplogHack(shouldUseOplogHack(ctx, _uri)),
+       _sizeStorer( sizeStorer ) {
+
+     // Capped limits must be set for capped stores and be -1 otherwise.
+     if (!_isCapped) {
+         invariant(_cappedMaxSize == -1);
+         invariant(_cappedMaxDocs == -1);
+     }
+     else {
+         invariant(_cappedMaxSize > 0);
+         invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0);
+     }
+
+     // Find the largest DiskLoc currently in use and estimate the number of records.
+     scoped_ptr<RecordIterator> iterator( getIterator( ctx, DiskLoc(),
+                                                       CollectionScanParams::BACKWARD ) );
+
+     if ( iterator->isEOF() ) {
+         // Empty store: counters start from zero.
+         _dataSize.store(0);
+         _numRecords.store(0);
+         // Need to start at 1 so we are always higher than minDiskLoc
+         _nextIdNum.store( 1 );
+         if ( sizeStorer )
+             _sizeStorer->onCreate( this, 0, 0 );
+
+         if (_useOplogHack) {
+             _highestLocForOplogHack = minDiskLoc;
+         }
+         return;
+     }
+
+     if (_useOplogHack) {
+         _highestLocForOplogHack = iterator->curr();
+     }
+
+     const uint64_t highestKey = _makeKey( iterator->curr() );
+     _nextIdNum.store( 1 + highestKey );
+
+     if ( _sizeStorer ) {
+         long long storedRecords;
+         long long storedBytes;
+         _sizeStorer->load( uri, &storedRecords, &storedBytes );
+         _numRecords.store( storedRecords );
+         _dataSize.store( storedBytes );
+         _sizeStorer->onCreate( this, storedRecords, storedBytes );
+     }
+
+     // Without a size storer, or for small collections, count by scanning.
+     const bool needScan = ( _sizeStorer == NULL ) || ( _numRecords.load() < 10000 );
+     if ( !needScan ) {
+         return;
+     }
+
+     LOG(1) << "doing scan of collection " << ns << " to get info";
+
+     _numRecords.store(0);
+     _dataSize.store(0);
+
+     while( !iterator->isEOF() ) {
+         DiskLoc loc = iterator->getNext();
+         RecordData data = iterator->dataFor( loc );
+         _numRecords.fetchAndAdd(1);
+         _dataSize.fetchAndAdd(data.size());
+     }
+
+     if ( _sizeStorer ) {
+         _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() );
+     }
+ }
+
+ /** Unregisters from the size storer (if any) and saves the final counters. */
+ WiredTigerRecordStore::~WiredTigerRecordStore() {
+     LOG(1) << "~WiredTigerRecordStore for: " << ns();
+     if ( !_sizeStorer ) {
+         return;
+     }
+     _sizeStorer->onDestroy( this );
+     _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() );
+ }
+
+ /** Returns the in-memory data size counter; 'txn' is not used. */
+ long long WiredTigerRecordStore::dataSize( OperationContext *txn ) const {
+     const long long bytes = _dataSize.load();
+     return bytes;
+ }
+
+ /** Returns the in-memory record counter; 'txn' is not used. */
+ long long WiredTigerRecordStore::numRecords( OperationContext *txn ) const {
+     const long long count = _numRecords.load();
+     return count;
+ }
+
+ /** True when this store was opened as a capped collection. */
+ bool WiredTigerRecordStore::isCapped() const {
+     const bool capped = _isCapped;
+     return capped;
+ }
+
+ /** Document limit of a capped store; must only be called on capped stores. */
+ int64_t WiredTigerRecordStore::cappedMaxDocs() const {
+     invariant(_isCapped);
+     const int64_t limit = _cappedMaxDocs;
+     return limit;
+ }
+
+ /** Byte limit of a capped store; must only be called on capped stores. */
+ int64_t WiredTigerRecordStore::cappedMaxSize() const {
+     invariant(_isCapped);
+     const int64_t limit = _cappedMaxSize;
+     return limit;
+ }
+
+ /**
+  * Returns the "file size in bytes" statistic from the "block manager"
+  * section of the stats produced by appendCustomStats(). An empty capped
+  * store reports 1 instead of 0. 'extraInfo' and 'infoLevel' are not used.
+  */
+ int64_t WiredTigerRecordStore::storageSize( OperationContext* txn,
+                                             BSONObjBuilder* extraInfo,
+                                             int infoLevel ) const {
+
+     BSONObjBuilder statsBuilder;
+     appendCustomStats( txn, &statsBuilder, 1 );
+     BSONObj stats = statsBuilder.obj();
+
+     BSONObj blockManager = stats["wiredtiger"].Obj()["block manager"].Obj();
+     BSONElement fileSize = blockManager["file size in bytes"];
+     invariant( fileSize.type() );
+
+     int64_t size = 0;
+     if ( !fileSize.isNumber() ) {
+         // the statistic may be exported as a decimal string
+         invariant( fileSize.type() == String );
+         size = strtoll( fileSize.valuestrsafe(), NULL, 10 );
+     }
+     else {
+         size = fileSize.safeNumberLong();
+     }
+
+     // Many things assume an empty capped collection still takes up space.
+     const bool emptyCapped = ( size == 0 && _isCapped );
+     return emptyCapped ? 1 : size;
+ }
+
+ // Copy the value under a positioned cursor into a buffer owned by the
+ // returned RecordData.
+ RecordData WiredTigerRecordStore::_getData(const WiredTigerCursor& cursor) const {
+     WT_ITEM item;
+     invariantWTOK( cursor->get_value(cursor.get(), &item) );
+
+     // The shared_array keeps the copy alive for as long as the RecordData needs it.
+     boost::shared_array<char> owned( new char[item.size] );
+     memcpy( owned.get(), item.data, item.size );
+
+     const char* bytes = owned.get();
+     return RecordData( bytes, item.size, owned );
+ }
+
+ /**
+  * Returns an owned copy of the record stored at 'loc'.
+  * Raises massert 28556 when no record exists at 'loc'.
+  */
+ RecordData WiredTigerRecordStore::dataFor(OperationContext* txn, const DiskLoc& loc) const {
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+     invariant( c );
+
+     c->set_key(c, _makeKey(loc));
+     const int ret = c->search(c);
+     const bool found = ( ret != WT_NOTFOUND );
+     massert( 28556,
+              "Didn't find DiskLoc in WiredTigerRecordStore",
+              found );
+     invariantWTOK(ret);
+
+     // _getData copies the value, so it outlives the cursor.
+     return _getData(curwrap);
+ }
+
+ /**
+  * Looks up 'loc'. On success stores an owned copy of the record in '*out'
+  * and returns true; returns false (leaving '*out' untouched) when not found.
+  */
+ bool WiredTigerRecordStore::findRecord( OperationContext* txn,
+                                         const DiskLoc& loc, RecordData* out ) const {
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+     invariant( c );
+
+     c->set_key(c, _makeKey(loc));
+     const int ret = c->search(c);
+     if ( ret != WT_NOTFOUND ) {
+         invariantWTOK(ret);
+         *out = _getData(curwrap);
+         return true;
+     }
+     return false;
+ }
+
+ /**
+  * Removes the record at 'loc' and updates the record count and data size.
+  * The record must exist; a failed search trips invariantWTOK.
+  */
+ void WiredTigerRecordStore::deleteRecord( OperationContext* txn, const DiskLoc& loc ) {
+     WiredTigerCursor cursor( _uri, _instanceId, txn );
+     WT_CURSOR *c = cursor.get();
+
+     c->set_key(c, _makeKey(loc));
+     invariantWTOK( c->search(c) );
+
+     // Read the size before removing so the data size can be adjusted.
+     WT_ITEM removed;
+     invariantWTOK( c->get_value(c, &removed) );
+     const int removedBytes = removed.size;
+
+     invariantWTOK( c->remove(c) );
+
+     _changeNumRecords(txn, false);
+     _increaseDataSize(txn, -removedBytes);
+ }
+
+ /**
+  * True when this is a capped store that currently exceeds its byte limit or,
+  * if a document limit is set, its document limit.
+  */
+ bool WiredTigerRecordStore::cappedAndNeedDelete(OperationContext* txn) const {
+     if (!_isCapped) {
+         return false;
+     }
+
+     if (_dataSize.load() > _cappedMaxSize) {
+         return true;
+     }
+
+     // -1 means "no document limit"
+     return (_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs);
+ }
+
+ namespace {
+ int oplogCounter = 0;
+ }
+
+ /**
+  * Deletes the oldest records of a capped store until it is back within its
+  * limits (see cappedAndNeedDelete). For the oplog the check only runs on
+  * every 100th call.
+  *
+  * Each record about to be removed is first reported to _cappedDeleteCallback
+  * (if set); a non-OK status from the callback is thrown via uassertStatusOK.
+  */
+ void WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* txn) {
+
+     // The truncate path is currently disabled; records are deleted one by one.
+     bool useTruncate = false;
+
+     if ( _isOplog ) {
+         // NOTE(review): oplogCounter is a plain int shared by all callers --
+         // confirm whether concurrent inserts can race on it.
+         if ( oplogCounter++ % 100 > 0 )
+             return;
+     }
+
+     if (!cappedAndNeedDelete(txn))
+         return;
+
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+     int ret = c->next(c);
+     DiskLoc oldest;
+     while ( ret == 0 && cappedAndNeedDelete(txn) ) {
+         invariant(_numRecords.load() > 0);
+
+         uint64_t key;
+         ret = c->get_key(c, &key);
+         invariantWTOK(ret);
+         oldest = _fromKey(key);
+
+         if ( _cappedDeleteCallback ) {
+             uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest));
+         }
+
+         if ( useTruncate ) {
+             _changeNumRecords( txn, false );
+             WT_ITEM temp;
+             invariantWTOK( c->get_value( c, &temp ) );
+             // The record is going away, so its size must be subtracted
+             // (the same accounting deleteRecord() performs).
+             _increaseDataSize( txn, -static_cast<int>( temp.size ) );
+         }
+         else {
+             deleteRecord( txn, oldest );
+         }
+
+         ret = c->next(c);
+     }
+
+     // Running off the end of the table is fine; anything else is an error.
+     if (ret != WT_NOTFOUND) invariantWTOK(ret);
+
+     if ( useTruncate && !oldest.isNull() ) {
+         // Truncate everything from the start of the table up to 'oldest'.
+         c->reset( c );
+         c->set_key( c, _makeKey( oldest ) );
+         invariantWTOK( curwrap.getWTSession()->truncate( curwrap.getWTSession(),
+                                                          NULL, NULL, c, "" ) );
+     }
+
+ }
+
+ /**
+  * Extracts the oplog key from the document and checks that it is strictly
+  * greater than the highest key seen so far. On success the highest key is
+  * advanced; otherwise the extraction error or BadValue is returned.
+  */
+ StatusWith<DiskLoc> WiredTigerRecordStore::extractAndCheckLocForOplog(const char* data,
+                                                                       int len) {
+     StatusWith<DiskLoc> extracted = oploghack::extractKey(data, len);
+     if (!extracted.isOK()) {
+         return extracted;
+     }
+
+     const bool notAhead = ( extracted.getValue() <= _highestLocForOplogHack );
+     if (notAhead) {
+         return StatusWith<DiskLoc>(ErrorCodes::BadValue, "ts not higher than highest");
+     }
+
+     _highestLocForOplogHack = extracted.getValue();
+     return extracted;
+ }
+
+ /**
+  * Inserts 'len' bytes at 'data' as a new record and returns its DiskLoc.
+  *
+  * Returns BadValue when the record is larger than a capped store's byte
+  * limit, or the oplog key error when the oplog hack rejects the document.
+  * 'enforceQuota' is not used.
+  */
+ StatusWith<DiskLoc> WiredTigerRecordStore::insertRecord( OperationContext* txn,
+                                                          const char* data,
+                                                          int len,
+                                                          bool enforceQuota ) {
+     const bool tooBigForCap = _isCapped && len > _cappedMaxSize;
+     if ( tooBigForCap ) {
+         return StatusWith<DiskLoc>( ErrorCodes::BadValue,
+                                     "object to insert exceeds cappedMaxSize" );
+     }
+
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+     invariant( c );
+
+     // Pick the key: generated ids normally, document-derived for the oplog.
+     DiskLoc loc;
+     if (!_useOplogHack) {
+         loc = _nextId();
+     }
+     else {
+         StatusWith<DiskLoc> status = extractAndCheckLocForOplog(data, len);
+         if (!status.isOK())
+             return status;
+         loc = status.getValue();
+     }
+
+     c->set_key(c, _makeKey(loc));
+     WiredTigerItem value(data, len);
+     c->set_value(c, value.Get());
+     invariantWTOK( c->insert(c) );
+
+     _changeNumRecords( txn, true );
+     _increaseDataSize( txn, len );
+
+     cappedDeleteAsNeeded(txn);
+
+     return StatusWith<DiskLoc>( loc );
+ }
+
+ /**
+  * Inserts the document produced by 'doc' as a new record and returns its
+  * DiskLoc. The document is first rendered into a temporary buffer.
+  *
+  * Returns BadValue when the document is larger than a capped store's byte
+  * limit, or the oplog key error when the oplog hack rejects the document.
+  * 'enforceQuota' is not used.
+  */
+ StatusWith<DiskLoc> WiredTigerRecordStore::insertRecord( OperationContext* txn,
+                                                          const DocWriter* doc,
+                                                          bool enforceQuota ) {
+     const int len = doc->documentSize();
+
+     const bool tooBigForCap = _isCapped && len > _cappedMaxSize;
+     if ( tooBigForCap ) {
+         return StatusWith<DiskLoc>( ErrorCodes::BadValue,
+                                     "object to insert exceeds cappedMaxSize" );
+     }
+
+     boost::shared_array<char> rendered( new char[len] );
+     doc->writeDocument( rendered.get() );
+
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+
+     // Pick the key: generated ids normally, document-derived for the oplog.
+     DiskLoc loc;
+     if (!_useOplogHack) {
+         loc = _nextId();
+     }
+     else {
+         StatusWith<DiskLoc> status = extractAndCheckLocForOplog(rendered.get(), len);
+         if (!status.isOK())
+             return status;
+         loc = status.getValue();
+     }
+
+     c->set_key(c, _makeKey(loc));
+     WiredTigerItem value(rendered.get(), len);
+     c->set_value(c, value.Get());
+     invariantWTOK( c->insert(c) );
+
+     _changeNumRecords( txn, true );
+     _increaseDataSize( txn, len );
+
+     cappedDeleteAsNeeded( txn );
+
+     return StatusWith<DiskLoc>( loc );
+ }
+
+ /**
+  * Replaces the record at 'loc' with 'len' bytes from 'data' and returns
+  * 'loc' (records never move). The record must already exist.
+  * 'enforceQuota' and 'notifier' are not used.
+  */
+ StatusWith<DiskLoc> WiredTigerRecordStore::updateRecord( OperationContext* txn,
+                                                          const DiskLoc& loc,
+                                                          const char* data,
+                                                          int len,
+                                                          bool enforceQuota,
+                                                          UpdateMoveNotifier* notifier ) {
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+     invariant( c );
+
+     const uint64_t key = _makeKey(loc);
+
+     c->set_key(c, key);
+     invariantWTOK( c->search(c) );
+
+     // Remember the old size so the data size counter can be adjusted.
+     WT_ITEM previous;
+     invariantWTOK( c->get_value(c, &previous) );
+     const int previousBytes = previous.size;
+
+     c->set_key(c, key);
+     WiredTigerItem value(data, len);
+     c->set_value(c, value.Get());
+     invariantWTOK( c->update(c) );
+
+     _increaseDataSize(txn, len - previousBytes);
+
+     cappedDeleteAsNeeded(txn);
+
+     return StatusWith<DiskLoc>( loc );
+ }
+
+ /**
+  * Applies 'damages' to a private copy of 'oldRec' and writes the result back
+  * as the record at 'loc'.
+  *
+  * Each damage event copies event.size bytes from damangeSource + sourceOffset
+  * to the copy at targetOffset. The data size counter is not adjusted here.
+  * Always returns Status::OK(); a failed update trips invariantWTOK.
+  */
+ Status WiredTigerRecordStore::updateWithDamages( OperationContext* txn,
+                                                  const DiskLoc& loc,
+                                                  const RecordData& oldRec,
+                                                  const char* damangeSource,
+                                                  const mutablebson::DamageVector& damages ) {
+
+     // apply changes to our copy
+
+     std::string data(reinterpret_cast<const char *>(oldRec.data()), oldRec.size());
+
+     // Writing through c_str() is undefined behaviour, so take a mutable
+     // pointer to the string's own storage instead.
+     char* root = data.empty() ? NULL : &data[0];
+     for( size_t i = 0; i < damages.size(); i++ ) {
+         mutablebson::DamageEvent event = damages[i];
+         const char* sourcePtr = damangeSource + event.sourceOffset;
+         char* targetPtr = root + event.targetOffset;
+         std::memcpy(targetPtr, sourcePtr, event.size);
+     }
+
+     // write back
+
+     WiredTigerItem value(data);
+
+     WiredTigerCursor curwrap( _uri, _instanceId, txn);
+     WT_CURSOR *c = curwrap.get();
+     c->set_key(c, _makeKey(loc));
+     c->set_value(c, value.Get());
+
+     int ret = c->update(c);
+     invariantWTOK(ret);
+
+     return Status::OK();
+ }
+
+ /** Returns a new iterator starting at 'start' in direction 'dir'; the caller owns it. */
+ RecordIterator* WiredTigerRecordStore::getIterator( OperationContext* txn,
+                                                     const DiskLoc& start,
+                                                     const CollectionScanParams::Direction& dir ) const {
+     RecordIterator* created = new Iterator(*this, txn, start, dir, false);
+     return created;
+ }
+
+
+ /** Repair uses an ordinary iterator created with getIterator()'s default arguments. */
+ RecordIterator* WiredTigerRecordStore::getIteratorForRepair( OperationContext* txn ) const {
+     RecordIterator* plain = getIterator( txn );
+     return plain;
+ }
+
+ /**
+  * Returns the iterators for a parallel collection scan. Currently this is a
+  * single forward iterator covering the whole store; the caller owns it.
+  */
+ std::vector<RecordIterator*> WiredTigerRecordStore::getManyIterators(
+     OperationContext* txn ) const {
+     // XXX do we want this to actually return a set of iterators?
+
+     RecordIterator* whole = new Iterator(*this, txn, DiskLoc(),
+                                          CollectionScanParams::FORWARD, true);
+
+     std::vector<RecordIterator*> result;
+     result.push_back( whole );
+     return result;
+ }
+
+ /**
+  * Deletes every record one at a time and resets the highest oplog key.
+  * Always returns Status::OK().
+  */
+ Status WiredTigerRecordStore::truncate( OperationContext* txn ) {
+     // TODO: use a WiredTiger fast truncate
+     boost::scoped_ptr<RecordIterator> records( getIterator( txn ) );
+     for ( ; !records->isEOF(); ) {
+         const DiskLoc victim = records->getNext();
+         deleteRecord( txn, victim );
+     }
+
+     _highestLocForOplogHack = DiskLoc();
+
+     return Status::OK();
+ }
+
+ /**
+  * Runs WiredTiger's compact on this store's uri using a session borrowed
+  * from the session cache. 'adaptor', 'options' and 'stats' are not used.
+  */
+ Status WiredTigerRecordStore::compact( OperationContext* txn,
+                                        RecordStoreCompactAdaptor* adaptor,
+                                        const CompactOptions* options,
+                                        CompactStats* stats ) {
+     WiredTigerSessionCache* sessionCache = WiredTigerRecoveryUnit::get(txn)->getSessionCache();
+     WiredTigerSession* borrowed = sessionCache->getSession();
+
+     WT_SESSION *s = borrowed->getSession();
+     invariantWTOK( s->compact(s, GetURI().c_str(), NULL) );
+
+     // hand the session back once compaction has finished
+     sessionCache->releaseSession(borrowed);
+     return Status::OK();
+ }
+
+ /**
+  * Walks every record, counting them and, when both 'full' and 'scanData' are
+  * set, validating each one through 'adaptor'. Writes "nrecords" and a
+  * "wiredtiger" statistics sub-object to 'output'.
+  *
+  * Corrupt records are reported through 'results'; the returned Status is
+  * always OK.
+  */
+ Status WiredTigerRecordStore::validate( OperationContext* txn,
+                                         bool full, bool scanData,
+                                         ValidateAdaptor* adaptor,
+                                         ValidateResults* results,
+                                         BSONObjBuilder* output ) const {
+
+     const bool checkContents = full && scanData;
+
+     results->valid = true;
+
+     long long nrecords = 0;
+     boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) );
+     for ( ; !iter->isEOF(); iter->getNext() ) {
+         ++nrecords;
+         if ( !checkContents ) {
+             continue;
+         }
+
+         size_t dataSize;
+         DiskLoc loc = iter->curr();
+         RecordData data = dataFor( txn, loc );
+         Status recordStatus = adaptor->validate( data, &dataSize );
+         if ( !recordStatus.isOK() ) {
+             results->valid = false;
+             results->errors.push_back( loc.toString() + " is corrupted" );
+         }
+     }
+
+     output->appendNumber( "nrecords", nrecords );
+
+     // Attach the table statistics; a failure is reported inside the sub-object.
+     WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession();
+     WT_SESSION* s = session->getSession();
+     BSONObjBuilder bob(output->subobjStart("wiredtiger"));
+     Status statsStatus = WiredTigerUtil::exportTableToBSON(s, "statistics:" + GetURI(),
+                                                            "statistics=(fast)", &bob);
+     if (!statsStatus.isOK()) {
+         bob.append("error", "unable to retrieve statistics");
+         bob.append("code", static_cast<int>(statsStatus.code()));
+         bob.append("reason", statsStatus.reason());
+     }
+     return Status::OK();
+ }
+
+ /**
+  * Appends "capped" (plus "max" / "maxSize" for capped stores) and a
+  * "wiredtiger" sub-object with the table statistics to 'result'.
+  * 'scale' is not used.
+  */
+ void WiredTigerRecordStore::appendCustomStats( OperationContext* txn,
+                                                BSONObjBuilder* result,
+                                                double scale ) const {
+     result->appendBool( "capped", _isCapped );
+     if ( _isCapped ) {
+         result->appendIntOrLL( "max", _cappedMaxDocs );
+         result->appendIntOrLL( "maxSize", _cappedMaxSize );
+     }
+
+     WT_SESSION* s = WiredTigerRecoveryUnit::get(txn)->getSession()->getSession();
+     BSONObjBuilder bob(result->subobjStart("wiredtiger"));
+     Status statsStatus = WiredTigerUtil::exportTableToBSON(s, "statistics:" + GetURI(),
+                                                            "statistics=(fast)", &bob);
+     if (statsStatus.isOK()) {
+         return;
+     }
+
+     // Report the failure inside the sub-object instead of failing the call.
+     bob.append("error", "unable to retrieve statistics");
+     bob.append("code", static_cast<int>(statsStatus.code()));
+     bob.append("reason", statsStatus.reason());
+ }
+
+
+ /** Does no work; when 'output' is given it reports one range and zero millis. */
+ Status WiredTigerRecordStore::touch( OperationContext* txn, BSONObjBuilder* output ) const {
+     if (!output) {
+         return Status::OK();
+     }
+     output->append("numRanges", 1);
+     output->append("millis", 0);
+     return Status::OK();
+ }
+
+ /**
+  * Accepts the boolean options "usePowerOf2Sizes" and "verify_checksums"
+  * without acting on them. Returns BadValue for non-boolean values and
+  * InvalidOptions for any other option name. 'info' is not used.
+  */
+ Status WiredTigerRecordStore::setCustomOption( OperationContext* txn,
+                                                const BSONElement& option,
+                                                BSONObjBuilder* info ) {
+     if ( !option.isBoolean() ) {
+         return Status( ErrorCodes::BadValue, "Invalid Value" );
+     }
+
+     // TODO: expose some WiredTiger configurations
+     const string optionName = option.fieldName();
+     const bool recognised = ( optionName == "usePowerOf2Sizes" ) ||
+                             ( optionName.compare( "verify_checksums" ) == 0 );
+     if ( !recognised ) {
+         return Status( ErrorCodes::InvalidOptions, "Invalid Option" );
+     }
+
+     return Status::OK();
+ }
+
+ /**
+  * Finds the greatest key that is <= 'startingPosition'.
+  *
+  * Returns an invalid DiskLoc when the oplog hack is not in use and a default
+  * (null) DiskLoc when no such key exists.
+  */
+ DiskLoc WiredTigerRecordStore::oplogStartHack(OperationContext* txn,
+                                               const DiskLoc& startingPosition) const {
+     if (!_useOplogHack) {
+         return DiskLoc().setInvalid();
+     }
+
+     WiredTigerCursor cursor(_uri, _instanceId, txn);
+     WT_CURSOR* c = cursor.get();
+
+     int cmp;
+     c->set_key(c, _makeKey(startingPosition));
+     int ret = c->search_near(c, &cmp);
+     if (ret == 0 && cmp > 0) {
+         // landed one higher than startingPosition
+         ret = c->prev(c);
+     }
+     if (ret == WT_NOTFOUND) {
+         // nothing <= startingPosition
+         return DiskLoc();
+     }
+     invariantWTOK(ret);
+
+     uint64_t foundKey;
+     invariantWTOK( c->get_key(c, &foundKey) );
+     return _fromKey(foundKey);
+ }
+
+ /**
+  * Allocates the next record id from _nextIdNum and splits it into a DiskLoc:
+  * the upper 32 bits become 'a', the lower 32 bits become the offset.
+  * Must not be used when the oplog hack supplies the keys.
+  */
+ DiskLoc WiredTigerRecordStore::_nextId() {
+     invariant(!_useOplogHack);
+     const uint64_t allocated = _nextIdNum.fetchAndAdd(1);
+     const int upper = allocated >> 32;
+     // keep only the lowest 4 bytes
+     const int lower = allocated & 0x00000000FFFFFFFF;
+     return DiskLoc( upper, lower );
+ }
+
+ /** Downcasts the operation's recovery unit; yields NULL if it is not a WiredTigerRecoveryUnit. */
+ WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit( OperationContext* txn ) {
+     RecoveryUnit* generic = txn->recoveryUnit();
+     return dynamic_cast<WiredTigerRecoveryUnit*>( generic );
+ }
+
+ /**
+  * Recovery-unit change that undoes one record-count adjustment on rollback:
+  * an insert is undone by -1, a delete by +1. Commit does nothing.
+  */
+ class WiredTigerRecordStore::NumRecordsChange : public RecoveryUnit::Change {
+ public:
+     NumRecordsChange(WiredTigerRecordStore* rs, bool insert) :_rs(rs), _insert(insert) {}
+
+     virtual void commit() {}
+
+     virtual void rollback() {
+         const int undo = _insert ? -1 : 1;
+         _rs->_numRecords.fetchAndAdd(undo);
+     }
+
+ private:
+     WiredTigerRecordStore* _rs;
+     bool _insert;
+ };
+
+ // Adjusts the cached record count by one (up for an insert, down otherwise) and
+ // registers a NumRecordsChange so the adjustment is reversed on rollback.
+ void WiredTigerRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) {
+ txn->recoveryUnit()->registerChange(new NumRecordsChange(this, insert));
+
+ const int delta = insert ? 1 : -1;
+ if ( _numRecords.fetchAndAdd(delta) >= 0 )
+ return;
+
+ // fetchAndAdd reported a negative count, so the counter had drifted below zero:
+ // reset it to the smallest value that is consistent with this operation.
+ _numRecords.store( insert ? 1 : 0 );
+ }
+
+ // Change object registered with the recovery unit for every data-size adjustment.
+ // Commit needs no action; rollback applies the negated adjustment.
+ class WiredTigerRecordStore::DataSizeChange : public RecoveryUnit::Change {
+ public:
+ DataSizeChange(WiredTigerRecordStore* rs, int amount) :_rs(rs), _amount(amount) {}
+ virtual void commit() {}
+ virtual void rollback() {
+ // NULL txn: the undo itself must not register another DataSizeChange.
+ _rs->_increaseDataSize( NULL, -_amount );
+ }
+
+ private:
+ WiredTigerRecordStore* _rs;
+ // Must be an int: stored as a bool the amount collapsed to 0 or 1, so rollback
+ // undid at most 1 instead of the real size delta.
+ int _amount;
+ };
+
+ // Adjusts the cached data size by 'amount' (which may be negative).
+ // When 'txn' is non-NULL a DataSizeChange is registered so the adjustment is undone
+ // on rollback; the rollback path itself passes NULL so it does not register again.
+ void WiredTigerRecordStore::_increaseDataSize( OperationContext* txn, int amount ) {
+ if ( txn )
+ txn->recoveryUnit()->registerChange(new DataSizeChange(this, amount));
+
+ // If fetchAndAdd reports a negative size, the counter had drifted below zero:
+ // reset it to 'amount' for a growth and to 0 otherwise.
+ if ( _dataSize.fetchAndAdd(amount) < 0 ) {
+ if ( amount > 0 ) {
+ _dataSize.store( amount );
+ }
+ else {
+ _dataSize.store( 0 );
+ }
+ }
+
+ // Push the current counters to the size storer (if any) whenever the call counter,
+ // read before its increment, is a multiple of 1000.
+ if ( _sizeStorer && _sizeStorerCounter++ % 1000 == 0 ) {
+ _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() );
+ }
+ }
+
+ // Packs a DiskLoc into the 64-bit table key: 'a' in the upper 32 bits and the offset
+ // in the lower 32 bits. This is the inverse of _fromKey().
+ uint64_t WiredTigerRecordStore::_makeKey( const DiskLoc& loc ) {
+ // Narrow the offset to exactly 32 unsigned bits before combining. Without this a
+ // negative int offset (as _nextId() produces once the low word reaches 2^31)
+ // would be sign-extended to 64 bits and overwrite the upper half of the key.
+ const uint64_t high = static_cast<uint64_t>( loc.a() ) << 32;
+ const uint32_t low = static_cast<uint32_t>( loc.getOfs() );
+ return high | low;
+ }
+ // Unpacks a 64-bit table key into a DiskLoc: the upper 32 bits become 'a' and the
+ // lower 32 bits become the offset.
+ DiskLoc WiredTigerRecordStore::_fromKey( uint64_t key ) {
+ const uint32_t high = key >> 32;
+ const uint32_t low = static_cast<uint32_t>( key );
+ return DiskLoc( high, low );
+ }
+
+ // --------
+
+ // Opens a cursor on the record store's table and positions it with an exact search
+ // for 'start'. A null 'start' positions on the first record in the scan direction.
+ // If 'start' is non-null but does not exist, the iterator begins at EOF.
+ WiredTigerRecordStore::Iterator::Iterator(
+ const WiredTigerRecordStore& rs,
+ OperationContext *txn,
+ const DiskLoc& start,
+ const CollectionScanParams::Direction& dir,
+ bool forParallelCollectionScan)
+ : _rs( rs ),
+ _txn( txn ),
+ // Set by saveState(); initialised so restoreState() never compares an
+ // indeterminate pointer.
+ _savedRecoveryUnit( NULL ),
+ _dir( dir ),
+ _forParallelCollectionScan( forParallelCollectionScan ),
+ _cursor( new WiredTigerCursor( rs.GetURI(), rs.instanceId(), txn ) ),
+ // _locate() below assigns the real value on every path.
+ _eof( false ) {
+ RS_ITERATOR_TRACE("start");
+ _locate(start, true);
+ }
+
+ // Nothing to do explicitly: the cursor is held in a scoped_ptr member.
+ WiredTigerRecordStore::Iterator::~Iterator() {
+ }
+
+ // Positions the cursor relative to 'loc' and updates _eof.
+ // - null 'loc': step to the first record in the scan direction.
+ // - 'exact': position on 'loc' itself; EOF if it does not exist.
+ // - otherwise: position on 'loc' or, if it is missing, on a neighbouring record as
+ // described in the comment inside the inexact branch below.
+ void WiredTigerRecordStore::Iterator::_locate(const DiskLoc &loc, bool exact) {
+ RS_ITERATOR_TRACE("_locate " << loc);
+ WT_CURSOR *c = _cursor->get();
+ invariant( c );
+ int ret;
+ if (loc.isNull()) {
+ ret = _forward() ? c->next(c) : c->prev(c);
+ // WT_NOTFOUND just means the table is empty; any other error is fatal.
+ if (ret != WT_NOTFOUND) invariantWTOK(ret);
+ _eof = (ret == WT_NOTFOUND);
+ RS_ITERATOR_TRACE("_locate null loc eof: " << _eof);
+ return;
+ }
+ c->set_key(c, _makeKey(loc));
+ if (exact) {
+ ret = c->search(c);
+ }
+ else {
+ // If loc doesn't exist, inexact matches should find the first existing record before
+ // it, in the direction of the scan. Note that inexact callers will call _getNext()
+ // after locate so they actually return the record *after* the one we seek to.
+ int cmp;
+ ret = c->search_near(c, &cmp);
+ if ( ret == WT_NOTFOUND ) {
+ _eof = true;
+ return;
+ }
+ invariantWTOK(ret);
+ if (_forward()) {
+ // return >= loc
+ if (cmp < 0)
+ ret = c->next(c);
+ }
+ else {
+ // return <= loc
+ if (cmp > 0)
+ ret = c->prev(c);
+ }
+ }
+ // Running off either end of the table is reported as EOF, not as an error.
+ if (ret != WT_NOTFOUND) invariantWTOK(ret);
+ _eof = (ret == WT_NOTFOUND);
+ RS_ITERATOR_TRACE("_locate not null loc eof: " << _eof);
+ }
+
+ // Returns true once the iterator has run past the last record in its direction.
+ bool WiredTigerRecordStore::Iterator::isEOF() {
+ RS_ITERATOR_TRACE( "isEOF " << _eof << " " << _lastLoc );
+ return _eof;
+ }
+
+ // Allow const functions to use curr to find current location.
+ // Returns a null DiskLoc at EOF, otherwise the location decoded from the key the
+ // cursor is currently positioned on.
+ DiskLoc WiredTigerRecordStore::Iterator::_curr() const {
+ RS_ITERATOR_TRACE( "_curr" );
+ if (_eof)
+ return DiskLoc();
+
+ WT_CURSOR *c = _cursor->get();
+ invariant( c );
+ uint64_t key;
+ int ret = c->get_key(c, &key);
+ invariantWTOK(ret);
+ return _fromKey(key);
+ }
+
+ // Public, non-const entry point; forwards to the const helper _curr().
+ DiskLoc WiredTigerRecordStore::Iterator::curr() {
+ return _curr();
+ }
+
+ // Moves the cursor one record in the scan direction and updates _eof.
+ void WiredTigerRecordStore::Iterator::_getNext() {
+ RS_ITERATOR_TRACE("_getNext");
+ WT_CURSOR *c = _cursor->get();
+ int ret = _forward() ? c->next(c) : c->prev(c);
+ // WT_NOTFOUND means there are no more records; any other error is fatal.
+ if (ret != WT_NOTFOUND) invariantWTOK(ret);
+ _eof = (ret == WT_NOTFOUND);
+ RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof );
+ if ( !_eof ) {
+ RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof << " " << _curr() );
+ }
+ }
+
+ // Returns the location the iterator is positioned on and then advances one record
+ // in the scan direction. Returns a null DiskLoc once EOF has been reached.
+ // The returned location is remembered in _lastLoc for restoreState().
+ DiskLoc WiredTigerRecordStore::Iterator::getNext() {
+ RS_ITERATOR_TRACE( "getNext" );
+ /* Take care not to restart a scan if we have hit the end */
+ if (isEOF())
+ return DiskLoc();
+
+ /* MongoDB expects "natural" ordering - which is the order that items are inserted. */
+ DiskLoc toReturn = curr();
+ RS_ITERATOR_TRACE( "getNext toReturn: " << toReturn );
+ _getNext();
+ RS_ITERATOR_TRACE( " ----" );
+ _lastLoc = toReturn;
+ return toReturn;
+ }
+
+ // Intentionally a no-op for this iterator.
+ void WiredTigerRecordStore::Iterator::invalidate( const DiskLoc& dl ) {
+ // this should never be called
+ }
+
+ // Prepares the iterator for a yield: remembers the recovery unit for the sanity
+ // check in restoreState(), releases the cursor position (or the whole cursor for a
+ // parallel collection scan) and drops the OperationContext pointer.
+ void WiredTigerRecordStore::Iterator::saveState() {
+ RS_ITERATOR_TRACE("saveState");
+
+ // the cursor and recoveryUnit are valid on restore
+ // so we just record the recoveryUnit to make sure
+ _savedRecoveryUnit = _txn->recoveryUnit();
+ if ( !wt_keeptxnopen() ) {
+ _cursor->reset();
+ }
+
+ // For a parallel collection scan the cursor is destroyed here and re-created from
+ // the OperationContext passed to restoreState().
+ if ( _forParallelCollectionScan ) {
+ _cursor.reset( NULL );
+ }
+ _txn = NULL;
+ }
+
+ // Re-attaches the iterator to 'txn' after saveState() and, when the cursor position
+ // was given up, repositions it just past the last record returned by getNext().
+ // Always returns true.
+ bool WiredTigerRecordStore::Iterator::restoreState( OperationContext *txn ) {
+
+ // This is normally already the case, but sometimes we are given a new
+ // OperationContext on restore - update the iterators context in that
+ // case
+ _txn = txn;
+
+ bool needRestore = false;
+
+ if ( _forParallelCollectionScan ) {
+ // The first restore of a parallel scan must come with a different recovery unit
+ // than the one saved; a fresh cursor is opened on it.
+ invariant( _savedRecoveryUnit != txn->recoveryUnit() );
+ // parallel collection scan or something
+ needRestore = true;
+ _savedRecoveryUnit = txn->recoveryUnit();
+ _cursor.reset( new WiredTigerCursor( _rs.GetURI(), _rs.instanceId(), txn ) );
+ _forParallelCollectionScan = false; // we only do this the first time
+ }
+
+ invariant( _savedRecoveryUnit == txn->recoveryUnit() );
+ if ( needRestore || !wt_keeptxnopen() ) {
+ DiskLoc saved = _lastLoc;
+ // Inexact locate: lands on _lastLoc if it still exists, otherwise on a neighbour
+ // in the scan direction (see _locate()).
+ _locate(_lastLoc, false);
+ RS_ITERATOR_TRACE( "isEOF check " << _eof );
+ if ( _eof ) {
+ _lastLoc = DiskLoc();
+ }
+ else if ( _curr() != saved ) {
+ // old doc deleted, we're ok
+ }
+ else {
+ // we found where we left off!
+ // now we advance to the next one
+ RS_ITERATOR_TRACE( "isEOF found " << _curr() );
+ _getNext();
+ }
+ }
+
+ return true;
+ }
+
+ RecordData WiredTigerRecordStore::Iterator::dataFor( const DiskLoc& loc ) const {
+ // Read through the iterator's own cursor only when it already sits on 'loc'.
+ // Otherwise go through the record store, which looks the record up separately
+ // and so leaves this iterator's cursor position untouched.
+ const bool positionedOnLoc = ( loc == _curr() );
+ if ( !positionedOnLoc ) {
+ return _rs.dataFor( _txn, loc );
+ }
+ return _rs._getData( *_cursor );
+ }
+
+ // True when this iterator scans in the FORWARD direction.
+ bool WiredTigerRecordStore::Iterator::_forward() const {
+ return _dir == CollectionScanParams::FORWARD;
+ }
+
+ // Deletes every record located after 'end' (and 'end' itself when 'inclusive') inside
+ // a single write unit of work, then recomputes _highestLocForOplogHack from the last
+ // record that remains (null DiskLoc if the store is now empty).
+ // NOTE(review): the scan starts from an iterator opened at 'end'; the Iterator
+ // constructor uses an exact search, so presumably nothing is deleted when 'end' does
+ // not exist - confirm against getIterator().
+ void WiredTigerRecordStore::temp_cappedTruncateAfter( OperationContext* txn,
+ DiskLoc end,
+ bool inclusive ) {
+ WriteUnitOfWork wuow(txn);
+ boost::scoped_ptr<RecordIterator> iter( getIterator( txn, end ) );
+ while( !iter->isEOF() ) {
+ // getNext() has already advanced past 'loc' by the time it is deleted below.
+ DiskLoc loc = iter->getNext();
+ if ( end < loc || ( inclusive && end == loc ) ) {
+ deleteRecord( txn, loc );
+ }
+ }
+ wuow.commit();
+
+ // A backward scan from a null start is positioned on the highest remaining record.
+ iter.reset(getIterator(txn, DiskLoc(), CollectionScanParams::BACKWARD));
+ _highestLocForOplogHack = iter->isEOF() ? DiskLoc() : iter->curr();
+ }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
new file mode 100644
index 00000000000..6f8634e8436
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -0,0 +1,246 @@
+// wiredtiger_record_store.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/storage/record_store.h"
+#include "mongo/db/storage/capped_callback.h"
+#include "mongo/platform/atomic_word.h"
+
+namespace mongo {
+
+ class RecoveryUnit;
+ class WiredTigerCursor;
+ class WiredTigerRecoveryUnit;
+ class WiredTigerSizeStorer;
+
+ /**
+ * RecordStore implementation on top of a WiredTiger table identified by a URI.
+ * Records are keyed by a 64-bit integer built from a DiskLoc (see _makeKey() and
+ * _fromKey()). Record count and data size are cached in atomic counters and can be
+ * handed to an optional WiredTigerSizeStorer.
+ */
+ class WiredTigerRecordStore : public RecordStore {
+ public:
+
+ /**
+ * Creates a configuration string suitable for 'config' parameter in WT_SESSION::create().
+ * Configuration string is constructed from:
+ * built-in defaults
+ * storageEngine.wiredtiger.configString in 'options'
+ * 'extraStrings'
+ * Performs simple validation on the supplied parameters.
+ * Returns error status if validation fails.
+ * Note that even if this function returns an OK status, WT_SESSION:create() may still
+ * fail with the constructed configuration string.
+ */
+ static StatusWith<std::string> generateCreateString(const StringData& ns,
+ const CollectionOptions &options,
+ const StringData& extraStrings);
+
+ WiredTigerRecordStore(OperationContext* txn,
+ const StringData& ns,
+ const StringData& uri,
+ bool isCapped = false,
+ int64_t cappedMaxSize = -1,
+ int64_t cappedMaxDocs = -1,
+ CappedDocumentDeleteCallback* cappedDeleteCallback = NULL,
+ WiredTigerSizeStorer* sizeStorer = NULL );
+
+ virtual ~WiredTigerRecordStore();
+
+ // name of the RecordStore implementation
+ virtual const char* name() const { return "wiredtiger"; }
+
+ virtual long long dataSize( OperationContext *txn ) const;
+
+ virtual long long numRecords( OperationContext* txn ) const;
+
+ virtual bool isCapped() const;
+
+ virtual int64_t storageSize( OperationContext* txn,
+ BSONObjBuilder* extraInfo = NULL,
+ int infoLevel = 0 ) const;
+
+ // CRUD related
+
+ virtual RecordData dataFor( OperationContext* txn, const DiskLoc& loc ) const;
+
+ virtual bool findRecord( OperationContext* txn, const DiskLoc& loc, RecordData* out ) const;
+
+ virtual void deleteRecord( OperationContext* txn, const DiskLoc& dl );
+
+ virtual StatusWith<DiskLoc> insertRecord( OperationContext* txn,
+ const char* data,
+ int len,
+ bool enforceQuota );
+
+ virtual StatusWith<DiskLoc> insertRecord( OperationContext* txn,
+ const DocWriter* doc,
+ bool enforceQuota );
+
+ virtual StatusWith<DiskLoc> updateRecord( OperationContext* txn,
+ const DiskLoc& oldLocation,
+ const char* data,
+ int len,
+ bool enforceQuota,
+ UpdateMoveNotifier* notifier );
+
+ virtual Status updateWithDamages( OperationContext* txn,
+ const DiskLoc& loc,
+ const RecordData& oldRec,
+ const char* damageSource,
+ const mutablebson::DamageVector& damages );
+
+ virtual RecordIterator* getIterator( OperationContext* txn,
+ const DiskLoc& start = DiskLoc(),
+ const CollectionScanParams::Direction& dir =
+ CollectionScanParams::FORWARD ) const;
+
+ virtual RecordIterator* getIteratorForRepair( OperationContext* txn ) const;
+
+ virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const;
+
+ virtual Status truncate( OperationContext* txn );
+
+ virtual bool compactSupported() const { return true; }
+
+ virtual Status compact( OperationContext* txn,
+ RecordStoreCompactAdaptor* adaptor,
+ const CompactOptions* options,
+ CompactStats* stats );
+
+ virtual Status validate( OperationContext* txn,
+ bool full, bool scanData,
+ ValidateAdaptor* adaptor,
+ ValidateResults* results, BSONObjBuilder* output ) const;
+
+ virtual void appendCustomStats( OperationContext* txn,
+ BSONObjBuilder* result,
+ double scale ) const;
+
+ virtual Status touch( OperationContext* txn, BSONObjBuilder* output ) const;
+
+ virtual Status setCustomOption( OperationContext* txn,
+ const BSONElement& option,
+ BSONObjBuilder* info = NULL );
+
+ virtual void temp_cappedTruncateAfter(OperationContext* txn,
+ DiskLoc end,
+ bool inclusive);
+
+ virtual DiskLoc oplogStartHack(OperationContext* txn,
+ const DiskLoc& startingPosition) const;
+
+ // Replaces the callback pointer; the store only stores it here.
+ void setCappedDeleteCallback(CappedDocumentDeleteCallback* cb) {
+ _cappedDeleteCallback = cb;
+ }
+ int64_t cappedMaxDocs() const;
+ int64_t cappedMaxSize() const;
+
+ const std::string& GetURI() const { return _uri; }
+ uint64_t instanceId() const { return _instanceId; }
+
+ // The size storer is not owned by the record store and may be NULL.
+ void setSizeStorer( WiredTigerSizeStorer* ss ) { _sizeStorer = ss; }
+
+ private:
+
+ // RecordIterator over this store, backed by a WiredTigerCursor.
+ class Iterator : public RecordIterator {
+ public:
+ Iterator( const WiredTigerRecordStore& rs,
+ OperationContext* txn,
+ const DiskLoc& start,
+ const CollectionScanParams::Direction& dir,
+ bool forParallelCollectionScan );
+
+ virtual ~Iterator();
+
+ virtual bool isEOF();
+ virtual DiskLoc curr();
+ virtual DiskLoc getNext();
+ virtual void invalidate(const DiskLoc& dl);
+ virtual void saveState();
+ virtual bool restoreState(OperationContext *txn);
+ virtual RecordData dataFor( const DiskLoc& loc ) const;
+
+ private:
+ bool _forward() const;
+ void _getNext();
+ void _locate( const DiskLoc &loc, bool exact );
+ void _checkStatus();
+ DiskLoc _curr() const; // const version of public curr method
+
+ const WiredTigerRecordStore& _rs;
+ OperationContext* _txn;
+ RecoveryUnit* _savedRecoveryUnit; // only used to sanity check between save/restore
+ CollectionScanParams::Direction _dir;
+ bool _forParallelCollectionScan;
+ scoped_ptr<WiredTigerCursor> _cursor;
+ bool _eof;
+
+ DiskLoc _lastLoc; // the last thing returned from getNext()
+ };
+
+ // RecoveryUnit::Change implementations used to undo counter updates on rollback.
+ class NumRecordsChange;
+ class DataSizeChange;
+
+ static WiredTigerRecoveryUnit* _getRecoveryUnit( OperationContext* txn );
+
+ // Conversions between a DiskLoc and the 64-bit table key.
+ static uint64_t _makeKey(const DiskLoc &loc);
+ static DiskLoc _fromKey(uint64_t k);
+
+ DiskLoc _nextId();
+ void _setId(DiskLoc loc);
+ bool cappedAndNeedDelete(OperationContext* txn) const;
+ void cappedDeleteAsNeeded(OperationContext* txn);
+ void _changeNumRecords(OperationContext* txn, bool insert);
+ void _increaseDataSize(OperationContext* txn, int amount);
+ RecordData _getData( const WiredTigerCursor& cursor) const;
+ StatusWith<DiskLoc> extractAndCheckLocForOplog(const char* data, int len);
+
+ const std::string _uri;
+ const uint64_t _instanceId; // not persisted
+
+ // The capped settings should not be updated once operations have started
+ const bool _isCapped;
+ const bool _isOplog;
+ const int64_t _cappedMaxSize;
+ const int64_t _cappedMaxDocs;
+ CappedDocumentDeleteCallback* _cappedDeleteCallback;
+
+ const bool _useOplogHack;
+ DiskLoc _highestLocForOplogHack;
+
+ // In-memory counters: next record id, total data size and record count.
+ AtomicUInt64 _nextIdNum;
+ AtomicInt64 _dataSize;
+ AtomicInt64 _numRecords;
+
+ WiredTigerSizeStorer* _sizeStorer; // not owned, can be NULL
+ int _sizeStorerCounter;
+ };
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
new file mode 100644
index 00000000000..0f2f0541764
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -0,0 +1,410 @@
+// wiredtiger_record_store_test.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <sstream>
+#include <string>
+
+#include "mongo/db/json.h"
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/storage/record_store_test_harness.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+ using std::string;
+ using std::stringstream;
+
+ // Test harness that opens a WiredTiger connection in a temporary directory and
+ // creates record stores and recovery units on top of it.
+ class WiredTigerHarnessHelper : public HarnessHelper {
+ public:
+ WiredTigerHarnessHelper() : _dbpath( "wt_test" ), _conn( NULL ) {
+
+ // Open (creating if needed) a database in the temp dir with all statistics on.
+ std::stringstream ss;
+ ss << "create,";
+ ss << "statistics=(all),";
+ string config = ss.str();
+ int ret = wiredtiger_open( _dbpath.path().c_str(), NULL, config.c_str(), &_conn);
+ invariantWTOK( ret );
+
+ _sessionCache = new WiredTigerSessionCache( _conn );
+ }
+
+ ~WiredTigerHarnessHelper() {
+ // The session cache is deleted before the connection it was built on is closed.
+ delete _sessionCache;
+ _conn->close(_conn, NULL);
+ }
+
+ virtual RecordStore* newNonCappedRecordStore() { return newNonCappedRecordStore("a.b"); }
+ // Creates the table "table:<ns>" and returns a non-capped record store on it.
+ RecordStore* newNonCappedRecordStore(const std::string& ns) {
+ WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit( _sessionCache );
+ OperationContextNoop txn( ru );
+ string uri = "table:" + ns;
+
+ StatusWith<std::string> result =
+ WiredTigerRecordStore::generateCreateString(ns, CollectionOptions(), "");
+ ASSERT_TRUE(result.isOK());
+ std::string config = result.getValue();
+
+ {
+ WriteUnitOfWork uow(&txn);
+ WT_SESSION* s = ru->getSession()->getSession();
+ invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) );
+ uow.commit();
+ }
+
+ return new WiredTigerRecordStore( &txn, ns, uri );
+ }
+
+ // Creates the table "table:a.b" and returns a capped record store on it with the
+ // given limits. Note the create string is generated with an empty namespace.
+ virtual RecordStore* newCappedRecordStore( int64_t cappedMaxSize,
+ int64_t cappedMaxDocs ) {
+ std::string ns = "a.b";
+
+ WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit( _sessionCache );
+ OperationContextNoop txn( ru );
+ string uri = "table:a.b";
+
+ StatusWith<std::string> result =
+ WiredTigerRecordStore::generateCreateString("", CollectionOptions(), "");
+ ASSERT_TRUE(result.isOK());
+ std::string config = result.getValue();
+
+ {
+ WriteUnitOfWork uow(&txn);
+ WT_SESSION* s = ru->getSession()->getSession();
+ invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) );
+ uow.commit();
+ }
+
+ return new WiredTigerRecordStore( &txn, ns, uri, true, cappedMaxSize, cappedMaxDocs );
+ }
+
+ virtual RecoveryUnit* newRecoveryUnit() {
+ return new WiredTigerRecoveryUnit( _sessionCache );
+ }
+ private:
+ unittest::TempDir _dbpath;
+ WT_CONNECTION* _conn;
+ WiredTigerSessionCache* _sessionCache;
+ };
+
+ // Factory returning a new WiredTiger-backed harness helper.
+ HarnessHelper* newHarnessHelper() {
+ return new WiredTigerHarnessHelper();
+ }
+
+ // An unrecognised field under storageEngine.wiredtiger must be rejected with
+ // InvalidOptions.
+ TEST(WiredTigerRecordStoreTest, GenerateCreateStringUnknownField) {
+ CollectionOptions options;
+ options.storageEngine = fromjson("{wiredtiger: {unknownField: 1}}");
+ StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString("", options, "");
+ const Status& status = result.getStatus();
+ ASSERT_NOT_OK(status);
+ ASSERT_EQUALS(ErrorCodes::InvalidOptions, status.code());
+ }
+
+ // A non-string configString must be rejected with TypeMismatch.
+ TEST(WiredTigerRecordStoreTest, GenerateCreateStringNonStringConfig) {
+ CollectionOptions options;
+ options.storageEngine = fromjson("{wiredtiger: {configString: 12345}}");
+ StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString("", options, "");
+ const Status& status = result.getStatus();
+ ASSERT_NOT_OK(status);
+ ASSERT_EQUALS(ErrorCodes::TypeMismatch, status.code());
+ }
+
+ // Two concurrent write units of work update the same record: the second writer must
+ // get a WriteConflictException while the first one can still commit.
+ TEST(WiredTigerRecordStoreTest, Isolation1 ) {
+ scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() );
+ scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() );
+
+ DiskLoc loc1;
+ DiskLoc loc2;
+
+ // Set-up: insert and commit two records.
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ {
+ WriteUnitOfWork uow( opCtx.get() );
+
+ StatusWith<DiskLoc> res = rs->insertRecord( opCtx.get(), "a", 2, false );
+ ASSERT_OK( res.getStatus() );
+ loc1 = res.getValue();
+
+ res = rs->insertRecord( opCtx.get(), "a", 2, false );
+ ASSERT_OK( res.getStatus() );
+ loc2 = res.getValue();
+
+ uow.commit();
+ }
+ }
+
+ {
+ scoped_ptr<OperationContext> t1( harnessHelper->newOperationContext() );
+ scoped_ptr<OperationContext> t2( harnessHelper->newOperationContext() );
+
+ scoped_ptr<WriteUnitOfWork> w1( new WriteUnitOfWork( t1.get() ) );
+ scoped_ptr<WriteUnitOfWork> w2( new WriteUnitOfWork( t2.get() ) );
+
+ // Both contexts read loc1 before either of them writes.
+ rs->dataFor( t1.get(), loc1 );
+ rs->dataFor( t2.get(), loc1 );
+
+ ASSERT_OK( rs->updateRecord( t1.get(), loc1, "b", 2, false, NULL ).getStatus() );
+ ASSERT_OK( rs->updateRecord( t1.get(), loc2, "B", 2, false, NULL ).getStatus() );
+
+ try {
+ // this should fail
+ rs->updateRecord( t2.get(), loc1, "c", 2, false, NULL );
+ ASSERT( 0 );
+ }
+ catch ( WriteConflictException& dle ) {
+ // Tear down the losing writer (unit of work first, then its context).
+ w2.reset( NULL );
+ t2.reset( NULL );
+ }
+
+ w1->commit(); // this should succeed
+ }
+ }
+
+ // A context whose reads started before another context committed an update keeps
+ // seeing the old value and gets a WriteConflictException when it tries to update
+ // that record itself.
+ TEST(WiredTigerRecordStoreTest, Isolation2 ) {
+ scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() );
+ scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() );
+
+ DiskLoc loc1;
+ DiskLoc loc2;
+
+ // Set-up: insert and commit two records.
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ {
+ WriteUnitOfWork uow( opCtx.get() );
+
+ StatusWith<DiskLoc> res = rs->insertRecord( opCtx.get(), "a", 2, false );
+ ASSERT_OK( res.getStatus() );
+ loc1 = res.getValue();
+
+ res = rs->insertRecord( opCtx.get(), "a", 2, false );
+ ASSERT_OK( res.getStatus() );
+ loc2 = res.getValue();
+
+ uow.commit();
+ }
+ }
+
+ {
+ scoped_ptr<OperationContext> t1( harnessHelper->newOperationContext() );
+ scoped_ptr<OperationContext> t2( harnessHelper->newOperationContext() );
+
+ // ensure we start transactions
+ rs->dataFor( t1.get(), loc2 );
+ rs->dataFor( t2.get(), loc2 );
+
+ // t1 updates loc1 and commits.
+ {
+ WriteUnitOfWork w( t1.get() );
+ ASSERT_OK( rs->updateRecord( t1.get(), loc1, "b", 2, false, NULL ).getStatus() );
+ w.commit();
+ }
+
+ // t2 still reads the original value and cannot update the record.
+ {
+ WriteUnitOfWork w( t2.get() );
+ ASSERT_EQUALS( string("a"), rs->dataFor( t2.get(), loc1 ).data() );
+ try {
+ // this should fail as our version of loc1 is too old
+ rs->updateRecord( t2.get(), loc1, "c", 2, false, NULL );
+ ASSERT( 0 );
+ }
+ catch ( WriteConflictException& dle ) {
+ }
+
+ }
+
+ }
+ }
+
+ // Checks that the record count reaches the size storer, is picked up by a record
+ // store re-created on the same URI, and survives a storeInto()/loadFrom() round trip
+ // through a separate table.
+ TEST(WiredTigerRecordStoreTest, SizeStorer1 ) {
+ scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() );
+ scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() );
+
+ string uri = dynamic_cast<WiredTigerRecordStore*>( rs.get() )->GetURI();
+
+ WiredTigerSizeStorer ss;
+ dynamic_cast<WiredTigerRecordStore*>( rs.get() )->setSizeStorer( &ss );
+
+ int N = 12;
+
+ // Insert N records in one committed unit of work.
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ {
+ WriteUnitOfWork uow( opCtx.get() );
+ for ( int i = 0; i < N; i++ ) {
+ StatusWith<DiskLoc> res = rs->insertRecord( opCtx.get(), "a", 2, false );
+ ASSERT_OK( res.getStatus() );
+ }
+ uow.commit();
+ }
+ }
+
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ ASSERT_EQUALS( N, rs->numRecords( opCtx.get() ) );
+ }
+
+ // Destroy the record store; the size storer is expected to hold the final count.
+ rs.reset( NULL );
+
+ {
+ long long numRecords;
+ long long dataSize;
+ ss.load( uri, &numRecords, &dataSize );
+ ASSERT_EQUALS( N, numRecords );
+ }
+
+ // Re-create the record store on the same URI, handing it the size storer.
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ rs.reset( new WiredTigerRecordStore( opCtx.get(), "a.b", uri,
+ false, -1, -1, NULL, &ss ) );
+ }
+
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ ASSERT_EQUALS( N, rs->numRecords( opCtx.get() ) );
+ }
+
+ // Persist the size storer's contents into a separate table.
+ string indexUri = "table:myindex";
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ WiredTigerRecoveryUnit* ru =
+ dynamic_cast<WiredTigerRecoveryUnit*>( opCtx->recoveryUnit() );
+
+ {
+ WriteUnitOfWork uow( opCtx.get() );
+ WT_SESSION* s = ru->getSession()->getSession();
+ invariantWTOK( s->create( s, indexUri.c_str(), "" ) );
+ uow.commit();
+ }
+
+ {
+ WriteUnitOfWork uow( opCtx.get() );
+ ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri );
+ uow.commit();
+ }
+ }
+
+ // A fresh size storer loaded from that table must report the same count.
+ {
+ scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() );
+ WiredTigerSizeStorer ss2;
+ ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri );
+ long long numRecords;
+ long long dataSize;
+ ss2.load( uri, &numRecords, &dataSize );
+ ASSERT_EQUALS( N, numRecords );
+ }
+
+ rs.reset( NULL ); // this has to be deleted before ss
+ }
+
+ // Test helper: inserts 'obj' into 'rs' inside its own write unit of work, which is
+ // committed only when the insert succeeded. Returns the insert's result.
+ StatusWith<DiskLoc> insertBSON(ptr<OperationContext> opCtx, ptr<RecordStore> rs,
+ const BSONObj& obj) {
+ WriteUnitOfWork wuow(opCtx);
+ StatusWith<DiskLoc> status = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), false);
+ if (status.isOK())
+ wuow.commit();
+ return status;
+ }
+
+ // TODO make generic
+ // Exercises the oplog hack on a "local.oplog.*" namespace: inserts get their DiskLoc
+ // from the document's "ts" OpTime, out-of-order or malformed inserts are rejected,
+ // and oplogStartHack() finds the last record <= a given position, also after
+ // truncation.
+ TEST(WiredTigerRecordStoreTest, OplogHack) {
+ WiredTigerHarnessHelper harnessHelper;
+ scoped_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("local.oplog.foo"));
+ scoped_ptr<OperationContext> opCtx(harnessHelper.newOperationContext());
+
+ // always illegal
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,-1))).getStatus(),
+ ErrorCodes::BadValue);
+
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("not_ts" << OpTime(2,1))).getStatus(),
+ ErrorCodes::BadValue);
+
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << "not an OpTime")).getStatus(),
+ ErrorCodes::BadValue);
+
+ // currently dasserts
+ // ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(-2,1))).getStatus(),
+ // ErrorCodes::BadValue);
+
+ // success cases
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(1,1))).getValue(),
+ DiskLoc(1,1));
+
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(1,2))).getValue(),
+ DiskLoc(1,2));
+
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,2))).getValue(),
+ DiskLoc(2,2));
+
+ // fails because <= highest
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,1))).getStatus(),
+ ErrorCodes::BadValue);
+
+ ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,2))).getStatus(),
+ ErrorCodes::BadValue);
+
+
+ // find start
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(0,1)), DiskLoc()); // nothing <=
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,1)), DiskLoc(1,2)); // between
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,2)), DiskLoc(2,2)); // ==
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(2,2)); // > highest
+
+ // Truncate step by step and check the start position after each step.
+ rs->temp_cappedTruncateAfter(opCtx.get(), DiskLoc(2,2), false); // no-op
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(2,2));
+
+ rs->temp_cappedTruncateAfter(opCtx.get(), DiskLoc(1,2), false); // deletes 2,2
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(1,2));
+
+ rs->temp_cappedTruncateAfter(opCtx.get(), DiskLoc(1,2), true); // deletes 1,2
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(1,1));
+
+ {
+ WriteUnitOfWork wuow(opCtx.get());
+ ASSERT_OK(rs->truncate(opCtx.get())); // deletes 1,1 and leaves collection empty
+ wuow.commit();
+ }
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc());
+ }
+
+ // On a namespace that is not an oplog the "ts" field is not validated on insert and
+ // oplogStartHack() reports an invalid DiskLoc.
+ TEST(WiredTigerRecordStoreTest, OplogHackOnNonOplog) {
+ WiredTigerHarnessHelper harnessHelper;
+ scoped_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("local.NOT_oplog.foo"));
+ scoped_ptr<OperationContext> opCtx(harnessHelper.newOperationContext());
+
+ ASSERT_OK(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,-1))).getStatus());
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(0,1)), DiskLoc().setInvalid());
+ }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
new file mode 100644
index 00000000000..5f7694c8091
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -0,0 +1,268 @@
+// wiredtiger_recovery_unit.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/stacktrace.h"
+
+namespace mongo {
+
+ namespace {
+ // File-local state shared by all recovery units: lets awaitCommit() callers wait for
+ // a transaction that was opened with "sync=true" to commit.
+ struct AwaitCommitData {
+ AwaitCommitData() :
+ numWaitingForSync(0),
+ lastSyncTime(0) {
+ }
+
+ // Records that a syncing commit finished and wakes every waiter.
+ // (Name is spelled "Happend" in the original; _txnClose() calls it by that name.)
+ void syncHappend() {
+ boost::mutex::scoped_lock lk( mutex );
+ lastSyncTime++;
+ condvar.notify_all();
+ }
+
+ // return true if happened
+ // Waits at most 50ms on the condition variable. While waiting, numWaitingForSync is
+ // raised so that _txnOpen() opens new transactions with "sync=true".
+ bool awaitCommit() {
+ boost::mutex::scoped_lock lk( mutex );
+ long long start = lastSyncTime;
+ numWaitingForSync.fetchAndAdd(1);
+ condvar.timed_wait(lk,boost::posix_time::milliseconds(50));
+ numWaitingForSync.fetchAndAdd(-1);
+ return lastSyncTime > start;
+ }
+
+ // Number of threads currently inside awaitCommit(); read without the mutex.
+ AtomicUInt32 numWaitingForSync;
+
+ boost::mutex mutex; // this just protects lastSyncTime
+ boost::condition condvar;
+ // Despite the name this is a counter of syncing commits, not a timestamp.
+ long long lastSyncTime;
+ } awaitCommitData;
+ }
+
+ // Builds an idle recovery unit: no session is checked out from 'sc' and no WiredTiger
+ // transaction is open until getSession() is called.
+ WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc) :
+ _sessionCache( sc ),
+ _session( NULL ),
+ _depth(0),
+ _active( false ),
+ _everStartedWrite( false ),
+ _currentlySquirreled( false ),
+ _syncing( false ) {
+ }
+
+ // Must not be destroyed inside a unit of work. Rolls back whatever is still pending
+ // and returns the session (if one was ever checked out) to the session cache.
+ WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() {
+     invariant( _depth == 0 );
+     _abort();
+
+     if ( _session == NULL )
+         return;
+
+     _sessionCache->releaseSession( _session );
+     _session = NULL;
+ }
+
+ // Appends diagnostic fields describing this recovery unit to 'b'.
+ void WiredTigerRecoveryUnit::reportState( BSONObjBuilder* b ) const {
+     b->append( "wt_depth", _depth );
+     b->append( "wt_active", _active );
+     b->append( "wt_everStartedWrite", _everStartedWrite );
+
+     if ( !_active )
+         return;
+
+     // _timer is reset in _txnOpen(), so this is the age of the open transaction.
+     b->append( "wt_millisSinceCommit", _timer.millis() );
+ }
+
+ // Commits the open WiredTiger transaction (if any), then runs commit() on every
+ // registered Change in registration order and forgets them.
+ void WiredTigerRecoveryUnit::_commit() {
+     const bool haveOpenTxn = _session && _active;
+     if ( haveOpenTxn )
+         _txnClose( true );
+
+     Changes::iterator cur = _changes.begin();
+     const Changes::iterator stop = _changes.end();
+     while ( cur != stop ) {
+         (*cur)->commit();
+         ++cur;
+     }
+     _changes.clear();
+ }
+
+ // Rolls back the open WiredTiger transaction (if any), then runs rollback() on every
+ // registered Change in reverse registration order and forgets them.
+ void WiredTigerRecoveryUnit::_abort() {
+     const bool haveOpenTxn = _session && _active;
+     if ( haveOpenTxn )
+         _txnClose( false );
+
+     Changes::reverse_iterator cur = _changes.rbegin();
+     const Changes::reverse_iterator stop = _changes.rend();
+     while ( cur != stop ) {
+         (*cur)->rollback();
+         ++cur;
+     }
+     _changes.clear();
+ }
+
+ // Enters a (possibly nested) unit of work. Not allowed while this recovery unit is
+ // detached from its OperationContext (see beingReleasedFromOperationContext()).
+ void WiredTigerRecoveryUnit::beginUnitOfWork() {
+ invariant( !_currentlySquirreled );
+ _depth++;
+ _everStartedWrite = true;
+ }
+
+ // Commits pending work, but only when called from the outermost unit of work;
+ // for nested units (depth > 1) this is a no-op.
+ void WiredTigerRecoveryUnit::commitUnitOfWork() {
+     if ( _depth <= 1 )
+         _commit();
+ }
+
+ // Leaves one level of unit-of-work nesting. When the outermost level ends, anything
+ // that was not committed is rolled back; after a commit, _abort() finds no open
+ // transaction and no registered changes, so it has nothing to undo.
+ void WiredTigerRecoveryUnit::endUnitOfWork() {
+     if ( --_depth == 0 )
+         _abort();
+ }
+
+ // Requests that the next transaction be opened with "sync=true". Has no effect if a
+ // transaction is already open, because its configuration can no longer be changed.
+ void WiredTigerRecoveryUnit::goingToAwaitCommit() {
+     if ( !_active ) {
+         // yay, we've configured ourselves for sync
+         _syncing = true;
+     }
+ }
+
+ // Returns immediately when this unit was configured for sync and has started a write;
+ // otherwise waits (bounded) for some other syncing commit to happen.
+ // NOTE(review): the result of awaitCommitData.awaitCommit() is discarded and true is
+ // always returned, even if the wait timed out -- confirm this is intended.
+ bool WiredTigerRecoveryUnit::awaitCommit() {
+     const bool alreadySynced = _syncing && _everStartedWrite;
+     if ( !alreadySynced )
+         awaitCommitData.awaitCommit();
+     return true;
+ }
+
+ // Takes ownership of 'change' (it is wrapped in a shared_ptr) and queues it to be
+ // committed or rolled back with the current unit of work. Only valid inside one.
+ void WiredTigerRecoveryUnit::registerChange(Change* change) {
+ invariant(_depth > 0);
+ _changes.push_back(ChangePtr(change));
+ }
+
+ // Returns the recovery unit attached to 'txn' as a WiredTigerRecoveryUnit.
+ // The dynamic_cast yields NULL if the attached unit is of another type; that case is
+ // not checked here.
+ WiredTigerRecoveryUnit* WiredTigerRecoveryUnit::get(OperationContext *txn) {
+ invariant( txn );
+ return dynamic_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit());
+ }
+
+ // Returns this unit's session, lazily checking one out of the cache on first use.
+ // Side effect: opens a WiredTiger transaction if none is currently active.
+ WiredTigerSession* WiredTigerRecoveryUnit::getSession() {
+ if ( !_session ) {
+ _session = _sessionCache->getSession();
+ }
+
+ if ( !_active ) {
+ _txnOpen();
+ }
+ return _session;
+ }
+
+ // Commits the open transaction, if any. Only legal outside a unit of work. A new
+ // transaction is not opened here; the next getSession() call opens one.
+ void WiredTigerRecoveryUnit::commitAndRestart() {
+ invariant( _depth == 0 );
+ if ( _active ) {
+ _txnClose( true );
+ }
+ }
+
+ // Ends the active WiredTiger transaction: commits when 'commit' is true, otherwise
+ // rolls back. Any WiredTiger error is fatal (invariantWTOK).
+ void WiredTigerRecoveryUnit::_txnClose( bool commit ) {
+ invariant( _active );
+ WT_SESSION *s = _session->getSession();
+ if ( commit ) {
+ invariantWTOK( s->commit_transaction(s, NULL) );
+ LOG(2) << "WT commit_transaction";
+ // Wake awaitCommit() waiters only after the commit call has returned.
+ if ( _syncing )
+ awaitCommitData.syncHappend();
+ }
+ else {
+ invariantWTOK( s->rollback_transaction(s, NULL) );
+ LOG(2) << "WT rollback_transaction";
+ }
+ _active = false;
+ }
+
+ // Begins a WiredTiger transaction on the current session and restarts _timer.
+ void WiredTigerRecoveryUnit::_txnOpen() {
+ invariant( !_active );
+ WT_SESSION *s = _session->getSession();
+ // Also sync when some other thread is currently blocked in awaitCommit(); note that
+ // _syncing is sticky -- nothing in this file resets it to false.
+ _syncing = _syncing || awaitCommitData.numWaitingForSync.load() > 0;
+ invariantWTOK( s->begin_transaction(s, _syncing ? "sync=true" : NULL) );
+ LOG(2) << "WT begin_transaction";
+ _timer.reset();
+ _active = true;
+ }
+
+ // Marks this unit as detached ("squirreled away"); beginUnitOfWork() is rejected
+ // until beingSetOnOperationContext() is called.
+ void WiredTigerRecoveryUnit::beingReleasedFromOperationContext() {
+ LOG(2) << "WiredTigerRecoveryUnit::beingReleased";
+ _currentlySquirreled = true;
+ // wt_keeptxnopen() is defined elsewhere -- presumably a setting that allows the
+ // transaction to stay open while detached; TODO confirm.
+ if ( !wt_keeptxnopen() ) {
+ _commit();
+ }
+ }
+ // Marks this unit as attached again, re-enabling beginUnitOfWork().
+ void WiredTigerRecoveryUnit::beingSetOnOperationContext() {
+ LOG(2) << "WiredTigerRecoveryUnit::broughtBack";
+ _currentlySquirreled = false;
+ }
+
+
+ // ---------------------
+
+ // Obtains a cursor on 'uri' (cached under 'id') from the session of recovery unit 'ru'.
+ WiredTigerCursor::WiredTigerCursor(const std::string& uri, uint64_t id, WiredTigerRecoveryUnit* ru) {
+ _init( uri, id, ru );
+ }
+
+ // Same as the recovery-unit overload, using the WiredTigerRecoveryUnit attached to 'txn'.
+ WiredTigerCursor::WiredTigerCursor(const std::string& uri, uint64_t id, OperationContext* txn) {
+ _init( uri, id, WiredTigerRecoveryUnit::get( txn ) );
+ }
+
+ // Shared constructor body: binds to the recovery unit's session (opening a
+ // transaction if needed, see getSession()) and fetches a cursor from it.
+ void WiredTigerCursor::_init( const std::string& uri, uint64_t id, WiredTigerRecoveryUnit* ru ) {
+ _uriID = id;
+ _ru = ru;
+ _session = _ru->getSession();
+ _cursor = _session->getCursor( uri, id );
+ // getCursor() returns NULL when open_cursor reports ENOENT; that is only logged
+ // here, so _cursor may remain NULL for the lifetime of this object.
+ if ( !_cursor ) {
+ error() << "no cursor for uri: " << uri;
+ }
+ }
+
+ // Returns the wrapped cursor to the session's cursor cache.
+ WiredTigerCursor::~WiredTigerCursor() {
+     invariant( _session == _ru->getSession() );
+
+     // _init() tolerates a failed cursor lookup and leaves _cursor NULL. In that case
+     // nothing was checked out (getCursor() did not bump the session's cursorsOut
+     // count), so there is nothing to release; calling releaseCursor() with NULL
+     // would trip its invariant and unbalance the count.
+     if ( _cursor ) {
+         _session->releaseCursor( _uriID, _cursor );
+         _cursor = NULL;
+     }
+ }
+
+ // Returns the raw cursor after checking that the recovery unit still uses the same
+ // session this cursor was obtained from. May be NULL if _init() found no cursor.
+ WT_CURSOR* WiredTigerCursor::get() const {
+ invariant( _session == _ru->getSession() );
+ return _cursor;
+ }
+
+ // Resets the underlying WT_CURSOR; any WiredTiger error is fatal.
+ void WiredTigerCursor::reset() {
+ invariantWTOK( _cursor->reset( _cursor ) );
+ }
+
+ // Returns the raw WT_SESSION behind the session this cursor belongs to.
+ WT_SESSION* WiredTigerCursor::getWTSession() {
+ return _session->getSession();
+ }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
new file mode 100644
index 00000000000..53af1d01b22
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -0,0 +1,137 @@
+// wiredtiger_recovery_unit.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <wiredtiger.h>
+
+#include <memory.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/recovery_unit.h"
+#include "mongo/util/timer.h"
+
+namespace mongo {
+
+ class BSONObjBuilder;
+ class WiredTigerSession;
+ class WiredTigerSessionCache;
+
+ /**
+  * RecoveryUnit backed by a pooled WiredTigerSession.
+  * A WiredTiger transaction is opened lazily by getSession() and is committed or
+  * rolled back when the outermost unit of work ends.
+  */
+ class WiredTigerRecoveryUnit : public RecoveryUnit {
+ public:
+     WiredTigerRecoveryUnit(WiredTigerSessionCache* sc);
+
+     virtual ~WiredTigerRecoveryUnit();
+
+     virtual void reportState( BSONObjBuilder* b ) const;
+
+     virtual void beginUnitOfWork();
+
+     virtual void commitUnitOfWork();
+
+     virtual void endUnitOfWork();
+
+     virtual bool awaitCommit();
+     virtual void goingToAwaitCommit();
+
+     virtual void registerChange(Change *);
+
+     virtual void beingReleasedFromOperationContext();
+     virtual void beingSetOnOperationContext();
+
+     virtual void commitAndRestart();
+
+     // un-used API
+     virtual void* writingPtr(void* data, size_t len) {
+         invariant(!"don't call writingPtr");
+         // Not expected to be reached; a value-returning function still needs a
+         // return statement on every path.
+         return NULL;
+     }
+     virtual void syncDataAndTruncateJournal() {}
+
+     // ---- WT STUFF
+
+     // Returns the session, checking one out and opening a transaction if needed.
+     WiredTigerSession* getSession();
+     WiredTigerSessionCache* getSessionCache() { return _sessionCache; }
+
+     bool everStartedWrite() const { return _everStartedWrite; }
+     int depth() const { return _depth; }
+
+     static WiredTigerRecoveryUnit* get(OperationContext *txn);
+
+ private:
+
+     void _abort();
+     void _commit();
+
+     void _txnClose( bool commit );
+     void _txnOpen();
+
+     WiredTigerSessionCache* _sessionCache; // not owned
+     WiredTigerSession* _session; // owned, but from pool
+     // NOTE(review): not initialized by the constructor and not referenced in
+     // wiredtiger_recovery_unit.cpp -- looks like a leftover; confirm before removing.
+     bool _defaultCommit;
+     int _depth;                 // unit-of-work nesting level
+     bool _active;               // true while a WiredTiger transaction is open
+     bool _everStartedWrite;     // set by the first beginUnitOfWork()
+     Timer _timer;               // reset each time a transaction is opened
+     bool _currentlySquirreled;  // true while released from the OperationContext
+     bool _syncing;              // open transactions with "sync=true"
+
+     typedef boost::shared_ptr<Change> ChangePtr;
+     typedef std::vector<ChangePtr> Changes;
+     Changes _changes;
+ };
+
+ /**
+ * This is a smart pointer that wraps a WT_CURSOR and knows how to obtain and get from pool.
+ * The cursor is taken from the recovery unit's session on construction and handed
+ * back to that session's cursor cache on destruction.
+ */
+ class WiredTigerCursor {
+ public:
+ WiredTigerCursor(const std::string& uri, uint64_t uriID, OperationContext* txn);
+ WiredTigerCursor(const std::string& uri, uint64_t uriID, WiredTigerRecoveryUnit* ru);
+ ~WiredTigerCursor();
+
+ // Both accessors check that the recovery unit still uses the original session.
+ WT_CURSOR* get() const;
+ WT_CURSOR* operator->() const { return get(); }
+
+ WiredTigerSession* getSession() { return _session; }
+ WT_SESSION* getWTSession();
+
+ void reset();
+
+ private:
+ void _init( const std::string& uri, uint64_t uriID, WiredTigerRecoveryUnit* ru );
+
+ uint64_t _uriID; // key under which the cursor is cached in the session
+ WiredTigerRecoveryUnit* _ru; // not owned
+ WiredTigerSession* _session;
+ WT_CURSOR* _cursor; // owned, but pulled
+ };
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp
new file mode 100644
index 00000000000..89370ac35b1
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp
@@ -0,0 +1,81 @@
+// wiredtiger_server_status.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_server_status.h"
+
+#include "boost/scoped_ptr.hpp"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+ using std::string;
+
+ // Registers the section under the name "wiredtiger"; 'engine' is stored, not owned.
+ WiredTigerServerStatusSection::WiredTigerServerStatusSection(WiredTigerKVEngine* engine)
+ : ServerStatusSection("wiredtiger"),
+ _engine(engine) { }
+
+ // This section is always included by default.
+ bool WiredTigerServerStatusSection::includeByDefault() const {
+ return true;
+ }
+
+ // Builds the section by exporting the "statistics:" table (with "statistics=(fast)")
+ // into a BSON object. 'configElement' is not used. On failure the returned object
+ // carries "error", "code" and "reason" fields instead of aborting.
+ BSONObj WiredTigerServerStatusSection::generateSection(
+ const BSONElement& configElement) const {
+
+ // A temporary recovery unit provides the session; scoped_ptr destroys it on return.
+ // The dynamic_cast result is not checked for NULL.
+ boost::scoped_ptr<WiredTigerRecoveryUnit> recoveryUnit(
+ dynamic_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit()));
+ WiredTigerSession* session = recoveryUnit->getSession();
+ WT_SESSION* s = session->getSession();
+ invariant(s);
+ const string uri = "statistics:";
+
+ BSONObjBuilder bob;
+ Status status = WiredTigerUtil::exportTableToBSON(s, uri,
+ "statistics=(fast)", &bob);
+ if (!status.isOK()) {
+ bob.append("error", "unable to retrieve statistics");
+ bob.append("code", static_cast<int>(status.code()));
+ bob.append("reason", status.reason());
+ }
+
+ return bob.obj();
+ }
+
+} // namespace mongo
+
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h
new file mode 100644
index 00000000000..cc0cbb1a492
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h
@@ -0,0 +1,51 @@
+// wiredtiger_server_status.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/commands/server_status.h"
+
+namespace mongo {
+
+ class WiredTigerKVEngine;
+
+ /**
+ * Adds "wiredtiger" to the results of db.serverStatus().
+ * The section content is produced from WiredTiger's "statistics:" table.
+ */
+ class WiredTigerServerStatusSection : public ServerStatusSection {
+ public:
+ WiredTigerServerStatusSection(WiredTigerKVEngine* engine);
+ virtual bool includeByDefault() const;
+ virtual BSONObj generateSection(const BSONElement& configElement) const;
+ private:
+ WiredTigerKVEngine* _engine; // stored as passed in; never deleted by this class
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
new file mode 100644
index 00000000000..a2e861047da
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -0,0 +1,184 @@
+// wiredtiger_session_cache.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+ // Opens a new WT_SESSION on 'conn' with snapshot isolation and remembers 'epoch'.
+ // Failure to open the session is fatal.
+ WiredTigerSession::WiredTigerSession( WT_CONNECTION* conn, int epoch )
+     : _epoch( epoch ), _session( NULL ), _cursorsOut( 0 ) {
+     const int openResult = conn->open_session(conn, NULL, "isolation=snapshot", &_session);
+     invariantWTOK(openResult);
+ }
+
+ WiredTigerSession::~WiredTigerSession() {
+ if (_session) {
+ int ret = _session->close(_session, NULL);
+ invariantWTOK(ret);
+ _session = NULL;
+ }
+ }
+
+ // Hands out a cursor for 'uri'. A cursor cached under 'id' is reused when available;
+ // otherwise a new one is opened with "overwrite=false". Returns NULL (without
+ // counting the cursor as checked out) when open_cursor reports ENOENT; any other
+ // open error is fatal.
+ WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id) {
+     Cursors& cached = _curmap[id];
+     if ( !cached.empty() ) {
+         WT_CURSOR* reused = cached.back();
+         cached.pop_back();
+         _cursorsOut++;
+         return reused;
+     }
+
+     WT_CURSOR* fresh = NULL;
+     const int ret = _session->open_cursor(_session, uri.c_str(), NULL, "overwrite=false", &fresh);
+     if (ret != ENOENT)
+         invariantWTOK(ret);
+     if ( fresh != NULL )
+         _cursorsOut++;
+     return fresh;
+ }
+
+ // Takes back a cursor previously handed out by getCursor(). The cursor is reset and
+ // cached under 'id' for reuse, unless more than 10 cursors are already cached for
+ // that id, in which case it is closed instead.
+ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR *cursor) {
+ invariant( _session );
+ invariant( cursor );
+ _cursorsOut--;
+
+ Cursors& cursors = _curmap[id];
+ if ( cursors.size() > 10u ) {
+ invariantWTOK( cursor->close(cursor) );
+ }
+ else {
+ invariantWTOK( cursor->reset( cursor ) );
+ cursors.push_back( cursor );
+ }
+ }
+
+ // Closes every cached (idle) cursor of this session and empties the cache.
+ // Cursors that are currently checked out are not tracked here and are untouched.
+ void WiredTigerSession::closeAllCursors() {
+     invariant( _session );
+
+     for ( CursorMap::iterator entry = _curmap.begin(); entry != _curmap.end(); ++entry ) {
+         Cursors& cached = entry->second;
+         for ( Cursors::iterator it = cached.begin(); it != cached.end(); ++it ) {
+             WT_CURSOR* cursor = *it;
+             if ( !cursor )
+                 continue;
+             int ret = cursor->close(cursor);
+             invariantWTOK(ret);
+         }
+     }
+     _curmap.clear();
+ }
+
+ namespace {
+ // Process-wide id counter. Starts at 1, leaving 0 for kMetadataCursorId.
+ AtomicUInt64 nextCursorId(1);
+ }
+ // static
+ // Returns a fresh cursor id; each call atomically advances the counter.
+ uint64_t WiredTigerSession::genCursorId() {
+ return nextCursorId.fetchAndAdd(1);
+ }
+
+ // -----------------------
+
+ // Cache bound to a KV engine: sessions are opened on the engine's connection and the
+ // engine's epoch / queued drops are consulted when sessions are released.
+ WiredTigerSessionCache::WiredTigerSessionCache( WiredTigerKVEngine* engine )
+ : _engine( engine ), _conn( engine->getConnection() ) {
+ }
+
+ // Cache over a bare connection with no engine; sessions then get epoch -1 and are
+ // never discarded because of queued drops.
+ WiredTigerSessionCache::WiredTigerSessionCache( WT_CONNECTION* conn )
+ : _engine( NULL ), _conn( conn ) {
+ }
+
+ // Deletes every pooled session. Uses the non-locking helper.
+ WiredTigerSessionCache::~WiredTigerSessionCache() {
+ _closeAll();
+ }
+
+ // Deletes every pooled session while holding the pool lock.
+ void WiredTigerSessionCache::closeAll() {
+ boost::mutex::scoped_lock lk( _sessionLock );
+ _closeAll();
+ }
+
+ // Deletes all pooled sessions and empties the pool. Does not take _sessionLock;
+ // callers are responsible for locking where needed.
+ void WiredTigerSessionCache::_closeAll() {
+     for ( SessionPool::iterator it = _sessionPool.begin(); it != _sessionPool.end(); ++it ) {
+         delete *it;
+     }
+     _sessionPool.clear();
+ }
+
+ // Returns a session for the caller to use: the most recently pooled one if the pool
+ // is non-empty, otherwise a newly opened session stamped with the engine's current
+ // epoch (or -1 when there is no engine). The caller gives it back via releaseSession().
+ WiredTigerSession* WiredTigerSessionCache::getSession() {
+ {
+ boost::mutex::scoped_lock lk( _sessionLock );
+ if ( !_sessionPool.empty() ) {
+ WiredTigerSession* s = _sessionPool.back();
+ _sessionPool.pop_back();
+ {
+ // Sanity check: a pooled session must report a pinned range of 0.
+ WT_SESSION* ss = s->getSession();
+ uint64_t range;
+ invariantWTOK( ss->transaction_pinned_range( ss, &range ) );
+ invariant( range == 0 );
+ }
+ return s;
+ }
+ }
+ // Pool was empty: open a new session outside the lock.
+ return new WiredTigerSession( _conn, _engine ? _engine->currentEpoch() : -1 );
+ }
+
+ // Takes back a session obtained from getSession(). The session must have no cursors
+ // checked out and must report a pinned range of 0. It is normally returned to the
+ // pool; a session from an older epoch is deleted instead while the engine has drops
+ // queued, after which the engine is asked to run those drops.
+ void WiredTigerSessionCache::releaseSession( WiredTigerSession* session ) {
+     invariant( session );
+     invariant( session->cursorsOut() == 0 );
+
+     {
+         WT_SESSION* ss = session->getSession();
+         uint64_t range;
+         invariantWTOK( ss->transaction_pinned_range( ss, &range ) );
+         invariant( range == 0 );
+     }
+
+     // _shouldBeClosed() is the single definition of the "stale session" predicate;
+     // it returns false when there is no engine, so _engine is non-NULL below.
+     if ( _shouldBeClosed( session ) ) {
+         delete session;
+         _engine->dropAllQueued();
+         return;
+     }
+
+     boost::mutex::scoped_lock lk( _sessionLock );
+     _sessionPool.push_back( session );
+ }
+
+ // True when 'session' should be discarded rather than pooled: there is an engine, it
+ // has drops queued, and the session was created in an epoch older than the current one.
+ bool WiredTigerSessionCache::_shouldBeClosed( WiredTigerSession* session ) const {
+     return _engine
+         && _engine->haveDropsQueued()
+         && session->epoch() < _engine->currentEpoch();
+ }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
new file mode 100644
index 00000000000..9d49311a742
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -0,0 +1,108 @@
+// wiredtiger_session_cache.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include <boost/thread/mutex.hpp>
+
+#include <wiredtiger.h>
+
+namespace mongo {
+
+ class WiredTigerKVEngine;
+
+ /**
+ * Wraps one WT_SESSION together with a cache of idle cursors opened on it.
+ * Idle cursors are kept per cursor id (several may be cached for the same id) so
+ * that getCursor() can reuse them instead of opening new ones.
+ * The idea is that there is a pool of these somewhere.
+ * NOT THREADSAFE
+ */
+ class WiredTigerSession {
+ public:
+ WiredTigerSession( WT_CONNECTION* conn, int epoch );
+ ~WiredTigerSession();
+
+ WT_SESSION* getSession() const { return _session; }
+
+ // getCursor() may return NULL; every non-NULL cursor must be handed back with
+ // releaseCursor() using the same id.
+ WT_CURSOR* getCursor(const std::string& uri, uint64_t id);
+ void releaseCursor(uint64_t id, WT_CURSOR *cursor);
+
+ void closeAllCursors();
+
+ // Number of cursors handed out by getCursor() and not yet released.
+ int cursorsOut() const { return _cursorsOut; }
+
+ int epoch() const { return _epoch; }
+
+ static uint64_t genCursorId();
+
+ /**
+ * For "metadata:" cursors. Guaranteed never to collide with genCursorId() ids.
+ */
+ static const uint64_t kMetadataCursorId = 0;
+
+ private:
+ int _epoch;
+ WT_SESSION* _session; // owned
+ typedef std::vector<WT_CURSOR*> Cursors;
+ typedef std::map<uint64_t, Cursors> CursorMap;
+ CursorMap _curmap; // owned
+ int _cursorsOut;
+ };
+
+ // Pool of idle WiredTigerSession objects for one connection. The pool itself is
+ // guarded by _sessionLock; sessions handed out are used by one caller at a time.
+ class WiredTigerSessionCache {
+ public:
+
+ WiredTigerSessionCache( WiredTigerKVEngine* engine );
+ WiredTigerSessionCache( WT_CONNECTION* conn );
+ ~WiredTigerSessionCache();
+
+ // Pops a pooled session or opens a new one; pair with releaseSession().
+ WiredTigerSession* getSession();
+ void releaseSession( WiredTigerSession* session );
+
+ // Deletes all pooled sessions.
+ void closeAll();
+
+ private:
+
+ bool _shouldBeClosed( WiredTigerSession* session ) const;
+
+ void _closeAll(); // does not lock
+
+ WiredTigerKVEngine* _engine; // not owned, might be NULL
+ WT_CONNECTION* _conn; // not owned
+ typedef std::vector<WiredTigerSession*> SessionPool;
+ SessionPool _sessionPool; // owned
+ mutable boost::mutex _sessionLock;
+ };
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
new file mode 100644
index 00000000000..91a6d9342d2
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp
@@ -0,0 +1,212 @@
+// wiredtiger_size_storer.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include <wiredtiger.h>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+ namespace {
+ // Sentinel stored in every live WiredTigerSizeStorer; _checkMagic() compares against
+ // it, and the destructor overwrites it with a different value.
+ int MAGIC = 123123;
+ }
+
+ // Marks the object as alive by stamping the magic value.
+ WiredTigerSizeStorer::WiredTigerSizeStorer() {
+ _magic = MAGIC;
+ }
+
+ // Overwrites the magic value so a later _checkMagic() on this object fails.
+ WiredTigerSizeStorer::~WiredTigerSizeStorer() {
+ _magic = 11111;
+ }
+
+ // Verifies the magic stamp; on mismatch logs the value found and then fails the
+ // invariant.
+ void WiredTigerSizeStorer::_checkMagic() const {
+     if ( _magic != MAGIC ) {
+         log() << "WiredTigerSizeStorer magic wrong: " << _magic;
+         invariant( _magic == MAGIC );
+     }
+ }
+
+ // Registers a live record store under its URI with the given initial counts and
+ // marks the entry dirty. While 'rs' is registered, storeInto() re-reads the counts
+ // from it.
+ void WiredTigerSizeStorer::onCreate( WiredTigerRecordStore* rs,
+ long long numRecords, long long dataSize ) {
+ _checkMagic();
+ boost::mutex::scoped_lock lk( _entriesMutex );
+ Entry& entry = _entries[rs->GetURI()];
+ entry.rs = rs;
+ entry.numRecords = numRecords;
+ entry.dataSize = dataSize;
+ entry.dirty = true;
+ }
+
+ // Takes a final snapshot of the record store's counts, marks the entry dirty and
+ // drops the pointer to 'rs' so it is not dereferenced afterwards.
+ void WiredTigerSizeStorer::onDestroy( WiredTigerRecordStore* rs ) {
+ _checkMagic();
+ boost::mutex::scoped_lock lk( _entriesMutex );
+ Entry& entry = _entries[rs->GetURI()];
+ entry.numRecords = rs->numRecords( NULL );
+ entry.dataSize = rs->dataSize( NULL );
+ entry.dirty = true;
+ entry.rs = NULL;
+ }
+
+
+ // Sets the counts for 'uri' (creating the entry if needed) and marks it dirty.
+ void WiredTigerSizeStorer::store( const StringData& uri,
+ long long numRecords, long long dataSize ) {
+ _checkMagic();
+ boost::mutex::scoped_lock lk( _entriesMutex );
+ Entry& entry = _entries[uri.toString()];
+ entry.numRecords = numRecords;
+ entry.dataSize = dataSize;
+ entry.dirty = true;
+ }
+
+ // Reads the cached counts for 'uri' into the out-parameters; both are set to 0 when
+ // no entry exists for that uri.
+ void WiredTigerSizeStorer::load( const StringData& uri,
+                                  long long* numRecords, long long* dataSize ) const {
+     _checkMagic();
+     boost::mutex::scoped_lock lk( _entriesMutex );
+
+     Map::const_iterator found = _entries.find( uri.toString() );
+     if ( found != _entries.end() ) {
+         *numRecords = found->second.numRecords;
+         *dataSize = found->second.dataSize;
+     }
+     else {
+         *numRecords = 0;
+         *dataSize = 0;
+     }
+ }
+
+ // Replaces the in-memory entries with the contents of the table at 'uri', read
+ // through 'session'. Each row maps a record-store URI to a BSON object carrying
+ // "numRecords" and "dataSize". If the table does not exist yet (ENOENT) the current
+ // entries are left untouched. Any other WiredTiger error is fatal.
+ void WiredTigerSizeStorer::loadFrom( WiredTigerSession* session,
+                                      const std::string& uri ) {
+     _checkMagic();
+
+     Map m;
+     {
+         WT_SESSION* s = session->getSession();
+         WT_CURSOR* c = NULL;
+         int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c );
+         if ( ret == ENOENT ) {
+             // doesn't exist, we'll create later
+             return;
+         }
+         invariantWTOK( ret );
+
+         while ( ( ret = c->next(c) ) == 0 ) {
+             WT_ITEM key;
+             WT_ITEM value;
+             invariantWTOK( c->get_key(c, &key ) );
+             invariantWTOK( c->get_value(c, &value ) );
+             // Named differently from the 'uri' parameter (the table being scanned).
+             std::string entryUri( reinterpret_cast<const char*>( key.data ), key.size );
+             BSONObj data( reinterpret_cast<const char*>( value.data ) );
+
+             LOG(2) << "WiredTigerSizeStorer::loadFrom " << entryUri << " -> " << data;
+
+             Entry& e = m[entryUri];
+             e.numRecords = data["numRecords"].safeNumberLong();
+             e.dataSize = data["dataSize"].safeNumberLong();
+             e.dirty = false;
+             e.rs = NULL;
+         }
+         // WT_NOTFOUND is the normal end-of-table result. Anything else means the scan
+         // stopped early, and installing a partial map would silently lose sizes.
+         if ( ret != WT_NOTFOUND )
+             invariantWTOK( ret );
+         invariantWTOK( c->close(c) );
+     }
+
+     boost::mutex::scoped_lock lk( _entriesMutex );
+     _entries = m;
+ }
+
+ // Writes every dirty entry to the table at 'uri' through 'session', creating the
+ // table first if it does not exist. For entries that still have a live record store
+ // attached, the counts are refreshed from it before deciding whether the entry is
+ // dirty. Any WiredTiger error while opening, creating or inserting is fatal.
+ void WiredTigerSizeStorer::storeInto( WiredTigerSession* session,
+                                       const std::string& uri ) {
+     _checkMagic();
+
+     // Snapshot the dirty entries under the lock, then do the table I/O without it.
+     Map toWrite;
+     {
+         boost::mutex::scoped_lock lk( _entriesMutex );
+         for ( Map::iterator it = _entries.begin(); it != _entries.end(); ++it ) {
+             Entry& entry = it->second;
+             if ( entry.rs ) {
+                 if ( entry.dataSize != entry.rs->dataSize( NULL ) ) {
+                     entry.dataSize = entry.rs->dataSize( NULL );
+                     entry.dirty = true;
+                 }
+                 if ( entry.numRecords != entry.rs->numRecords( NULL ) ) {
+                     entry.numRecords = entry.rs->numRecords( NULL );
+                     entry.dirty = true;
+                 }
+             }
+
+             if ( !entry.dirty )
+                 continue;
+             toWrite[it->first] = entry;
+             // Clear the flag on the shared entry (not on the snapshot copy), otherwise
+             // the entry stays dirty forever and is rewritten on every call. A failed
+             // write below aborts via invariant, and a concurrent store() simply sets
+             // the flag again.
+             entry.dirty = false;
+         }
+     }
+
+     WT_SESSION* s = session->getSession();
+     WT_CURSOR* c = NULL;
+     int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c );
+     if ( ret == ENOENT ) {
+         invariantWTOK( s->create( s, uri.c_str(), "" ) );
+         ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c );
+     }
+     invariantWTOK( ret );
+
+     for ( Map::iterator it = toWrite.begin(); it != toWrite.end(); ++it ) {
+         // Named differently from the 'uri' parameter (the table being written).
+         const std::string& entryUri = it->first;
+         const Entry& entry = it->second;
+
+         BSONObj data;
+         {
+             BSONObjBuilder b;
+             b.append( "numRecords", entry.numRecords );
+             b.append( "dataSize", entry.dataSize );
+             data = b.obj();
+         }
+
+         LOG(2) << "WiredTigerSizeStorer::storeInto " << entryUri << " -> " << data;
+
+         WiredTigerItem key( entryUri.c_str(), entryUri.size() );
+         WiredTigerItem value( data.objdata(), data.objsize() );
+         c->set_key( c, key.Get() );
+         c->set_value( c, value.Get() );
+         invariantWTOK( c->insert(c) );
+
+         c->reset(c);
+     }
+
+     invariantWTOK( c->close(c) );
+
+ }
+
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
new file mode 100644
index 00000000000..15df6874139
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h
@@ -0,0 +1,81 @@
+// wiredtiger_size_storer.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include <boost/thread/mutex.hpp>
+
+#include "mongo/base/string_data.h"
+
+namespace mongo {
+
+ class WiredTigerRecordStore;
+ class WiredTigerSession;
+
+ /**
+  * In-memory map from a uri to its record count and data size, which can be written
+  * to (storeInto) and read back from (loadFrom) a WiredTiger table.
+  *
+  * _entries is guarded by _entriesMutex.
+  */
+ class WiredTigerSizeStorer {
+ public:
+     WiredTigerSizeStorer();
+     ~WiredTigerSizeStorer();
+
+     // NOTE(review): presumably attach/detach a live record store to/from its entry,
+     // with 'nr' = number of records and 'ds' = data size; the definitions are not
+     // visible from this header, confirm against the .cpp.
+     void onCreate( WiredTigerRecordStore* rs, long long nr, long long ds );
+     void onDestroy( WiredTigerRecordStore* rs );
+
+     // NOTE(review): presumably records the given sizes for 'uri'; confirm in the .cpp.
+     void store( const StringData& uri,
+                 long long numRecords, long long dataSize );
+
+     // NOTE(review): presumably returns the cached sizes for 'uri' through the out
+     // parameters; confirm in the .cpp.
+     void load( const StringData& uri,
+                long long* numRecords, long long* dataSize ) const;
+
+     /**
+      * Replaces the in-memory entries with those read from the table at 'uri'.
+      * The parameter is a session, not a cursor, so it is named 'session' to match
+      * the definitions.
+      */
+     void loadFrom( WiredTigerSession* session, const std::string& uri );
+
+     /**
+      * Writes the dirty entries to the table at 'uri', creating the table if needed.
+      */
+     void storeInto( WiredTigerSession* session, const std::string& uri );
+
+ private:
+     // NOTE(review): looks like a sanity check on _magic; definition not visible here.
+     void _checkMagic() const;
+
+     struct Entry {
+         Entry() : numRecords(0), dataSize(0), dirty(false), rs(NULL){}
+         long long numRecords;      // number of records
+         long long dataSize;        // data size
+         bool dirty;                // true when the values still need to be written out
+         WiredTigerRecordStore* rs; // not owned
+     };
+
+     int _magic;
+
+     // Keyed by uri.
+     typedef std::map<std::string,Entry> Map;
+     Map _entries;
+     mutable boost::mutex _entriesMutex; // guards _entries
+ };
+
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp
new file mode 100644
index 00000000000..b67e4870a8f
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp
@@ -0,0 +1,137 @@
+// wiredtiger_util.cpp
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+ using std::string;
+
+ /**
+  * Returns the "file size in bytes" statistic from the "block manager" section of
+  * the statistics table for 'uri'.
+  *
+  * Returns 0 when the statistics cursor cannot be opened (reported by
+  * exportTableToBSON as CursorNotFound). Any other failure is raised through
+  * uassertStatusOK.
+  */
+ int64_t WiredTigerUtil::getIdentSize(WT_SESSION* s,
+                                      const std::string& uri ) {
+     BSONObjBuilder statsBuilder;
+     const Status exportStatus = WiredTigerUtil::exportTableToBSON(s,
+                                                                   "statistics:" + uri,
+                                                                   "statistics=(fast)",
+                                                                   &statsBuilder);
+
+     // The ident is gone, so its size is 0.
+     if ( exportStatus.code() == ErrorCodes::CursorNotFound )
+         return 0;
+
+     // A no-op for an OK status; throws for every other failure.
+     uassertStatusOK( exportStatus );
+
+     // 'stats' must stay alive while 'blockManager' and 'fileSize' are in use.
+     BSONObj stats = statsBuilder.obj();
+     BSONObj blockManager = stats["block manager"].Obj();
+     BSONElement fileSize = blockManager["file size in bytes"];
+     invariant( fileSize.type() );
+
+     // Statistics are exported as strings, so a non-numeric element is parsed as text.
+     if ( !fileSize.isNumber() )
+         return strtoull( fileSize.valuestrsafe(), NULL, 10 );
+
+     return fileSize.safeNumberLong();
+ }
+
+
+ /**
+  * Reads every record of the table at 'uri' and appends it to 'bob' as a string
+  * element, after first appending a "uri" field.
+  *
+  * Each record's description is split at its first ':' or, if there is none, at its
+  * first ' '. The part before the split names a sub-object and the rest, with leading
+  * whitespace trimmed, names the field inside it. A description with no separator
+  * becomes a sub-object named after the whole description, with a single "num" field.
+  *
+  * @param s      open session; must not be NULL
+  * @param uri    table to read
+  * @param config cursor configuration; an empty string is passed as NULL
+  * @param bob    builder receiving the output; must not be NULL
+  * @return CursorNotFound if the cursor cannot be opened for any reason, else OK
+  */
+ Status WiredTigerUtil::exportTableToBSON(WT_SESSION* s,
+                                          const std::string& uri, const std::string& config,
+                                          BSONObjBuilder* bob) {
+     invariant(s);
+     invariant(bob);
+     WT_CURSOR* c = NULL;
+     const char *cursorConfig = config.empty() ? NULL : config.c_str();
+     int ret = s->open_cursor(s, uri.c_str(), NULL, cursorConfig, &c);
+     if (ret != 0) {
+         return Status(ErrorCodes::CursorNotFound, str::stream()
+                       << "unable to open cursor at URI " << uri
+                       << ". reason: " << wiredtiger_strerror(ret));
+     }
+     bob->append("uri", uri);
+     invariant(c);
+     ON_BLOCK_EXIT(c->close, c);
+
+     // Owns the per-prefix sub-builders so they are freed on every exit path,
+     // including when an append throws part way through.
+     struct SubBuilders {
+         typedef std::map<string,BSONObjBuilder*> Map;
+         Map builders;
+         ~SubBuilders() {
+             for ( Map::iterator it = builders.begin(); it != builders.end(); ++it )
+                 delete it->second;
+         }
+     } subs;
+
+     const char *desc, *pvalue;
+     uint64_t value;
+     // NOTE(review): any non-zero return from next()/get_value() ends the loop, so a
+     // real cursor error is treated the same as reaching the end of the table.
+     while (c->next(c) == 0 && c->get_value(c, &desc, &pvalue, &value) == 0) {
+         StringData key( desc );
+
+         // Split at the first ':' or, failing that, at the first ' '.
+         size_t idx = key.find( ':' );
+         if ( idx == string::npos )
+             idx = key.find( ' ' );
+
+         StringData prefix;
+         StringData suffix;
+         if ( idx != string::npos ) {
+             prefix = key.substr( 0, idx );
+             suffix = key.substr( idx + 1 );
+         }
+         else {
+             prefix = key;
+             suffix = "num";
+         }
+
+         if ( prefix.size() == 0 ) {
+             // The description starts with the separator: keep it as a top-level field.
+             bob->append(desc, pvalue);
+         }
+         else {
+             BSONObjBuilder*& sub = subs.builders[prefix.toString()];
+             if ( !sub )
+                 sub = new BSONObjBuilder();
+             sub->append( mongoutils::str::ltrim(suffix.toString()), pvalue );
+         }
+     }
+
+     for ( SubBuilders::Map::const_iterator it = subs.builders.begin();
+           it != subs.builders.end(); ++it ) {
+         bob->append( it->first, it->second->obj() );
+     }
+     return Status::OK();
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.h b/src/mongo/db/storage/wiredtiger/wiredtiger_util.h
new file mode 100644
index 00000000000..5de65c30b43
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.h
@@ -0,0 +1,139 @@
+// wiredtiger_util.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <wiredtiger.h>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/stacktrace.h"
+
+namespace mongo {
+
+ class BSONObjBuilder;
+
+ // Compile-time switch that currently always reports false.
+ // NOTE(review): presumably controls whether WiredTiger transactions are kept open
+ // between operations; confirm against the callers.
+ inline bool wt_keeptxnopen() {
+     const bool keepTxnOpen = false;
+     return keepTxnOpen;
+ }
+
+ /**
+  * Translates a WiredTiger return code into a MongoDB Status.
+  *
+  *   0            -> Status::OK()
+  *   WT_ROLLBACK  -> throws WriteConflictException instead of returning
+  *   anything else -> UnknownError with the message "<code>: <wiredtiger_strerror text>"
+  */
+ inline Status wtRCToStatus(int retCode) {
+     if (MONGO_likely(retCode == 0)) {
+         return Status::OK();
+     }
+
+     if (retCode == WT_ROLLBACK) {
+         throw WriteConflictException();
+     }
+
+     // TODO convert specific codes rather than just using UNKNOWN_ERROR for everything.
+     return Status(ErrorCodes::UnknownError,
+                   str::stream() << retCode << ": " << wiredtiger_strerror(retCode));
+ }
+
+ /**
+  * Does nothing when 'retCode' is 0. Otherwise converts the code with wtRCToStatus
+  * and hands the result to fassertFailedWithStatus with id 28519.
+  */
+ inline void invariantWTOK(int retCode) {
+     if (!MONGO_likely(retCode == 0)) {
+         fassertFailedWithStatus(28519, wtRCToStatus(retCode));
+     }
+ }
+
+ /**
+  * A WT_ITEM whose 'data' and 'size' are set at construction. It points at the
+  * caller's buffer and does not copy it.
+  */
+ struct WiredTigerItem : public WT_ITEM {
+     // Points at 's' bytes starting at 'd'.
+     WiredTigerItem(const void *d, size_t s) {
+         this->size = s;
+         this->data = d;
+     }
+
+     // Points at the characters of 'str'.
+     WiredTigerItem(const std::string &str) {
+         this->size = str.size();
+         this->data = str.c_str();
+     }
+
+     // NOTE: do not call Get() on a temporary.
+     // The pointer returned by Get() must not be allowed to live longer than *this.
+     WT_ITEM *Get() {
+         return this;
+     }
+     const WT_ITEM *Get() const {
+         return this;
+     }
+ };
+
+ /**
+  * Static helpers only: the constructor is private and copying is disallowed.
+  */
+ class WiredTigerUtil {
+     MONGO_DISALLOW_COPYING(WiredTigerUtil);
+ private:
+     WiredTigerUtil();
+
+ public:
+
+     /**
+      * Returns the "file size in bytes" statistic reported for the ident at 'uri',
+      * or 0 if its statistics cursor cannot be opened.
+      */
+     static int64_t getIdentSize(WT_SESSION* s, const std::string& uri );
+
+     /**
+      * Reads the contents of the table at 'uri' and exports all keys to BSON as
+      * string elements. Additionally, adds a 'uri' field to the output document.
+      */
+     static Status exportTableToBSON(WT_SESSION* s,
+                                     const std::string& uri,
+                                     const std::string& config,
+                                     BSONObjBuilder* bob);
+ };
+
+ /**
+  * Owns a WT_CONFIG_PARSER opened over a configuration string. The parser is opened
+  * in the constructor and closed in the destructor; a failure of either goes through
+  * invariantWTOK.
+  */
+ class WiredTigerConfigParser {
+     MONGO_DISALLOW_COPYING(WiredTigerConfigParser);
+ public:
+     // Parses the whole of 'config'.
+     WiredTigerConfigParser(const StringData& config) {
+         _open(config.rawData(), config.size());
+     }
+
+     // Parses a nested value, which must be of struct type.
+     WiredTigerConfigParser(const WT_CONFIG_ITEM& nested) {
+         invariant(nested.type == WT_CONFIG_ITEM::WT_CONFIG_ITEM_STRUCT);
+         _open(nested.str, nested.len);
+     }
+
+     ~WiredTigerConfigParser() {
+         invariantWTOK(_parser->close(_parser));
+     }
+
+     // Forwards to the parser's next(); the parser's return code is passed through.
+     int next(WT_CONFIG_ITEM* key, WT_CONFIG_ITEM* value) {
+         return _parser->next(_parser, key, value);
+     }
+
+     // Forwards to the parser's get() for 'key'; the parser's return code is passed through.
+     int get(const char* key, WT_CONFIG_ITEM* value) {
+         return _parser->get(_parser, key, value);
+     }
+
+ private:
+     // Opens _parser over 'len' bytes at 'str'; shared by both constructors.
+     void _open(const char* str, size_t len) {
+         invariantWTOK(wiredtiger_config_parser_open(NULL, str, len, &_parser));
+     }
+
+     WT_CONFIG_PARSER* _parser;
+ };
+
+}
diff --git a/src/third_party/SConscript b/src/third_party/SConscript
index 7ad9fae9f32..91a74add572 100644
--- a/src/third_party/SConscript
+++ b/src/third_party/SConscript
@@ -1,6 +1,7 @@
# -*- mode: python -*-
Import("env use_system_version_of_library windows darwin usev8 v8suffix solaris boostSuffix")
+Import("wiredtiger")
snappySuffix = '-1.1.2'
@@ -33,6 +34,13 @@ if not use_system_version_of_library('stemmer'):
thirdPartyIncludePathList.append(
('stemmer', '#/src/third_party/libstemmer_c/include'))
+# Note that the wiredtiger header is generated, so
+# we want to look for it in the build directory not
+# the source directory.
+# The path is only registered when the imported 'wiredtiger' flag is set and the
+# system version of the library is not being used.
+if wiredtiger and not use_system_version_of_library('wiredtiger'):
+ thirdPartyIncludePathList.append(
+ ('wiredtiger', '$BUILD_DIR/third_party/wiredtiger'))
+
if not use_system_version_of_library('yaml'):
thirdPartyIncludePathList.append(
('yaml', '#/src/third_party/yaml-cpp-0.5.1/include'))
@@ -236,3 +244,26 @@ tzEnv.Library(
source=[
'shim_tz.cpp',
])
+
+
+# Define the shim_wiredtiger library when the 'wiredtiger' flag is set.
+if wiredtiger:
+ # System library: the environment only carries the system library dependency.
+ if use_system_version_of_library("wiredtiger"):
+ wiredtigerEnv = env.Clone(
+ SYSLIBDEPS=[
+ env['LIBDEPS_WIREDTIGER_SYSLIBDEP'],
+ ])
+ else:
+ # Vendored copy: run its SConscript with an environment that has the wiredtiger
+ # include paths injected, then depend on the library target it defines.
+ wiredtigerEnv = env.Clone()
+ wiredtigerEnv.InjectThirdPartyIncludePaths(libraries=['wiredtiger'])
+ wiredtigerEnv.SConscript('wiredtiger/SConscript', exports={ 'env' : wiredtigerEnv })
+ wiredtigerEnv = wiredtigerEnv.Clone(
+ LIBDEPS=[
+ 'wiredtiger/wiredtiger',
+ ])
+
+ # shim_wiredtiger.cpp contains no code; the library exists to carry the dependencies
+ # configured on wiredtigerEnv above.
+ wiredtigerEnv.Library(
+ target="shim_wiredtiger",
+ source=[
+ 'shim_wiredtiger.cpp'
+ ])
+
diff --git a/src/third_party/shim_wiredtiger.cpp b/src/third_party/shim_wiredtiger.cpp
new file mode 100644
index 00000000000..ab1cc40aea1
--- /dev/null
+++ b/src/third_party/shim_wiredtiger.cpp
@@ -0,0 +1,3 @@
+// This file is intentionally blank. shim_wiredtiger.cpp is part of the
+// third_party/wiredtiger library, which is just a placeholder for forwarding
+// library dependencies.