diff options
28 files changed, 5138 insertions, 22 deletions
diff --git a/SConstruct b/SConstruct index 9976740d74c..274d716eef4 100644 --- a/SConstruct +++ b/SConstruct @@ -230,6 +230,8 @@ add_option( "mm", "use main memory instead of memory mapped files" , 0 , True ) add_option( "ssl" , "Enable SSL" , 0 , True ) add_option( "ssl-fips-capability", "Enable the ability to activate FIPS 140-2 mode", 0, True ); add_option( "rocksdb" , "Enable RocksDB" , 0 , False ) +add_option( "wiredtiger", "Enable wiredtiger", "?", True, "wiredtiger", + type="choice", choices=["on", "off"], const="on", default="on") add_option( "replication-implementation", "Controls what implementation is used for the replication system", "?", False, type="choice", choices=["impl", "legacy"], const="impl", default="impl" ) @@ -290,6 +292,8 @@ add_option( "use-system-tcmalloc", "use system version of tcmalloc library", 0, add_option( "use-system-pcre", "use system version of pcre library", 0, True ) +add_option( "use-system-wiredtiger", "use system version of wiredtiger library", 0, True) + # library choices boost_choices = ['1.49', '1.56'] add_option( "internal-boost", "Specify internal boost version to use", 1, True, @@ -472,7 +476,7 @@ envDict = dict(BUILD_ROOT=buildDir, ARCHIVE_ADDITIONS=[], PYTHON=utils.find_python(), SERVER_ARCHIVE='${SERVER_DIST_BASENAME}${DIST_ARCHIVE_SUFFIX}', - tools=["default", "gch", "jsheader", "mergelib", "mongo_unittest"], + tools=["default", "gch", "jsheader", "mergelib", "mongo_unittest", "textfile"], UNITTEST_ALIAS='unittests', # TODO: Move unittests.txt to $BUILD_DIR, but that requires # changes to MCI. @@ -987,6 +991,7 @@ else: env.Append( MONGO_CRYPTO=["tom"] ) env['MONGO_REPL_IMPL'] = get_option('replication-implementation') +wiredtiger = (get_option('wiredtiger') == 'on') try: umask = os.umask(022) @@ -1014,6 +1019,7 @@ env['MONGO_MODULES'] = [m.name for m in mongo_modules] # --- check system --- def doConfigure(myenv): + global wiredtiger # Check that the compilers work. 
# @@ -1154,7 +1160,25 @@ def doConfigure(myenv): env.Append( CPPDEFINES=[("_WIN32_WINNT", "0x" + win_version_min[0])] ) env.Append( CPPDEFINES=[("NTDDI_VERSION", "0x" + win_version_min[0] + win_version_min[1])] ) - if using_gcc() or using_clang(): + def CheckForx86(context): + # See http://nadeausoftware.com/articles/2012/02/c_c_tip_how_detect_processor_type_using_compiler_predefined_macros + test_body = """ + #if defined(__i386) || defined(_M_IX86) + /* x86 32-bit */ + #else + #error not 32-bit x86 + #endif + """ + context.Message('Checking if target architecture is 32-bit x86...') + ret = context.TryCompile(textwrap.dedent(test_body), ".c") + context.Result(ret) + return ret + + conf = Configure(myenv, help=False, custom_tests = { + 'CheckForx86' : CheckForx86, + }) + + if conf.CheckForx86(): # If we are using GCC or clang to target 32 or x86, set the ISA minimum to 'nocona', # and the tuning to 'generic'. The choice of 'nocona' is selected because it @@ -1167,27 +1191,15 @@ def doConfigure(myenv): # contemporaries, the generic scheduling should be appropriate for a wide range of # deployed hardware. 
- def CheckForx86(context): - # See http://nadeausoftware.com/articles/2012/02/c_c_tip_how_detect_processor_type_using_compiler_predefined_macros - test_body = """ - #if defined(__i386) || defined(_M_IX86) - /* x86 32-bit */ - #else - #error not 32-bit x86 - #endif - """ - context.Message('Checking if target architecture is 32-bit x86...') - ret = context.TryCompile(textwrap.dedent(test_body), ".c") - context.Result(ret) - return ret - - conf = Configure(myenv, help=False, custom_tests = { - 'CheckForx86' : CheckForx86, - }) + if using_gcc() or using_clang(): + myenv.Append( CCFLAGS=['-march=nocona', '-mtune=generic'] ) - if conf.CheckForx86(): - myenv.Append( CCFLAGS=['-march=nocona', '-mtune=generic'] ) - conf.Finish() + # Wiredtiger only supports 64-bit architecture, and will fail to compile on 32-bit + # so disable WiredTiger automatically on 32-bit since wiredtiger is on by default + if wiredtiger == True: + print "WARNING: WiredTiger is not supported on 32-bit platforms, disabling support" + wiredtiger = False + conf.Finish() # Enable PCH if we are on using gcc or clang and the 'Gch' tool is enabled. Otherwise, # remove any pre-compiled header since the compiler may try to use it if it exists. 
@@ -1887,6 +1899,12 @@ def doConfigure(myenv): if use_system_version_of_library("yaml"): conf.FindSysLibDep("yaml", ["yaml-cpp"]) + if wiredtiger and use_system_version_of_library("wiredtiger"): + if not conf.CheckCXXHeader( "wiredtiger.h" ): + print( "Cannot find wiredtiger headers" ) + Exit(1) + conf.FindSysLibDep("wiredtiger", ["wiredtiger"]) + if use_system_version_of_library("boost"): if not conf.CheckCXXHeader( "boost/filesystem/operations.hpp" ): print( "can't find boost headers" ) @@ -2192,6 +2210,7 @@ Export("debugBuild optBuild") Export("enforce_glibc") Export("s3push") Export("use_clang") +Export("wiredtiger") def injectMongoIncludePaths(thisEnv): thisEnv.AppendUnique(CPPPATH=['$BUILD_DIR']) diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 7ee4c58ca91..021cf28b652 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -13,6 +13,7 @@ Import("usev8") Import("v8suffix") Import("enforce_glibc") Import("darwin windows solaris linux nix") +Import("wiredtiger") # Boost we need everywhere. 's2' is spammed in all over the place by # db/geo unfortunately. pcre is also used many places. 
@@ -40,6 +41,7 @@ env.SConscript(['base/SConscript', 'db/storage/kv/SConscript', 'db/storage/mmap_v1/SConscript', 'db/storage/rocks/SConscript', + 'db/storage/wiredtiger/SConscript', 'db/SConscript', 'installer/msi/SConscript', 'logger/SConscript', @@ -955,6 +957,10 @@ serveronlyLibdeps = ["coreshard", if has_option("rocksdb" ): serveronlyLibdeps.append( 'db/storage/rocks/storage_rocks' ) +if wiredtiger: + serveronlyLibdeps.append( 'db/storage/wiredtiger/storage_wiredtiger' ) + serveronlyLibdeps.append( '$BUILD_DIR/third_party/shim_wiredtiger') + serveronlyEnv.Library("serveronly", serverOnlyFiles, LIBDEPS=serveronlyLibdeps ) diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript new file mode 100644 index 00000000000..7c6116eae4c --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -0,0 +1,76 @@ +Import("env") +Import("wiredtiger") + +if wiredtiger: + wtEnv = env.Clone() + wtEnv.InjectThirdPartyIncludePaths(libraries=['wiredtiger']) + + # This is the smallest possible set of files that wraps WT + wtEnv.Library( + target= 'storage_wiredtiger_core', + source= [ + 'wiredtiger_index.cpp', + 'wiredtiger_kv_engine.cpp', + 'wiredtiger_record_store.cpp', + 'wiredtiger_recovery_unit.cpp', + 'wiredtiger_session_cache.cpp', + 'wiredtiger_size_storer.cpp', + 'wiredtiger_util.cpp', + ], + LIBDEPS= [ + '$BUILD_DIR/mongo/bson', + '$BUILD_DIR/mongo/db/catalog/collection_options', + '$BUILD_DIR/mongo/db/index/index_descriptor', + '$BUILD_DIR/mongo/db/storage/index_entry_comparison', + '$BUILD_DIR/mongo/db/storage/oplog_hack', + '$BUILD_DIR/mongo/elapsed_tracker', + '$BUILD_DIR/mongo/foundation', + '$BUILD_DIR/mongo/processinfo', + '$BUILD_DIR/third_party/shim_wiredtiger', + '$BUILD_DIR/third_party/shim_snappy', + ], + ) + + wtEnv.Library( + target='storage_wiredtiger', + source=[ + 'wiredtiger_global_options.cpp', + 'wiredtiger_init.cpp', + 'wiredtiger_options_init.cpp', + 'wiredtiger_server_status.cpp', + ], + 
LIBDEPS=['storage_wiredtiger_core', + '$BUILD_DIR/mongo/db/storage/kv/kv_engine', + ] + ) + + wtEnv.CppUnitTest( + target='storage_wiredtiger_record_store_test', + source=['wiredtiger_record_store_test.cpp', + ], + LIBDEPS=[ + 'storage_wiredtiger_core', + '$BUILD_DIR/mongo/db/storage/record_store_test_harness', + ], + ) + + wtEnv.CppUnitTest( + target='storage_wiredtiger_index_test', + source=['wiredtiger_index_test.cpp', + ], + LIBDEPS=[ + 'storage_wiredtiger_core', + '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness', + ], + ) + + wtEnv.CppUnitTest( + target='storage_wiredtiger_kv_engine_test', + source=['wiredtiger_kv_engine_test.cpp', + ], + LIBDEPS=[ + 'storage_wiredtiger_core', + '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness', + ], + ) + diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp new file mode 100644 index 00000000000..345bc8cbd10 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp @@ -0,0 +1,87 @@ +// wiredtiger_global_options.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/base/status.h" +#include "mongo/util/log.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" + +namespace mongo { + + Status WiredTigerGlobalOptions::add(moe::OptionSection* options) { + moe::OptionSection wiredTigerOptions("WiredTiger options"); + + // Add WiredTiger storage engine specific options. 
+ wiredTigerOptions.addOptionChaining("storage.wiredtiger.databaseConfig", + "wiredTigerDatabaseConfig", moe::String, "WiredTiger database configuration settings"); + wiredTigerOptions.addOptionChaining("storage.wiredtiger.collectionConfig", + "wiredTigerCollectionConfig", moe::String, "WiredTiger collection configuration settings"); + wiredTigerOptions.addOptionChaining("storage.wiredtiger.indexConfig", + "wiredTigerIndexConfig", moe::String, "WiredTiger index configuration settings"); + + return options->addSection(wiredTigerOptions); + } + + void WiredTigerGlobalOptions::printHelp(std::ostream* out) { + *out << "storage.wiredtiger.databaseConfig : " << databaseConfig << std::endl; + *out << "storage.wiredtiger.collectionConfig : " << collectionConfig << std::endl; + *out << "storage.wiredtiger.indexConfig : " << indexConfig << std::endl; + *out << std::flush; + } + + bool WiredTigerGlobalOptions::handlePreValidation(const moe::Environment& params) { + if (params.count("help")) { + printHelp(&std::cout); + return true; + } + return true; + } + + Status WiredTigerGlobalOptions::store(const moe::Environment& params, + const std::vector<std::string>& args) { + if (params.count("storage.wiredtiger.databaseConfig")) { + wiredTigerGlobalOptions.databaseConfig = + params["storage.wiredtiger.databaseConfig"].as<string>(); + std::cerr << "DB option: " << wiredTigerGlobalOptions.databaseConfig << std::endl; + } + if (params.count("storage.wiredtiger.collectionConfig")) { + wiredTigerGlobalOptions.collectionConfig = + params["storage.wiredtiger.collectionConfig"].as<string>(); + std::cerr << "Collection option: " << wiredTigerGlobalOptions.collectionConfig << std::endl; + } + if (params.count("storage.wiredtiger.indexConfig")) { + wiredTigerGlobalOptions.indexConfig = + params["storage.wiredtiger.indexConfig"].as<string>(); + std::cerr << "Index option: " << wiredTigerGlobalOptions.indexConfig << std::endl; + } + return Status::OK(); + } +} diff --git 
a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h new file mode 100644 index 00000000000..7709e62d6c7 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h @@ -0,0 +1,59 @@ +// wiredtiger_global_options.h + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/util/options_parser/startup_option_init.h" +#include "mongo/util/options_parser/startup_options.h" + +namespace mongo { + + namespace moe = mongo::optionenvironment; + + class WiredTigerGlobalOptions { + public: + WiredTigerGlobalOptions() {}; + + Status add(moe::OptionSection* options); + bool handlePreValidation(const moe::Environment& params); + Status store(const moe::Environment& params, const std::vector<std::string>& args); + private: + void printHelp(std::ostream* out); + + public: + std::string databaseConfig; + std::string collectionConfig; + std::string indexConfig; + }; + + extern WiredTigerGlobalOptions wiredTigerGlobalOptions; + +} + diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp new file mode 100644 index 00000000000..1422c8c2d5b --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -0,0 +1,769 @@ +// wiredtiger_index.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "mongo/db/storage/wiredtiger/wiredtiger_index.h" + +#include <set> + +#include "mongo/db/json.h" +#include "mongo/db/catalog/index_catalog_entry.h" +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/db/storage_options.h" +#include "mongo/util/log.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace { + static const int TempKeyMaxSize = 1024; // this goes away with SERVER-3372 + + static const WiredTigerItem emptyItem(NULL, 0); + + bool hasFieldNames(const BSONObj& obj) { + BSONForEach(e, obj) { + if (e.fieldName()[0]) + return true; + } + return false; + } + + BSONObj stripFieldNames(const BSONObj& query) { + if (!hasFieldNames(query)) + return query; + + BSONObjBuilder bb; + BSONForEach(e, query) { + bb.appendAs(e, StringData()); + } + return bb.obj(); + } + + /** + * Constructs an IndexKeyEntry from a slice containing the bytes of a BSONObject followed + * by the bytes of a DiskLoc + */ + static IndexKeyEntry makeIndexKeyEntry(const WT_ITEM *keyCols) { + const char* data = reinterpret_cast<const char*>( keyCols->data ); + BSONObj key( data ); + if ( keyCols->size == static_cast<size_t>( key.objsize() ) ) { + // in unique mode 
+ return IndexKeyEntry( key, DiskLoc() ); + } + invariant( keyCols->size == key.objsize() + sizeof(DiskLoc) ); + DiskLoc loc = reinterpret_cast<const DiskLoc*>( data + key.objsize() )[0]; + return IndexKeyEntry( key, loc ); + } + + WiredTigerItem _toItem( const BSONObj& key, const DiskLoc& loc, + boost::scoped_array<char>*out ) { + size_t keyLen = key.objsize() + sizeof(DiskLoc); + out->reset( new char[keyLen] ); + memcpy( out->get(), key.objdata(), key.objsize() ); + memcpy( out->get() + key.objsize(), reinterpret_cast<const char*>(&loc), sizeof(DiskLoc) ); + + return WiredTigerItem( out->get(), keyLen ); + } + + DiskLoc _toDiskLoc( const WT_ITEM& item ) { + DiskLoc l; + memcpy( &l, item.data, sizeof(DiskLoc) ); + return l; + } + + /** + * Custom comparator used to compare Index Entries by BSONObj and DiskLoc + */ + struct WiredTigerIndexCollator : public WT_COLLATOR { + public: + WiredTigerIndexCollator(const Ordering& order) + :WT_COLLATOR(), _indexComparator(order) { + compare = _compare; + terminate = _terminate; + } + + int Compare(WT_SESSION *s, const WT_ITEM *a, const WT_ITEM *b) const { + const IndexKeyEntry lhs = makeIndexKeyEntry(a); + const IndexKeyEntry rhs = makeIndexKeyEntry(b); + int cmp = _indexComparator.compare( lhs, rhs ); + if (cmp < 0) + cmp = -1; + else if (cmp > 0) + cmp = 1; + return cmp; + } + + static int _compare(WT_COLLATOR *coll, WT_SESSION *s, const WT_ITEM *a, const WT_ITEM *b, int *cmp) { + WiredTigerIndexCollator *c = static_cast<WiredTigerIndexCollator *>(coll); + *cmp = c->Compare(s, a, b); + return 0; + } + + static int _terminate(WT_COLLATOR *coll, WT_SESSION *s) { + WiredTigerIndexCollator *c = static_cast<WiredTigerIndexCollator *>(coll); + delete c; + return 0; + } + + private: + const IndexEntryComparison _indexComparator; + }; + + extern "C" int index_collator_customize(WT_COLLATOR *coll, WT_SESSION *s, const char *uri, WT_CONFIG_ITEM *metadata, WT_COLLATOR **collp) { + IndexDescriptor desc(0, "unknown", 
fromjson(std::string(metadata->str, metadata->len))); + *collp = new WiredTigerIndexCollator(Ordering::make(desc.keyPattern())); + return 0; + } + + extern "C" MONGO_COMPILER_API_EXPORT int index_collator_extension(WT_CONNECTION *conn, WT_CONFIG_ARG *cfg) { + static WT_COLLATOR idx_static; + + idx_static.customize = index_collator_customize; + return conn->add_collator(conn, "mongo_index", &idx_static, NULL); + } + + // taken from btree_logic.cpp + Status dupKeyError(const BSONObj& key) { + StringBuilder sb; + sb << "E11000 duplicate key error "; + sb << "dup key: " << key; + return Status(ErrorCodes::DuplicateKey, sb.str()); + } +} // namespace + + int WiredTigerIndex::Create(OperationContext* txn, + const std::string& uri, + const std::string& extraConfig, + const IndexDescriptor* desc ) { + WT_SESSION* s = WiredTigerRecoveryUnit::get( txn )->getSession()->getSession(); + + // Separate out a prefix and suffix in the default string. User configuration will + // override values in the prefix, but not values in the suffix. + string default_config_pfx = "type=file,leaf_page_max=16k,"; + // Indexes need to store the metadata for collation to work as expected. 
+ string default_config_sfx = ",key_format=u,value_format=u,collator=mongo_index,app_metadata="; + + std::string config = default_config_pfx + extraConfig + default_config_sfx + desc->infoObj().jsonString(); + LOG(1) << "create uri: " << uri << " config: " << config; + return s->create(s, uri.c_str(), config.c_str()); + } + + WiredTigerIndex::WiredTigerIndex(const std::string &uri ) + : _uri( uri ), + _instanceId( WiredTigerSession::genCursorId() ) { + } + + Status WiredTigerIndex::insert(OperationContext* txn, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed) { + invariant(!loc.isNull()); + invariant(loc.isValid()); + invariant(!hasFieldNames(key)); + + if ( key.objsize() >= TempKeyMaxSize ) { + string msg = mongoutils::str::stream() + << "WiredTigerIndex::insert: key too large to index, failing " + << ' ' << key.objsize() << ' ' << key; + return Status(ErrorCodes::KeyTooLong, msg); + } + + WiredTigerCursor curwrap(_uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + + return _insert( c, key, loc, dupsAllowed ); + } + + void WiredTigerIndex::unindex(OperationContext* txn, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) { + invariant(!loc.isNull()); + invariant(loc.isValid()); + invariant(!hasFieldNames(key)); + + WiredTigerCursor curwrap(_uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + invariant( c ); + + _unindex( c, key, loc, dupsAllowed ); + } + + void WiredTigerIndex::fullValidate(OperationContext* txn, bool full, long long *numKeysOut, + BSONObjBuilder* output) const { + IndexCursor cursor(*this, txn, true ); + cursor.locate( minKey, minDiskLoc ); + long long count = 0; + while ( !cursor.isEOF() ) { + cursor.advance(); + count++; + } + if ( numKeysOut ) { + *numKeysOut = count; + } + + // Nothing further to do if 'full' validation is not requested. 
+ if (!full) { + return; + } + + invariant(output); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WT_SESSION* s = session->getSession(); + Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + uri(), + "statistics=(fast)", output); + if (!status.isOK()) { + output->append("error", "unable to retrieve statistics"); + output->append("code", static_cast<int>(status.code())); + output->append("reason", status.reason()); + } + } + + Status WiredTigerIndex::dupKeyCheck( OperationContext* txn, + const BSONObj& key, + const DiskLoc& loc) { + invariant(!hasFieldNames(key)); + invariant(unique()); + + WiredTigerCursor curwrap(_uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + + if ( isDup(c, key, loc) ) + return dupKeyError(key); + return Status::OK(); + } + + bool WiredTigerIndex::isEmpty(OperationContext* txn) { + WiredTigerCursor curwrap(_uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + if (!c) + return true; + int ret = c->next(c); + if (ret == WT_NOTFOUND) + return true; + invariantWTOK(ret); + return false; + } + + Status WiredTigerIndex::touch(OperationContext* txn) const { + // already in memory... + return Status::OK(); + } + + long long WiredTigerIndex::getSpaceUsedBytes( OperationContext* txn ) const { + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + return static_cast<long long>( WiredTigerUtil::getIdentSize( session->getSession(), + _uri ) ); + } + + bool WiredTigerIndex::isDup(WT_CURSOR *c, const BSONObj& key, const DiskLoc& loc ) { + invariant( unique() ); + // First check whether the key exists. 
+ WiredTigerItem item( key.objdata(), key.objsize() ); + c->set_key( c, item.Get() ); + int ret = c->search(c); + if ( ret == WT_NOTFOUND ) + return false; + invariantWTOK( ret ); + + WT_ITEM value; + invariantWTOK( c->get_value(c,&value) ); + DiskLoc found = _toDiskLoc( value ); + return found != loc; + } + + SortedDataInterface::Cursor* WiredTigerIndex::newCursor(OperationContext* txn, + int direction) const { + invariant((direction == 1) || (direction == -1)); + return new IndexCursor(*this, txn, direction == 1); + } + + Status WiredTigerIndex::initAsEmpty(OperationContext* txn) { + // No-op + return Status::OK(); + } + + class WiredTigerBuilderImpl : public SortedDataBuilderInterface { + public: + WiredTigerBuilderImpl(WiredTigerIndex* idx, + OperationContext *txn, + bool dupsAllowed) + : _idx(idx), _txn(txn), _dupsAllowed(dupsAllowed), _count(0) { + } + + ~WiredTigerBuilderImpl() { + } + + Status addKey(const BSONObj& key, const DiskLoc& loc) { + Status s = _idx->insert(_txn, key, loc, _dupsAllowed); + if (s.isOK()) + _count++; + return s; + } + + void commit(bool mayInterrupt) { + // this is bizarre, but required as part of the contract + WriteUnitOfWork uow( _txn ); + uow.commit(); + } + + private: + WiredTigerIndex* _idx; + OperationContext* _txn; + bool _dupsAllowed; + unsigned long long _count; + }; + + SortedDataBuilderInterface* WiredTigerIndex::getBulkBuilder( OperationContext* txn, + bool dupsAllowed ) { + if ( !dupsAllowed ) { + // if we don't allow dups, we better be unique + invariant( unique() ); + } + return new WiredTigerBuilderImpl(this, txn, dupsAllowed); + } + + + + // ---------------------- + + WiredTigerIndex::IndexCursor::IndexCursor(const WiredTigerIndex &idx, + OperationContext *txn, + bool forward) + : _txn(txn), + _cursor(idx.uri(), idx.instanceId(), txn ), + _idx(idx), + _forward(forward), + _eof(true), + _uniqueLen( -1 ) { + } + + bool WiredTigerIndex::IndexCursor::pointsToSamePlaceAs( const SortedDataInterface::Cursor &genother) 
const { + const WiredTigerIndex::IndexCursor &other = + dynamic_cast<const WiredTigerIndex::IndexCursor &>(genother); + + if ( _eof && other._eof ) + return true; + else if ( _eof || other._eof ) + return false; + + if ( getDiskLoc() != other.getDiskLoc() ) + return false; + + return getKey() == other.getKey(); + } + + void WiredTigerIndex::IndexCursor::aboutToDeleteBucket(const DiskLoc& bucket) { + invariant(!"aboutToDeleteBucket should not be called"); + } + + bool WiredTigerIndex::IndexCursor::_locate(const BSONObj &key, const DiskLoc& loc) { + _uniqueLen = -1; + WT_CURSOR *c = _cursor.get(); + + DiskLoc searchLoc = loc; + // Null cursors should start at the zero key to maintain search ordering in the + // collator. + // Reverse cursors should start on the last matching key. + if (loc.isNull()) + searchLoc = _forward ? DiskLoc(0, 0) : DiskLoc(INT_MAX, INT_MAX); + + boost::scoped_array<char> data; + WiredTigerItem myKey = _toItem( key, searchLoc, &data ); + + int cmp = -1; + c->set_key(c, myKey.Get() ); + + + int ret = c->search_near(c, &cmp); + if ( ret == WT_NOTFOUND ) { + _eof = true; + return false; + } + invariantWTOK( ret ); + // Make sure we land on a matching key + if ( _forward ? cmp < 0 : cmp > 0 ) + ret = _forward ? 
c->next(c) : c->prev(c); + + _eof = ret != 0; + + if ( _eof ) { + return false; + } + + if ( key != getKey() ) { + return false; + } + + if ( !_idx.unique() ) { + return true; + } + + // now we need to check if we have an array situation + + if ( loc.isNull() ) { + // no loc specified means start and beginning or end of array as needed + // so nothing to do + return true; + } + + // we're looking for a specific DiskLoc, lets see if we can find + + WT_ITEM item; + invariantWTOK( c->get_value(c, &item ) ); + _uniqueLen = item.size / sizeof(DiskLoc); + invariant( _uniqueLen > 0 ); + + if ( _forward ) { + _uniquePos = 0; + for ( ; _uniquePos < _uniqueLen; _uniquePos++ ) { + DiskLoc temp; + memcpy( &temp, + reinterpret_cast<const char*>(item.data) + ( _uniquePos * sizeof(DiskLoc) ), + sizeof(DiskLoc) ); + if ( temp == loc ) + break; + + if ( loc < temp ) + break; + } + } + else { + _uniquePos = _uniqueLen-1; + for ( ; _uniquePos >= 0; _uniquePos-- ) { + DiskLoc temp; + memcpy( &temp, + reinterpret_cast<const char*>(item.data) + ( _uniquePos * sizeof(DiskLoc) ), + sizeof(DiskLoc) ); + if ( temp == loc ) + break; + + if ( temp < loc ) + break; + } + _uniquePos = _uniqueLen - 1 - _uniquePos; + } + + if ( _uniquePos == _uniqueLen ) { + // we need to move to next slot + advance(); + } + + return true; + } + + bool WiredTigerIndex::IndexCursor::locate(const BSONObj &key, const DiskLoc& loc) { + const BSONObj finalKey = stripFieldNames(key); + bool result = _locate(finalKey, loc); + + // An explicit search at the start of the range should always return false + if (loc == minDiskLoc || loc == maxDiskLoc ) + return false; + return result; + } + + void WiredTigerIndex::IndexCursor::advanceTo(const BSONObj &keyBegin, + int keyBeginLen, + bool afterKey, + const vector<const BSONElement*>& keyEnd, + const vector<bool>& keyEndInclusive) { + + BSONObj key = IndexEntryComparison::makeQueryObject( + keyBegin, keyBeginLen, + afterKey, keyEnd, keyEndInclusive, getDirection() ); + + 
_locate(key, DiskLoc()); + } + + void WiredTigerIndex::IndexCursor::customLocate(const BSONObj& keyBegin, + int keyBeginLen, + bool afterKey, + const vector<const BSONElement*>& keyEnd, + const vector<bool>& keyEndInclusive) { + advanceTo(keyBegin, keyBeginLen, afterKey, keyEnd, keyEndInclusive); + } + + BSONObj WiredTigerIndex::IndexCursor::getKey() const { + WT_CURSOR *c = _cursor.get(); + WT_ITEM keyItem; + int ret = c->get_key(c, &keyItem); + invariantWTOK(ret); + return makeIndexKeyEntry(&keyItem).key; + } + + DiskLoc WiredTigerIndex::IndexCursor::getDiskLoc() const { + if ( _eof ) + return DiskLoc(); + + WT_CURSOR *c = _cursor.get(); + WT_ITEM item; + if ( _idx.unique() ) { + invariantWTOK( c->get_value(c, &item ) ); + if ( _uniqueLen == -1 ) { + // first time at this spot + _uniqueLen = item.size / sizeof(DiskLoc); + invariant( _uniqueLen > 0 ); + _uniquePos = 0; + } + + DiskLoc loc; + int posToUse = _uniquePos; + if ( !_forward ) + posToUse = _uniqueLen - 1 - _uniquePos; + + + + memcpy( &loc, + reinterpret_cast<const char*>(item.data) + ( posToUse * sizeof(DiskLoc) ), + sizeof(DiskLoc) ); + + invariant( posToUse >= 0 && posToUse < _uniqueLen ); + + return loc; + } + invariantWTOK( c->get_key(c, &item) ); + return makeIndexKeyEntry( &item ).loc; + } + + void WiredTigerIndex::IndexCursor::advance() { + // Advance on a cursor at the end is a no-op + if ( _eof ) + return; + + if ( _idx.unique() ) { + if ( _uniqueLen == -1 ) { + // we need to investigate + getDiskLoc(); + } + + _uniquePos++; // advance + + if ( _uniquePos < _uniqueLen ) { + return; + } + + } + + _uniqueLen = -1; + + + WT_CURSOR *c = _cursor.get(); + int ret = _forward ? 
c->next(c) : c->prev(c); + if ( ret == WT_NOTFOUND ) { + _eof = true; + return; + } + invariantWTOK(ret); + _eof = false; + } + + void WiredTigerIndex::IndexCursor::savePosition() { + _savedForCheck = _txn->recoveryUnit(); + + if ( !wt_keeptxnopen() && !_eof ) { + _savedKey = getKey().getOwned(); + _savedLoc = getDiskLoc(); + _cursor.reset(); + } + + _txn = NULL; + } + + void WiredTigerIndex::IndexCursor::restorePosition( OperationContext *txn ) { + // Update the session handle with our new operation context. + _txn = txn; + invariant( _savedForCheck == txn->recoveryUnit() ); + + if ( !wt_keeptxnopen() && !_eof ) { + _locate(_savedKey, _savedLoc); + } + } + + // ------------------------------ + + WiredTigerIndexUnique::WiredTigerIndexUnique( const std::string& uri ) + : WiredTigerIndex( uri ) { + } + + Status WiredTigerIndexUnique::_insert( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) { + + WiredTigerItem keyItem( key.objdata(), key.objsize() ); + WiredTigerItem valueItem( &loc, sizeof(loc) ); + c->set_key( c, keyItem.Get() ); + c->set_value( c, valueItem.Get() ); + int ret = c->insert( c ); + + if ( ret != WT_DUPLICATE_KEY ) + return wtRCToStatus( ret ); + + // we're in weird mode where there might be multiple values + // we put them all in the "list" + ret = c->search(c); + invariantWTOK( ret ); + + WT_ITEM old; + invariantWTOK( c->get_value(c, &old ) ); + + std::set<DiskLoc> all; + + // see if its already in the array + for ( size_t i = 0; i < (old.size/sizeof(DiskLoc)); i++ ) { + DiskLoc temp; + memcpy( &temp, + reinterpret_cast<const char*>( old.data ) + ( i * sizeof(DiskLoc) ), + sizeof(DiskLoc) ); + if ( loc == temp ) + return Status::OK(); + all.insert( temp ); + } + + if ( !dupsAllowed ) { + return dupKeyError(key); + } + + all.insert( loc ); + + // not in the array, add it to the back + size_t newSize = all.size() * sizeof(DiskLoc); + boost::scoped_array<char> bigger( new char[newSize] ); + + size_t offset = 0; + for ( 
std::set<DiskLoc>::const_iterator it = all.begin(); it != all.end(); ++it ) { + DiskLoc dl = *it; + memcpy( bigger.get() + offset, &dl, sizeof(DiskLoc) ); + offset += sizeof(DiskLoc); + } + + valueItem = WiredTigerItem( bigger.get(), newSize ); + c->set_value( c, valueItem.Get() ); + return wtRCToStatus( c->update( c ) ); + } + + void WiredTigerIndexUnique::_unindex( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) { + WiredTigerItem keyItem( key.objdata(), key.objsize() ); + c->set_key( c, keyItem.Get() ); + + if ( !dupsAllowed ) { + // nice and clear + int ret = c->remove(c); + if (ret == WT_NOTFOUND) { + return; + } + invariantWTOK(ret); + return; + } + + // dups are allowed, so we have to deal with a vector of DiskLoc + + int ret = c->search(c); + if ( ret == WT_NOTFOUND ) + return; + invariantWTOK( ret ); + + WT_ITEM old; + invariantWTOK( c->get_value(c, &old ) ); + + // see if it's in the array + size_t num = old.size / sizeof(DiskLoc); + for ( size_t i = 0; i < num; i++ ) { + DiskLoc temp; + memcpy( &temp, + reinterpret_cast<const char*>( old.data ) + ( i * sizeof(DiskLoc) ), + sizeof(DiskLoc) ); + if ( loc != temp ) + continue; + + // we found it, now let's re-save array without it + size_t newSize = old.size - sizeof(DiskLoc); + + if ( newSize == 0 ) { + // nothing left, just delete entry + invariantWTOK( c->remove(c) ); + return; + } + + boost::scoped_array<char> smaller( new char[newSize] ); + size_t offset = i * sizeof(DiskLoc); + memcpy( smaller.get(), old.data, offset ); + memcpy( smaller.get() + offset, + reinterpret_cast<const char*>( old.data ) + offset + sizeof(DiskLoc), + old.size - sizeof(DiskLoc) - offset ); + WiredTigerItem valueItem = WiredTigerItem( smaller.get(), newSize ); + c->set_value( c, valueItem.Get() ); + invariantWTOK( c->update( c ) ); + } + } + + // ------------------------------ + + WiredTigerIndexStandard::WiredTigerIndexStandard( const std::string& uri ) + : WiredTigerIndex( uri ) { + } + + Status
WiredTigerIndexStandard::_insert( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) { + invariant( dupsAllowed ); + + boost::scoped_array<char> data; + WiredTigerItem item = _toItem( key, loc, &data ); + c->set_key(c, item.Get() ); + c->set_value(c, &emptyItem); + return wtRCToStatus( c->insert(c) ); + } + + void WiredTigerIndexStandard::_unindex( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) { + invariant( dupsAllowed ); + boost::scoped_array<char> data; + WiredTigerItem item = _toItem( key, loc, &data); + c->set_key(c, item.Get() ); + int ret = c->remove(c); + if (ret != WT_NOTFOUND) { + invariantWTOK(ret); + } + } + + +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.h b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h new file mode 100644 index 00000000000..c390bb121cf --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.h @@ -0,0 +1,204 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/shared_ptr.hpp> +#include <wiredtiger.h> + +#include "mongo/db/storage/index_entry_comparison.h" +#include "mongo/db/storage/sorted_data_interface.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" + +namespace mongo { + + class IndexCatalogEntry; + class IndexDescriptor; + struct WiredTigerItem; + + class WiredTigerIndex : public SortedDataInterface { + public: + + static int Create(OperationContext* txn, + const std::string& uri, + const std::string& extraConfig, + const IndexDescriptor* desc); + + /** + * @param unique - If this is a unique index. + * Note: even if unique, it may be allowed to be non-unique at times.
+ */ + WiredTigerIndex(const std::string &uri ); + + virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed); + + virtual Status insert(OperationContext* txn, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed); + + virtual void unindex(OperationContext* txn, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed); + + virtual void fullValidate(OperationContext* txn, bool full, long long *numKeysOut, + BSONObjBuilder* output) const; + + virtual Status dupKeyCheck(OperationContext* txn, const BSONObj& key, const DiskLoc& loc); + + virtual bool isEmpty(OperationContext* txn); + + virtual Status touch(OperationContext* txn) const; + + virtual long long getSpaceUsedBytes( OperationContext* txn ) const; + + bool isDup(WT_CURSOR *c, const BSONObj& key, const DiskLoc& loc ); + + virtual SortedDataInterface::Cursor* newCursor( + OperationContext* txn, int direction) const; + + virtual Status initAsEmpty(OperationContext* txn); + + const std::string& uri() const { return _uri; } + + uint64_t instanceId() const { return _instanceId; } + + virtual bool unique() const = 0; + + protected: + + virtual Status _insert( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) = 0; + + virtual void _unindex( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ) = 0; + + class IndexCursor : public SortedDataInterface::Cursor { + public: + IndexCursor(const WiredTigerIndex& idx, + OperationContext *txn, + bool forward); + + virtual ~IndexCursor() { } + + virtual int getDirection() const { return _forward ? 
1 : -1; } + + virtual bool isEOF() const { return _eof; } + + virtual bool pointsToSamePlaceAs(const SortedDataInterface::Cursor &genother) const; + + virtual void aboutToDeleteBucket(const DiskLoc& bucket); + + virtual bool locate(const BSONObj &key, const DiskLoc& loc); + + virtual void customLocate(const BSONObj& keyBegin, + int keyBeginLen, + bool afterKey, + const vector<const BSONElement*>& keyEnd, + const vector<bool>& keyEndInclusive); + + void advanceTo(const BSONObj &keyBegin, + int keyBeginLen, + bool afterKey, + const vector<const BSONElement*>& keyEnd, + const vector<bool>& keyEndInclusive); + + virtual BSONObj getKey() const; + + virtual DiskLoc getDiskLoc() const; + + virtual void advance(); + + virtual void savePosition(); + + virtual void restorePosition( OperationContext *txn ); + + private: + bool _locate(const BSONObj &key, const DiskLoc& loc); + + OperationContext *_txn; + WiredTigerCursor _cursor; + const WiredTigerIndex& _idx; // not owned + bool _forward; + bool _eof; + + mutable int _uniquePos; + mutable int _uniqueLen; + + // For save/restorePosition check + RecoveryUnit* _savedForCheck; + BSONObj _savedKey; + DiskLoc _savedLoc; + }; + + std::string _uri; + uint64_t _instanceId; + }; + + + class WiredTigerIndexUnique : public WiredTigerIndex { + public: + WiredTigerIndexUnique( const std::string& uri ); + + virtual bool unique() const { return true; } + + virtual Status _insert( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ); + + virtual void _unindex( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ); + }; + + class WiredTigerIndexStandard : public WiredTigerIndex { + public: + WiredTigerIndexStandard( const std::string& uri ); + + virtual bool unique() const { return false; } + + virtual Status _insert( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool dupsAllowed ); + + virtual void _unindex( WT_CURSOR* c, + const BSONObj& key, + const DiskLoc& loc, + bool 
dupsAllowed ); + + }; + +} // namespace diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp new file mode 100644 index 00000000000..b9455ab88be --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index_test.cpp @@ -0,0 +1,92 @@ +// wiredtiger_index_test.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/catalog/index_catalog_entry.h" +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/storage/sorted_data_interface_test_harness.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_index.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + + class MyHarnessHelper : public HarnessHelper { + public: + MyHarnessHelper() : _dbpath( "wt_test" ), _conn( NULL ) { + + const char* config = "create,cache_size=1G,extensions=[local=(entry=index_collator_extension)],"; + int ret = wiredtiger_open( _dbpath.path().c_str(), NULL, config, &_conn); + invariantWTOK( ret ); + + _sessionCache = new WiredTigerSessionCache( _conn ); + } + + ~MyHarnessHelper() { + delete _sessionCache; + _conn->close(_conn, NULL); + } + + virtual SortedDataInterface* newSortedDataInterface( bool unique ) { + std::string ns = "test.wt"; + OperationContextNoop txn( newRecoveryUnit() ); + + BSONObj spec = BSON( "key" << BSON( "a" << 1 ) << + "name" << "testIndex" << + "ns" << ns ); + + IndexDescriptor desc( NULL, "", spec ); + + string uri = "table:" + ns; + invariantWTOK( WiredTigerIndex::Create( &txn, uri, "", &desc ) ); + + if ( unique ) + return new WiredTigerIndexUnique( uri ); + return new WiredTigerIndexStandard( uri ); + } + + virtual RecoveryUnit* newRecoveryUnit() { + return new WiredTigerRecoveryUnit( _sessionCache ); + } + + private: + unittest::TempDir _dbpath; + WT_CONNECTION* _conn; + WiredTigerSessionCache* _sessionCache; + }; + + HarnessHelper* newHarnessHelper() { + return new MyHarnessHelper(); + } + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp new file mode 100644 index 
00000000000..b4681a456a4 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_init.cpp @@ -0,0 +1,68 @@ +// wiredtiger_init.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/base/init.h" +#include "mongo/db/global_environment_d.h" +#include "mongo/db/global_environment_experiment.h" +#include "mongo/db/storage/kv/kv_storage_engine.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_server_status.h" +#include "mongo/db/storage_options.h" + +namespace mongo { + + namespace { + class WiredTigerFactory : public StorageEngine::Factory { + public: + virtual ~WiredTigerFactory(){} + virtual StorageEngine* create( const StorageGlobalParams& params ) const { + WiredTigerKVEngine* kv = new WiredTigerKVEngine( params.dbpath, + wiredTigerGlobalOptions.databaseConfig, + params.dur ); + kv->setRecordStoreExtraOptions( wiredTigerGlobalOptions.collectionConfig ); + kv->setSortedDataInterfaceExtraOptions( wiredTigerGlobalOptions.indexConfig ); + // Intentionally leaked. + new WiredTigerServerStatusSection(kv); + return new KVStorageEngine( kv ); + } + }; + } // namespace + + MONGO_INITIALIZER_WITH_PREREQUISITES(WiredTigerEngineInit, + ("SetGlobalEnvironment")) + (InitializerContext* context ) { + getGlobalEnvironment()->registerStorageEngine("wiredtiger", new WiredTigerFactory() ); + return Status::OK(); + } + +} + diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp new file mode 100644 index 00000000000..cb4ae9925af --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -0,0 +1,372 @@ +// wiredtiger_kv_engine.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" + +#include <boost/filesystem.hpp> +#include <boost/filesystem/operations.hpp> + +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_index.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/log.h" +#include "mongo/util/processinfo.h" + +namespace mongo { + + namespace { + int mdb_handle_error(WT_EVENT_HANDLER *handler, WT_SESSION *session, + int errorCode, const char *message) { + error() << "WiredTiger (" << errorCode << ") " << message; + return 0; + } + + int mdb_handle_message( WT_EVENT_HANDLER *handler, WT_SESSION *session, + const char *message) { + log() << "WiredTiger " << message; + return 0; + } + + int mdb_handle_progress( WT_EVENT_HANDLER *handler, WT_SESSION *session, + const char *operation, uint64_t progress) { + log() << "WiredTiger progress " << operation << " " << progress; + return 0; + } + + int mdb_handle_close( WT_EVENT_HANDLER *handler, WT_SESSION *session, + WT_CURSOR *cursor) { + return 0; + } + + } + + WiredTigerKVEngine::WiredTigerKVEngine( const std::string& path, + const std::string& extraOpenOptions, + bool durable ) + : _durable( durable ), + _epoch( 0 ), + _sizeStorerSyncTracker( 100000, 60 * 1000 ) { + + _eventHandler.handle_error = mdb_handle_error; + _eventHandler.handle_message = mdb_handle_message; + _eventHandler.handle_progress = mdb_handle_progress; + _eventHandler.handle_close = mdb_handle_close; + + int cacheSizeGB = 1; + + { + ProcessInfo pi; + BSONObjBuilder b; + pi.appendSystemDetails( b ); + BSONObj obj = b.obj(); + BSONObj extra = 
obj["extra"].Obj(); + double pageSize = extra["pageSize"].number(); + double numPages = extra["numPages"].number(); + if ( pageSize > 0 && numPages > 0 ) { + double totalBytes = numPages * pageSize; + double cacheBytes = totalBytes / 10; + cacheSizeGB = static_cast<int>( cacheBytes / ( 1024 * 1024 * 1024 ) ); + if ( cacheSizeGB < 1 ) + cacheSizeGB = 1; + } + } + + if ( _durable ) { + boost::filesystem::path journalPath = path; + journalPath /= "journal"; + if ( !boost::filesystem::exists( journalPath ) ) { + try { + boost::filesystem::create_directory( journalPath ); + } + catch( std::exception& e) { + log() << "error creating journal dir " << journalPath.string() << ' ' << e.what(); + throw; + } + } + } + + std::stringstream ss; + ss << "create,"; + ss << "cache_size=" << cacheSizeGB << "G,"; + ss << "session_max=20000,"; + ss << "extensions=[local=(entry=index_collator_extension)],"; + ss << "statistics=(all),"; + if ( _durable ) { + ss << "log=(enabled=true,archive=true,path=journal),"; + } + ss << "checkpoint=(wait=60,log_size=2GB),"; + ss << extraOpenOptions; + string config = ss.str(); + log() << "wiredtiger_open config: " << config; + invariantWTOK(wiredtiger_open(path.c_str(), &_eventHandler, config.c_str(), &_conn)); + _sessionCache.reset( new WiredTigerSessionCache( this ) ); + + _sizeStorerUri = "table:sizeStorer"; + { + WiredTigerSession session( _conn, -1 ); + WiredTigerSizeStorer* ss = new WiredTigerSizeStorer(); + ss->loadFrom( &session, _sizeStorerUri ); + _sizeStorer.reset( ss ); + } + } + + + WiredTigerKVEngine::~WiredTigerKVEngine() { + log() << "WiredTigerKVEngine shutting down"; + syncSizeInfo(); + _sizeStorer.reset( NULL ); + + _sessionCache.reset( NULL ); + + if ( _conn ) { + invariantWTOK( _conn->close(_conn, NULL) ); + _conn = NULL; + } + + } + + Status WiredTigerKVEngine::okToRename( OperationContext* opCtx, + const StringData& fromNS, + const StringData& toNS, + const StringData& ident, + const RecordStore* originalRecordStore ) const { + 
_sizeStorer->store( _uri( ident ), + originalRecordStore->numRecords( opCtx ), + originalRecordStore->dataSize( opCtx ) ); + syncSizeInfo(); + return Status::OK(); + } + + int64_t WiredTigerKVEngine::getIdentSize( OperationContext* opCtx, + const StringData& ident ) { + WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(); + return WiredTigerUtil::getIdentSize(session->getSession(), _uri(ident) ); + } + + Status WiredTigerKVEngine::repairIdent( OperationContext* opCtx, + const StringData& ident ) { + WiredTigerSession session( _conn, -1 ); + WT_SESSION* s = session.getSession(); + string uri = _uri(ident); + return wtRCToStatus( s->compact(s, uri.c_str(), NULL ) ); + } + + int WiredTigerKVEngine::flushAllFiles( bool sync ) { + LOG(1) << "WiredTigerKVEngine::flushAllFiles"; + syncSizeInfo(); + + WiredTigerSession session( _conn, -1 ); + WT_SESSION* s = session.getSession(); + invariantWTOK( s->checkpoint(s, NULL ) ); + + return 1; + } + + void WiredTigerKVEngine::syncSizeInfo() const { + if ( !_sizeStorer ) + return; + + try { + WiredTigerSession session( _conn, -1 ); + WT_SESSION* s = session.getSession(); + invariantWTOK( s->begin_transaction( s, "sync=true" ) ); + _sizeStorer->storeInto( &session, _sizeStorerUri ); + invariantWTOK( s->commit_transaction( s, NULL ) ); + } + catch ( const WriteConflictException& de ) { + // ignore, it means someone else is doing it + } + } + + RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() { + return new WiredTigerRecoveryUnit( _sessionCache.get() ); + } + + void WiredTigerKVEngine::setRecordStoreExtraOptions( const std::string& options ) { + _rsOptions = options; + } + + void WiredTigerKVEngine::setSortedDataInterfaceExtraOptions( const std::string& options ) { + _indexOptions = options; + } + + Status WiredTigerKVEngine::createRecordStore( OperationContext* opCtx, + const StringData& ns, + const StringData& ident, + const CollectionOptions& options ) { + WiredTigerSession session( _conn, -1 ); + + 
StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString(ns, options, _rsOptions); + if (!result.isOK()) { + return result.getStatus(); + } + std::string config = result.getValue(); + + string uri = _uri( ident ); + WT_SESSION* s = session.getSession(); + LOG(1) << "WiredTigerKVEngine::createRecordStore uri: " << uri << " config: " << config; + return wtRCToStatus( s->create( s, uri.c_str(), config.c_str() ) ); + } + + RecordStore* WiredTigerKVEngine::getRecordStore( OperationContext* opCtx, + const StringData& ns, + const StringData& ident, + const CollectionOptions& options ) { + + if (options.capped) { + return new WiredTigerRecordStore(opCtx, ns, _uri(ident), options.capped, + options.cappedSize ? options.cappedSize : 4096, + options.cappedMaxDocs ? options.cappedMaxDocs : -1, + NULL, + _sizeStorer.get() ); + } + else { + return new WiredTigerRecordStore(opCtx, ns, _uri(ident), + false, -1, -1, NULL, _sizeStorer.get() ); + } + } + + Status WiredTigerKVEngine::dropRecordStore( OperationContext* opCtx, + const StringData& ident ) { + _drop( ident ); + return Status::OK(); + } + + string WiredTigerKVEngine::_uri( const StringData& ident ) const { + return string("table:") + ident.toString(); + } + + Status WiredTigerKVEngine::createSortedDataInterface( OperationContext* opCtx, + const StringData& ident, + const IndexDescriptor* desc ) { + return wtRCToStatus( WiredTigerIndex::Create( opCtx, _uri( ident ), _indexOptions, desc ) ); + } + + SortedDataInterface* WiredTigerKVEngine::getSortedDataInterface( OperationContext* opCtx, + const StringData& ident, + const IndexDescriptor* desc ) { + if ( desc->unique() ) + return new WiredTigerIndexUnique( _uri( ident ) ); + return new WiredTigerIndexStandard( _uri( ident ) ); + } + + Status WiredTigerKVEngine::dropSortedDataInterface( OperationContext* opCtx, + const StringData& ident ) { + _drop( ident ); + return Status::OK(); + } + + bool WiredTigerKVEngine::_drop( const StringData& ident ) { + string 
uri = _uri( ident ); + + WiredTigerSession session( _conn, -1 ); + + int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force" ); + LOG(1) << "WT drop of " << uri << " res " << ret; + + if ( ret == 0 ) { + // yay, it worked + return true; + } + + if ( ret == EBUSY ) { + // this is expected, queue it up + { + boost::mutex::scoped_lock lk( _identToDropMutex ); + _identToDrop.insert( uri ); + _epoch++; + } + _sessionCache->closeAll(); + return false; + } + + invariantWTOK( ret ); + return false; + } + + bool WiredTigerKVEngine::haveDropsQueued() const { + if ( _sizeStorerSyncTracker.intervalHasElapsed() ) { + _sizeStorerSyncTracker.resetLastTime(); + syncSizeInfo(); + } + boost::mutex::scoped_lock lk( _identToDropMutex ); + return !_identToDrop.empty(); + } + + void WiredTigerKVEngine::dropAllQueued() { + set<string> mine; + { + boost::mutex::scoped_lock lk( _identToDropMutex ); + mine = _identToDrop; + } + + set<string> deleted; + + { + WiredTigerSession session( _conn, -1 ); + for ( set<string>::const_iterator it = mine.begin(); it != mine.end(); ++it ) { + string uri = *it; + int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force" ); + LOG(1) << "WT queued drop of " << uri << " res " << ret; + + if ( ret == 0 ) { + deleted.insert( uri ); + continue; + } + + if ( ret == EBUSY ) { + // leave in queue + continue; + } + + invariantWTOK( ret ); + } + } + + { + boost::mutex::scoped_lock lk( _identToDropMutex ); + for ( set<string>::const_iterator it = deleted.begin(); it != deleted.end(); ++it ) { + _identToDrop.erase( *it ); + } + } + } + + bool WiredTigerKVEngine::supportsDocLocking() const { + return true; + } + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h new file mode 100644 index 00000000000..b635c121e1f --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -0,0 +1,138 @@ +// wiredtiger_kv_engine.h + +/** + *
Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <set> +#include <string> + +#include <boost/thread/mutex.hpp> + +#include <wiredtiger.h> + +#include "mongo/bson/ordering.h" +#include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/util/elapsed_tracker.h" + +namespace mongo { + + class WiredTigerSessionCache; + class WiredTigerSizeStorer; + + class WiredTigerKVEngine : public KVEngine { + public: + WiredTigerKVEngine( const std::string& path, + const std::string& extraOpenOptions = "", + bool durable = true ); + virtual ~WiredTigerKVEngine(); + + void setRecordStoreExtraOptions( const std::string& options ); + void setSortedDataInterfaceExtraOptions( const std::string& options ); + + virtual bool supportsDocLocking() const; + + virtual bool isDurable() const { return _durable; } + + virtual RecoveryUnit* newRecoveryUnit(); + + virtual Status createRecordStore( OperationContext* opCtx, + const StringData& ns, + const StringData& ident, + const CollectionOptions& options ); + + virtual RecordStore* getRecordStore( OperationContext* opCtx, + const StringData& ns, + const StringData& ident, + const CollectionOptions& options ); + + virtual Status dropRecordStore( OperationContext* opCtx, + const StringData& ident ); + + virtual Status createSortedDataInterface( OperationContext* opCtx, + const StringData& ident, + const IndexDescriptor* desc ); + + virtual SortedDataInterface* getSortedDataInterface( OperationContext* opCtx, + const StringData& ident, + const IndexDescriptor* desc ); + + virtual Status dropSortedDataInterface( OperationContext* opCtx, + const StringData& ident ); + + virtual Status okToRename( OperationContext* opCtx, + const StringData& fromNS, + const StringData& toNS, + const StringData& ident, + const RecordStore* originalRecordStore ) const; + + virtual int flushAllFiles( bool sync ); + + virtual int64_t getIdentSize( OperationContext* opCtx, + const StringData& ident ); + + virtual Status 
repairIdent( OperationContext* opCtx, + const StringData& ident ); + + // wiredtiger specific + + WT_CONNECTION* getConnection() { return _conn; } + void dropAllQueued(); + bool haveDropsQueued() const; + + int currentEpoch() const { return _epoch; } + + void syncSizeInfo() const; + + private: + + string _uri( const StringData& ident ) const; + bool _drop( const StringData& ident ); + + WT_CONNECTION* _conn; + WT_EVENT_HANDLER _eventHandler; + boost::scoped_ptr<WiredTigerSessionCache> _sessionCache; + bool _durable; + + string _rsOptions; + string _indexOptions; + + std::set<std::string> _identToDrop; + mutable boost::mutex _identToDropMutex; + + int _epoch; // this is how we keep track of if a session is too old + + scoped_ptr<WiredTigerSizeStorer> _sizeStorer; + string _sizeStorerUri; + mutable ElapsedTracker _sizeStorerSyncTracker; + }; + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp new file mode 100644 index 00000000000..409f0452404 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp @@ -0,0 +1,66 @@ +// wiredtiger_kv_engine_test.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/storage/kv/kv_engine_test_harness.h" + +#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" +#include "mongo/unittest/temp_dir.h" + +namespace mongo { + + class WiredTigerKVHarnessHelper : public KVHarnessHelper { + public: + WiredTigerKVHarnessHelper() + : _dbpath( "wt-kv-harness" ) { + _engine.reset( new WiredTigerKVEngine( _dbpath.path() ) ); + } + + virtual ~WiredTigerKVHarnessHelper() { + _engine.reset( NULL ); + } + + virtual KVEngine* restartEngine() { + _engine.reset( NULL ); + _engine.reset( new WiredTigerKVEngine( _dbpath.path() ) ); + return _engine.get(); + } + + virtual KVEngine* getEngine() { return _engine.get(); } + + private: + unittest::TempDir _dbpath; + boost::scoped_ptr<WiredTigerKVEngine> _engine; + }; + + KVHarnessHelper* KVHarnessHelper::create() { + return new WiredTigerKVHarnessHelper(); + } +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp new file mode 100644 index 00000000000..459d0b6d4f7 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_options_init.cpp @@ -0,0 +1,66 @@ +// wiredtiger_options_init.cpp + +/** + * 
Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/util/options_parser/startup_option_init.h" +#include "mongo/util/options_parser/startup_options.h" + +#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" + +namespace mongo { + + // Interface to MongoDB option parsing. 
+ WiredTigerGlobalOptions wiredTigerGlobalOptions; + + MONGO_GENERAL_STARTUP_OPTIONS_REGISTER(WiredTigerOptions)(InitializerContext* context) { + return wiredTigerGlobalOptions.add(&moe::startupOptions); + } + + MONGO_STARTUP_OPTIONS_VALIDATE(WiredTigerOptions)(InitializerContext* context) { + if (!wiredTigerGlobalOptions.handlePreValidation(moe::startupOptionsParsed)) { + ::_exit(EXIT_SUCCESS); + } + Status ret = moe::startupOptionsParsed.validate(); + if (!ret.isOK()) { + return ret; + } + return Status::OK(); + } + + MONGO_STARTUP_OPTIONS_STORE(WiredTigerOptions)(InitializerContext* context) { + Status ret = wiredTigerGlobalOptions.store(moe::startupOptionsParsed, context->args()); + if (!ret.isOK()) { + std::cerr << ret.toString() << std::endl; + std::cerr << "try '" << context->args()[0] << " --help' for more information" + << std::endl; + ::_exit(EXIT_BADOPTIONS); + } + return Status::OK(); + } +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp new file mode 100644 index 00000000000..256cb63e429 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -0,0 +1,1006 @@ +// wiredtiger_record_store.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include <wiredtiger.h> + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/oplog_hack.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +//#define RS_ITERATOR_TRACE(x) log() << "WTRS::Iterator " << x +#define RS_ITERATOR_TRACE(x) + +namespace mongo { +namespace { + bool shouldUseOplogHack(OperationContext* opCtx, const std::string& uri) { + WiredTigerCursor curwrap("metadata:", WiredTigerSession::kMetadataCursorId, opCtx); + WT_CURSOR* c = curwrap.get(); + c->set_key(c, uri.c_str()); + int ret = c->search(c); + if (ret == WT_NOTFOUND) + return false; + invariantWTOK(ret); + + const char* config = NULL; + c->get_value(c, 
&config); + invariant(config); + + WiredTigerConfigParser topParser(config); + WT_CONFIG_ITEM metadata; + if (topParser.get("app_metadata", &metadata) != 0) + return false; + + if (metadata.len == 0) + return false; + + WiredTigerConfigParser parser(metadata); + WT_CONFIG_ITEM keyItem; + WT_CONFIG_ITEM value; + while (parser.next(&keyItem, &value) == 0) { + const StringData key(keyItem.str, keyItem.len); + if (key == "oplogKeyExtractionVersion") { + if (value.type == WT_CONFIG_ITEM::WT_CONFIG_ITEM_NUM && value.val == 1) + return true; + } + + // This prevents downgrades with unsupported metadata settings. + severe() << "Unrecognized WiredTiger metadata setting: " << key << '=' << value.str; + fassertFailedNoTrace(28548); + } + + return false; + } +} // namespace + + StatusWith<std::string> WiredTigerRecordStore::generateCreateString(const StringData& ns, + const CollectionOptions& options, + const StringData& extraStrings) { + // Separate out a prefix and suffix in the default string. User configuration will + // override values in the prefix, but not values in the suffix. + str::stream ss; + ss << "type=file,"; + ss << "memory_page_max=100m,"; + ss << "block_compressor=snappy,"; + + ss << extraStrings << ","; + + // Validate configuration object. + // Warn about unrecognized fields that may be introduced in newer versions of this + // storage engine instead of raising an error. + // Ensure that 'configString' field is a string. Warn if this is not the case. + BSONForEach(elem, options.storageEngine.getObjectField("wiredtiger")) { + if (elem.fieldNameStringData() == "configString") { + if (elem.type() != String) { + return StatusWith<std::string>(ErrorCodes::TypeMismatch, str::stream() + << "storageEngine.wiredtiger.configString must be a string. " + << "Not adding 'configString' value " + << elem << " to collection configuration"); + continue; + } + ss << elem.valueStringData() << ","; + } + else { + // Return error on first unrecognized field. 
+ return StatusWith<std::string>(ErrorCodes::InvalidOptions, str::stream() + << '\'' << elem.fieldNameStringData() << '\'' + << " is not a supported option in storageEngine.wiredtiger"); + } + } + + if ( NamespaceString::oplog(ns) ) { + // force file for oplog + ss << "type=file,"; + ss << "app_metadata=(oplogKeyExtractionVersion=1),"; + } + else { + // Force this to be empty since users shouldn't be allowed to change it. + ss << "app_metadata=(),"; + } + + ss << "key_format=q,value_format=u"; + return StatusWith<std::string>(ss); + } + + WiredTigerRecordStore::WiredTigerRecordStore( OperationContext* ctx, + const StringData& ns, + const StringData& uri, + bool isCapped, + int64_t cappedMaxSize, + int64_t cappedMaxDocs, + CappedDocumentDeleteCallback* cappedDeleteCallback, + WiredTigerSizeStorer* sizeStorer ) + : RecordStore( ns ), + _uri( uri.toString() ), + _instanceId( WiredTigerSession::genCursorId() ), + _isCapped( isCapped ), + _isOplog( NamespaceString::oplog( ns ) ), + _cappedMaxSize( cappedMaxSize ), + _cappedMaxDocs( cappedMaxDocs ), + _cappedDeleteCallback( cappedDeleteCallback ), + _useOplogHack(shouldUseOplogHack(ctx, _uri)), + _sizeStorer( sizeStorer ) { + + if (_isCapped) { + invariant(_cappedMaxSize > 0); + invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0); + } + else { + invariant(_cappedMaxSize == -1); + invariant(_cappedMaxDocs == -1); + } + + // Find the largest DiskLoc currently in use and estimate the number of records. 
+ scoped_ptr<RecordIterator> iterator( getIterator( ctx, DiskLoc(), + CollectionScanParams::BACKWARD ) ); + if ( iterator->isEOF() ) { + _dataSize.store(0); + _numRecords.store(0); + // Need to start at 1 so we are always higher than minDiskLoc + _nextIdNum.store( 1 ); + if ( sizeStorer ) + _sizeStorer->onCreate( this, 0, 0 ); + + if (_useOplogHack) { + _highestLocForOplogHack = minDiskLoc; + } + } + else { + if (_useOplogHack) { + _highestLocForOplogHack = iterator->curr(); + } + + uint64_t max = _makeKey( iterator->curr() ); + _nextIdNum.store( 1 + max ); + + if ( _sizeStorer ) { + long long numRecords; + long long dataSize; + _sizeStorer->load( uri, &numRecords, &dataSize ); + _numRecords.store( numRecords ); + _dataSize.store( dataSize ); + _sizeStorer->onCreate( this, numRecords, dataSize ); + } + + if ( _sizeStorer == NULL || _numRecords.load() < 10000 ) { + LOG(1) << "doing scan of collection " << ns << " to get info"; + + _numRecords.store(0); + _dataSize.store(0); + + while( !iterator->isEOF() ) { + DiskLoc loc = iterator->getNext(); + RecordData data = iterator->dataFor( loc ); + _numRecords.fetchAndAdd(1); + _dataSize.fetchAndAdd(data.size()); + } + + if ( _sizeStorer ) { + _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() ); + } + } + + } + + } + + WiredTigerRecordStore::~WiredTigerRecordStore() { + LOG(1) << "~WiredTigerRecordStore for: " << ns(); + if ( _sizeStorer ) { + _sizeStorer->onDestroy( this ); + _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() ); + } + } + + long long WiredTigerRecordStore::dataSize( OperationContext *txn ) const { + return _dataSize.load(); + } + + long long WiredTigerRecordStore::numRecords( OperationContext *txn ) const { + return _numRecords.load(); + } + + bool WiredTigerRecordStore::isCapped() const { + return _isCapped; + } + + int64_t WiredTigerRecordStore::cappedMaxDocs() const { + invariant(_isCapped); + return _cappedMaxDocs; + } + + int64_t WiredTigerRecordStore::cappedMaxSize() const { 
+ invariant(_isCapped); + return _cappedMaxSize; + } + + int64_t WiredTigerRecordStore::storageSize( OperationContext* txn, + BSONObjBuilder* extraInfo, + int infoLevel ) const { + + BSONObjBuilder b; + appendCustomStats( txn, &b, 1 ); + BSONObj obj = b.obj(); + + BSONObj blockManager = obj["wiredtiger"].Obj()["block manager"].Obj(); + BSONElement fileSize = blockManager["file size in bytes"]; + invariant( fileSize.type() ); + + int64_t size = 0; + if ( fileSize.isNumber() ) { + size = fileSize.safeNumberLong(); + } + else { + invariant( fileSize.type() == String ); + size = strtoll( fileSize.valuestrsafe(), NULL, 10 ); + } + + if ( size == 0 && _isCapped ) { + // Many things assume anempty capped collection still takes up space. + return 1; + } + return size; + } + + // Retrieve the value from a positioned cursor. + RecordData WiredTigerRecordStore::_getData(const WiredTigerCursor& cursor) const { + WT_ITEM value; + int ret = cursor->get_value(cursor.get(), &value); + invariantWTOK(ret); + + boost::shared_array<char> data( new char[value.size] ); + memcpy( data.get(), value.data, value.size ); + return RecordData(reinterpret_cast<const char *>(data.get()), value.size, data ); + } + + RecordData WiredTigerRecordStore::dataFor(OperationContext* txn, const DiskLoc& loc) const { + // ownership passes to the shared_array created below + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + invariant( c ); + c->set_key(c, _makeKey(loc)); + int ret = c->search(c); + massert( 28556, + "Didn't find DiskLoc in WiredTigerRecordStore", + ret != WT_NOTFOUND ); + invariantWTOK(ret); + return _getData(curwrap); + } + + bool WiredTigerRecordStore::findRecord( OperationContext* txn, + const DiskLoc& loc, RecordData* out ) const { + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + invariant( c ); + c->set_key(c, _makeKey(loc)); + int ret = c->search(c); + if ( ret == WT_NOTFOUND ) + return false; + 
invariantWTOK(ret); + *out = _getData(curwrap); + return true; + } + + void WiredTigerRecordStore::deleteRecord( OperationContext* txn, const DiskLoc& loc ) { + WiredTigerCursor cursor( _uri, _instanceId, txn ); + WT_CURSOR *c = cursor.get(); + c->set_key(c, _makeKey(loc)); + int ret = c->search(c); + invariantWTOK(ret); + + WT_ITEM old_value; + ret = c->get_value(c, &old_value); + invariantWTOK(ret); + + int old_length = old_value.size; + + ret = c->remove(c); + invariantWTOK(ret); + + _changeNumRecords(txn, false); + _increaseDataSize(txn, -old_length); + } + + bool WiredTigerRecordStore::cappedAndNeedDelete(OperationContext* txn) const { + if (!_isCapped) + return false; + + if (_dataSize.load() > _cappedMaxSize) + return true; + + if ((_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs)) + return true; + + return false; + } + + namespace { + int oplogCounter = 0; + } + + void WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* txn) { + + bool useTruncate = false; + + if ( _isOplog ) { + if ( oplogCounter++ % 100 > 0 ) + return; + } + + if (!cappedAndNeedDelete(txn)) + return; + + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + int ret = c->next(c); + DiskLoc oldest; + while ( ret == 0 && cappedAndNeedDelete(txn) ) { + invariant(_numRecords.load() > 0); + + uint64_t key; + ret = c->get_key(c, &key); + invariantWTOK(ret); + oldest = _fromKey(key); + + if ( _cappedDeleteCallback ) { + uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest)); + } + + if ( useTruncate ) { + _changeNumRecords( txn, false ); + WT_ITEM temp; + invariantWTOK( c->get_value( c, &temp ) ); + _increaseDataSize( txn, temp.size ); + } + else { + deleteRecord( txn, oldest ); + } + + ret = c->next(c); + } + + if (ret != WT_NOTFOUND) invariantWTOK(ret); + + if ( useTruncate && !oldest.isNull() ) { + c->reset( c ); + c->set_key( c, _makeKey( oldest ) ); + invariantWTOK( curwrap.getWTSession()->truncate( curwrap.getWTSession(), 
+ NULL, NULL, c, "" ) ); + } + + } + + StatusWith<DiskLoc> WiredTigerRecordStore::extractAndCheckLocForOplog(const char* data, + int len) { + StatusWith<DiskLoc> status = oploghack::extractKey(data, len); + if (!status.isOK()) + return status; + + if (status.getValue() <= _highestLocForOplogHack) + return StatusWith<DiskLoc>(ErrorCodes::BadValue, "ts not higher than highest"); + + _highestLocForOplogHack = status.getValue(); + return status; + } + + StatusWith<DiskLoc> WiredTigerRecordStore::insertRecord( OperationContext* txn, + const char* data, + int len, + bool enforceQuota ) { + if ( _isCapped && len > _cappedMaxSize ) { + return StatusWith<DiskLoc>( ErrorCodes::BadValue, + "object to insert exceeds cappedMaxSize" ); + } + + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + invariant( c ); + + DiskLoc loc; + if (_useOplogHack) { + StatusWith<DiskLoc> status = extractAndCheckLocForOplog(data, len); + if (!status.isOK()) + return status; + loc = status.getValue(); + } + else { + loc = _nextId(); + } + + c->set_key(c, _makeKey(loc)); + WiredTigerItem value(data, len); + c->set_value(c, value.Get()); + int ret = c->insert(c); + invariantWTOK(ret); + + _changeNumRecords( txn, true ); + _increaseDataSize( txn, len ); + + cappedDeleteAsNeeded(txn); + + return StatusWith<DiskLoc>( loc ); + } + + StatusWith<DiskLoc> WiredTigerRecordStore::insertRecord( OperationContext* txn, + const DocWriter* doc, + bool enforceQuota ) { + const int len = doc->documentSize(); + + if ( _isCapped && len > _cappedMaxSize ) { + return StatusWith<DiskLoc>( ErrorCodes::BadValue, + "object to insert exceeds cappedMaxSize" ); + } + + boost::shared_array<char> buf( new char[len] ); + doc->writeDocument( buf.get() ); + + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + + DiskLoc loc; + if (_useOplogHack) { + StatusWith<DiskLoc> status = extractAndCheckLocForOplog(buf.get(), len); + if (!status.isOK()) + return status; + loc 
= status.getValue(); + } + else { + loc = _nextId(); + } + + c->set_key(c, _makeKey(loc)); + WiredTigerItem value(buf.get(), len); + c->set_value(c, value.Get()); + int ret = c->insert(c); + invariantWTOK(ret); + + _changeNumRecords( txn, true ); + _increaseDataSize( txn, len ); + + cappedDeleteAsNeeded( txn ); + + return StatusWith<DiskLoc>( loc ); + } + + StatusWith<DiskLoc> WiredTigerRecordStore::updateRecord( OperationContext* txn, + const DiskLoc& loc, + const char* data, + int len, + bool enforceQuota, + UpdateMoveNotifier* notifier ) { + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + invariant( c ); + c->set_key(c, _makeKey(loc)); + int ret = c->search(c); + invariantWTOK(ret); + + WT_ITEM old_value; + ret = c->get_value(c, &old_value); + invariantWTOK(ret); + + int old_length = old_value.size; + + c->set_key(c, _makeKey(loc)); + WiredTigerItem value(data, len); + c->set_value(c, value.Get()); + ret = c->update(c); + invariantWTOK(ret); + + _increaseDataSize(txn, len - old_length); + + cappedDeleteAsNeeded(txn); + + return StatusWith<DiskLoc>( loc ); + } + + Status WiredTigerRecordStore::updateWithDamages( OperationContext* txn, + const DiskLoc& loc, + const RecordData& oldRec, + const char* damangeSource, + const mutablebson::DamageVector& damages ) { + + // apply changes to our copy + + std::string data(reinterpret_cast<const char *>(oldRec.data()), oldRec.size()); + + char* root = const_cast<char*>( data.c_str() ); + for( size_t i = 0; i < damages.size(); i++ ) { + mutablebson::DamageEvent event = damages[i]; + const char* sourcePtr = damangeSource + event.sourceOffset; + char* targetPtr = root + event.targetOffset; + std::memcpy(targetPtr, sourcePtr, event.size); + } + + // write back + + WiredTigerItem value(data); + + WiredTigerCursor curwrap( _uri, _instanceId, txn); + WT_CURSOR *c = curwrap.get(); + c->set_key(c, _makeKey(loc)); + c->set_value(c, value.Get()); + + int ret = c->update(c); + invariantWTOK(ret); + + 
return Status::OK(); + } + + RecordIterator* WiredTigerRecordStore::getIterator( OperationContext* txn, + const DiskLoc& start, + const CollectionScanParams::Direction& dir ) const { + return new Iterator(*this, txn, start, dir, false); + } + + + RecordIterator* WiredTigerRecordStore::getIteratorForRepair( OperationContext* txn ) const { + return getIterator( txn ); + } + + std::vector<RecordIterator*> WiredTigerRecordStore::getManyIterators( + OperationContext* txn ) const { + // XXX do we want this to actually return a set of iterators? + + std::vector<RecordIterator*> iterators; + iterators.push_back( new Iterator(*this, txn, DiskLoc(), + CollectionScanParams::FORWARD, true) ); + + return iterators; + } + + Status WiredTigerRecordStore::truncate( OperationContext* txn ) { + // TODO: use a WiredTiger fast truncate + boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); + while( !iter->isEOF() ) { + DiskLoc loc = iter->getNext(); + deleteRecord( txn, loc ); + } + + // WiredTigerRecoveryUnit* ru = _getRecoveryUnit( txn ); + + _highestLocForOplogHack = DiskLoc(); + + return Status::OK(); + } + + Status WiredTigerRecordStore::compact( OperationContext* txn, + RecordStoreCompactAdaptor* adaptor, + const CompactOptions* options, + CompactStats* stats ) { + WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(txn)->getSessionCache(); + WiredTigerSession* session = cache->getSession(); + WT_SESSION *s = session->getSession(); + int ret = s->compact(s, GetURI().c_str(), NULL); + invariantWTOK(ret); + cache->releaseSession(session); + return Status::OK(); + } + + Status WiredTigerRecordStore::validate( OperationContext* txn, + bool full, bool scanData, + ValidateAdaptor* adaptor, + ValidateResults* results, + BSONObjBuilder* output ) const { + + long long nrecords = 0; + boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); + results->valid = true; + while( !iter->isEOF() ) { + ++nrecords; + if ( full && scanData ) { + size_t dataSize; + DiskLoc loc 
= iter->curr(); + RecordData data = dataFor( txn, loc ); + Status status = adaptor->validate( data, &dataSize ); + if ( !status.isOK() ) { + results->valid = false; + results->errors.push_back( loc.toString() + " is corrupted" ); + } + } + iter->getNext(); + } + + output->appendNumber( "nrecords", nrecords ); + + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WT_SESSION* s = session->getSession(); + BSONObjBuilder bob(output->subobjStart("wiredtiger")); + Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + GetURI(), + "statistics=(fast)", &bob); + if (!status.isOK()) { + bob.append("error", "unable to retrieve statistics"); + bob.append("code", static_cast<int>(status.code())); + bob.append("reason", status.reason()); + } + return Status::OK(); + } + + void WiredTigerRecordStore::appendCustomStats( OperationContext* txn, + BSONObjBuilder* result, + double scale ) const { + result->appendBool( "capped", _isCapped ); + if ( _isCapped ) { + result->appendIntOrLL( "max", _cappedMaxDocs ); + result->appendIntOrLL( "maxSize", _cappedMaxSize ); + } + WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); + WT_SESSION* s = session->getSession(); + BSONObjBuilder bob(result->subobjStart("wiredtiger")); + Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + GetURI(), + "statistics=(fast)", &bob); + if (!status.isOK()) { + bob.append("error", "unable to retrieve statistics"); + bob.append("code", static_cast<int>(status.code())); + bob.append("reason", status.reason()); + } + } + + + Status WiredTigerRecordStore::touch( OperationContext* txn, BSONObjBuilder* output ) const { + if (output) { + output->append("numRanges", 1); + output->append("millis", 0); + } + return Status::OK(); + } + + Status WiredTigerRecordStore::setCustomOption( OperationContext* txn, + const BSONElement& option, + BSONObjBuilder* info ) { + string optionName = option.fieldName(); + if ( !option.isBoolean() ) { + 
return Status( ErrorCodes::BadValue, "Invalid Value" ); + } + // TODO: expose some WiredTiger configurations + if ( optionName == "usePowerOf2Sizes" ) { + return Status::OK(); + } else + if ( optionName.compare( "verify_checksums" ) == 0 ) { + } + else + return Status( ErrorCodes::InvalidOptions, "Invalid Option" ); + + return Status::OK(); + } + + DiskLoc WiredTigerRecordStore::oplogStartHack(OperationContext* txn, + const DiskLoc& startingPosition) const { + if (!_useOplogHack) + return DiskLoc().setInvalid(); + + WiredTigerCursor cursor(_uri, _instanceId, txn); + WT_CURSOR* c = cursor.get(); + + int cmp; + c->set_key(c, _makeKey(startingPosition)); + int ret = c->search_near(c, &cmp); + if (ret == 0 && cmp > 0) ret = c->prev(c); // landed one higher than startingPosition + if (ret == WT_NOTFOUND) return DiskLoc(); // nothing <= startingPosition + invariantWTOK(ret); + + uint64_t key; + ret = c->get_key(c, &key); + invariantWTOK(ret); + return _fromKey(key); + } + + DiskLoc WiredTigerRecordStore::_nextId() { + invariant(!_useOplogHack); + const uint64_t myId = _nextIdNum.fetchAndAdd(1); + int a = myId >> 32; + // This masks the lowest 4 bytes of myId + int ofs = myId & 0x00000000FFFFFFFF; + DiskLoc loc( a, ofs ); + return loc; + } + + WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit( OperationContext* txn ) { + return dynamic_cast<WiredTigerRecoveryUnit*>( txn->recoveryUnit() ); + } + + class WiredTigerRecordStore::NumRecordsChange : public RecoveryUnit::Change { + public: + NumRecordsChange(WiredTigerRecordStore* rs, bool insert) :_rs(rs), _insert(insert) {} + virtual void commit() {} + virtual void rollback() { + _rs->_numRecords.fetchAndAdd(_insert ? -1 : 1); + } + + private: + WiredTigerRecordStore* _rs; + bool _insert; + }; + + void WiredTigerRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) { + txn->recoveryUnit()->registerChange(new NumRecordsChange(this, insert)); + if ( _numRecords.fetchAndAdd(insert ? 
1 : -1) < 0 ) { + if ( insert ) + _numRecords.store( 1 ); + else + _numRecords.store( 0 ); + } + } + + class WiredTigerRecordStore::DataSizeChange : public RecoveryUnit::Change { + public: + DataSizeChange(WiredTigerRecordStore* rs, int amount) :_rs(rs), _amount(amount) {} + virtual void commit() {} + virtual void rollback() { + _rs->_increaseDataSize( NULL, -_amount ); + } + + private: + WiredTigerRecordStore* _rs; + bool _amount; + }; + + void WiredTigerRecordStore::_increaseDataSize( OperationContext* txn, int amount ) { + if ( txn ) + txn->recoveryUnit()->registerChange(new DataSizeChange(this, amount)); + + if ( _dataSize.fetchAndAdd(amount) < 0 ) { + if ( amount > 0 ) { + _dataSize.store( amount ); + } + else { + _dataSize.store( 0 ); + } + } + + if ( _sizeStorer && _sizeStorerCounter++ % 1000 == 0 ) { + _sizeStorer->store( _uri, _numRecords.load(), _dataSize.load() ); + } + } + + uint64_t WiredTigerRecordStore::_makeKey( const DiskLoc& loc ) { + return ((uint64_t)loc.a() << 32 | loc.getOfs()); + } + DiskLoc WiredTigerRecordStore::_fromKey( uint64_t key ) { + uint32_t a = key >> 32; + uint32_t ofs = (uint32_t)key; + return DiskLoc(a, ofs); + } + + // -------- + + WiredTigerRecordStore::Iterator::Iterator( + const WiredTigerRecordStore& rs, + OperationContext *txn, + const DiskLoc& start, + const CollectionScanParams::Direction& dir, + bool forParallelCollectionScan) + : _rs( rs ), + _txn( txn ), + _dir( dir ), + _forParallelCollectionScan( forParallelCollectionScan ), + _cursor( new WiredTigerCursor( rs.GetURI(), rs.instanceId(), txn ) ) { + RS_ITERATOR_TRACE("start"); + _locate(start, true); + } + + WiredTigerRecordStore::Iterator::~Iterator() { + } + + void WiredTigerRecordStore::Iterator::_locate(const DiskLoc &loc, bool exact) { + RS_ITERATOR_TRACE("_locate " << loc); + WT_CURSOR *c = _cursor->get(); + invariant( c ); + int ret; + if (loc.isNull()) { + ret = _forward() ? 
c->next(c) : c->prev(c);
+            if (ret != WT_NOTFOUND) invariantWTOK(ret);
+            _eof = (ret == WT_NOTFOUND);
+            RS_ITERATOR_TRACE("_locate null loc eof: " << _eof);
+            return;
+        }
+        c->set_key(c, _makeKey(loc));
+        if (exact) {
+            ret = c->search(c);
+        }
+        else {
+            // If loc doesn't exist, inexact matches should find the first existing record before
+            // it, in the direction of the scan. Note that inexact callers will call _getNext()
+            // after locate so they actually return the record *after* the one we seek to.
+            int cmp;
+            ret = c->search_near(c, &cmp);
+            if ( ret == WT_NOTFOUND ) {
+                _eof = true;
+                return;
+            }
+            invariantWTOK(ret);
+            if (_forward()) {
+                // return >= loc
+                if (cmp < 0)
+                    ret = c->next(c);
+            }
+            else {
+                // return <= loc
+                if (cmp > 0)
+                    ret = c->prev(c);
+            }
+        }
+        if (ret != WT_NOTFOUND) invariantWTOK(ret);
+        _eof = (ret == WT_NOTFOUND);
+        RS_ITERATOR_TRACE("_locate not null loc eof: " << _eof);
+    }
+
+    // Returns the cached end-of-scan flag; it is updated by _locate() and _getNext().
+    bool WiredTigerRecordStore::Iterator::isEOF() {
+        RS_ITERATOR_TRACE( "isEOF " << _eof << " " << _lastLoc );
+        return _eof;
+    }
+
+    // Allow const functions to use curr to find current location.
+    // Returns a default-constructed DiskLoc at EOF, otherwise the cursor's current key
+    // converted with _fromKey().
+    DiskLoc WiredTigerRecordStore::Iterator::_curr() const {
+        RS_ITERATOR_TRACE( "_curr" );
+        if (_eof)
+            return DiskLoc();
+
+        WT_CURSOR *c = _cursor->get();
+        invariant( c );
+        uint64_t key;
+        int ret = c->get_key(c, &key);
+        invariantWTOK(ret);
+        return _fromKey(key);
+    }
+
+    // Public, non-const entry point; forwards to _curr().
+    DiskLoc WiredTigerRecordStore::Iterator::curr() {
+        return _curr();
+    }
+
+    // Moves the cursor one record in the scan direction. WT_NOTFOUND is the only return code
+    // that is tolerated; it sets _eof. Any other non-OK code trips invariantWTOK().
+    void WiredTigerRecordStore::Iterator::_getNext() {
+        RS_ITERATOR_TRACE("_getNext");
+        WT_CURSOR *c = _cursor->get();
+        int ret = _forward() ? c->next(c) : c->prev(c);
+        if (ret != WT_NOTFOUND) invariantWTOK(ret);
+        _eof = (ret == WT_NOTFOUND);
+        RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof );
+        if ( !_eof ) {
+            RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof << " " << _curr() );
+        }
+    }
+
+    // Returns the record the iterator is currently positioned on, then advances. The returned
+    // location is remembered in _lastLoc, which restoreState() uses to re-position.
+    DiskLoc WiredTigerRecordStore::Iterator::getNext() {
+        RS_ITERATOR_TRACE( "getNext" );
+        /* Take care not to restart a scan if we have hit the end */
+        if (isEOF())
+            return DiskLoc();
+
+        /* MongoDB expects "natural" ordering - which is the order that items are inserted. */
+        DiskLoc toReturn = curr();
+        RS_ITERATOR_TRACE( "getNext toReturn: " << toReturn );
+        _getNext();
+        RS_ITERATOR_TRACE( " ----" );
+        _lastLoc = toReturn;
+        return toReturn;
+    }
+
+    // Intentionally empty.
+    void WiredTigerRecordStore::Iterator::invalidate( const DiskLoc& dl ) {
+        // this should never be called
+    }
+
+    // Prepares the iterator for a yield: remembers the recovery unit for the sanity check in
+    // restoreState(), resets the cursor unless wt_keeptxnopen() is set, drops the cursor
+    // entirely for a parallel collection scan, and clears _txn.
+    void WiredTigerRecordStore::Iterator::saveState() {
+        RS_ITERATOR_TRACE("saveState");
+
+        // the cursor and recoveryUnit are valid on restore
+        // so we just record the recoveryUnit to make sure
+        _savedRecoveryUnit = _txn->recoveryUnit();
+        if ( !wt_keeptxnopen() ) {
+            _cursor->reset();
+        }
+
+        if ( _forParallelCollectionScan ) {
+            _cursor.reset( NULL );
+        }
+        _txn = NULL;
+    }
+
+    // Re-attaches the iterator to 'txn' and, when the cursor position was lost (cursor reset
+    // in saveState() or a freshly opened cursor), seeks back to _lastLoc. Always returns true.
+    bool WiredTigerRecordStore::Iterator::restoreState( OperationContext *txn ) {
+
+        // This is normally already the case, but sometimes we are given a new
+        // OperationContext on restore - update the iterator's context in that
+        // case
+        _txn = txn;
+
+        bool needRestore = false;
+
+        if ( _forParallelCollectionScan ) {
+            invariant( _savedRecoveryUnit != txn->recoveryUnit() );
+            // parallel collection scan or something
+            needRestore = true;
+            _savedRecoveryUnit = txn->recoveryUnit();
+            _cursor.reset( new WiredTigerCursor( _rs.GetURI(), _rs.instanceId(), txn ) );
+            _forParallelCollectionScan = false; // we only do this the first time
+        }
+
+        invariant( _savedRecoveryUnit == txn->recoveryUnit() );
+        if ( needRestore || !wt_keeptxnopen() ) {
+            DiskLoc saved = _lastLoc;
+            _locate(_lastLoc, false);
+            RS_ITERATOR_TRACE( "isEOF check " << _eof );
+            if ( _eof ) {
+                _lastLoc = DiskLoc();
+            }
+            else if ( _curr() != saved ) {
+                // old doc deleted, we're ok
+                // (the inexact _locate() already landed on the next record to return)
+            }
+            else {
+                // we found where we left off!
+                // now we advance to the next one
+                RS_ITERATOR_TRACE( "isEOF found " << _curr() );
+                _getNext();
+            }
+        }
+
+        return true;
+    }
+
+    RecordData WiredTigerRecordStore::Iterator::dataFor( const DiskLoc& loc ) const {
+        // Retrieve the data if the iterator is already positioned at loc, otherwise
+        // go through the record store's own dataFor() to avoid upsetting the iterator's
+        // cursor position.
+        if (loc == _curr())
+            return (_rs._getData(*_cursor));
+        else
+            return (_rs.dataFor( _txn, loc ));
+    }
+
+    // True when this iterator scans in CollectionScanParams::FORWARD direction.
+    bool WiredTigerRecordStore::Iterator::_forward() const {
+        return _dir == CollectionScanParams::FORWARD;
+    }
+
+    // Deletes every record located after 'end' (and 'end' itself when 'inclusive' is set),
+    // scanning forward from 'end' inside a single WriteUnitOfWork. Afterwards
+    // _highestLocForOplogHack is recomputed from the first record of a backward scan, or
+    // reset to a default DiskLoc when the store is empty.
+    void WiredTigerRecordStore::temp_cappedTruncateAfter( OperationContext* txn,
+                                                           DiskLoc end,
+                                                           bool inclusive ) {
+        WriteUnitOfWork wuow(txn);
+        boost::scoped_ptr<RecordIterator> iter( getIterator( txn, end ) );
+        while( !iter->isEOF() ) {
+            DiskLoc loc = iter->getNext();
+            if ( end < loc || ( inclusive && end == loc ) ) {
+                deleteRecord( txn, loc );
+            }
+        }
+        wuow.commit();
+
+        iter.reset(getIterator(txn, DiskLoc(), CollectionScanParams::BACKWARD));
+        _highestLocForOplogHack = iter->isEOF() ? DiskLoc() : iter->curr();
+    }
+}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
new file mode 100644
index 00000000000..6f8634e8436
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -0,0 +1,246 @@
+// wiredtiger_record_store.h
+
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/storage/record_store.h" +#include "mongo/db/storage/capped_callback.h" +#include "mongo/platform/atomic_word.h" + +namespace mongo { + + class RecoveryUnit; + class WiredTigerCursor; + class WiredTigerRecoveryUnit; + class WiredTigerSizeStorer; + + class WiredTigerRecordStore : public RecordStore { + public: + + /** + * Creates a configuration string suitable for 'config' parameter in WT_SESSION::create(). + * Configuration string is constructed from: + * built-in defaults + * storageEngine.wiredtiger.configString in 'options' + * 'extraStrings' + * Performs simple validation on the supplied parameters. + * Returns error status if validation fails. 
+ * Note that even if this function returns an OK status, WT_SESSION:create() may still + * fail with the constructed configuration string. + */ + static StatusWith<std::string> generateCreateString(const StringData& ns, + const CollectionOptions &options, + const StringData& extraStrings); + + WiredTigerRecordStore(OperationContext* txn, + const StringData& ns, + const StringData& uri, + bool isCapped = false, + int64_t cappedMaxSize = -1, + int64_t cappedMaxDocs = -1, + CappedDocumentDeleteCallback* cappedDeleteCallback = NULL, + WiredTigerSizeStorer* sizeStorer = NULL ); + + virtual ~WiredTigerRecordStore(); + + // name of the RecordStore implementation + virtual const char* name() const { return "wiredtiger"; } + + virtual long long dataSize( OperationContext *txn ) const; + + virtual long long numRecords( OperationContext* txn ) const; + + virtual bool isCapped() const; + + virtual int64_t storageSize( OperationContext* txn, + BSONObjBuilder* extraInfo = NULL, + int infoLevel = 0 ) const; + + // CRUD related + + virtual RecordData dataFor( OperationContext* txn, const DiskLoc& loc ) const; + + virtual bool findRecord( OperationContext* txn, const DiskLoc& loc, RecordData* out ) const; + + virtual void deleteRecord( OperationContext* txn, const DiskLoc& dl ); + + virtual StatusWith<DiskLoc> insertRecord( OperationContext* txn, + const char* data, + int len, + bool enforceQuota ); + + virtual StatusWith<DiskLoc> insertRecord( OperationContext* txn, + const DocWriter* doc, + bool enforceQuota ); + + virtual StatusWith<DiskLoc> updateRecord( OperationContext* txn, + const DiskLoc& oldLocation, + const char* data, + int len, + bool enforceQuota, + UpdateMoveNotifier* notifier ); + + virtual Status updateWithDamages( OperationContext* txn, + const DiskLoc& loc, + const RecordData& oldRec, + const char* damangeSource, + const mutablebson::DamageVector& damages ); + + virtual RecordIterator* getIterator( OperationContext* txn, + const DiskLoc& start = DiskLoc(), + 
const CollectionScanParams::Direction& dir = + CollectionScanParams::FORWARD ) const; + + virtual RecordIterator* getIteratorForRepair( OperationContext* txn ) const; + + virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const; + + virtual Status truncate( OperationContext* txn ); + + virtual bool compactSupported() const { return true; } + + virtual Status compact( OperationContext* txn, + RecordStoreCompactAdaptor* adaptor, + const CompactOptions* options, + CompactStats* stats ); + + virtual Status validate( OperationContext* txn, + bool full, bool scanData, + ValidateAdaptor* adaptor, + ValidateResults* results, BSONObjBuilder* output ) const; + + virtual void appendCustomStats( OperationContext* txn, + BSONObjBuilder* result, + double scale ) const; + + virtual Status touch( OperationContext* txn, BSONObjBuilder* output ) const; + + virtual Status setCustomOption( OperationContext* txn, + const BSONElement& option, + BSONObjBuilder* info = NULL ); + + virtual void temp_cappedTruncateAfter(OperationContext* txn, + DiskLoc end, + bool inclusive); + + virtual DiskLoc oplogStartHack(OperationContext* txn, + const DiskLoc& startingPosition) const; + + void setCappedDeleteCallback(CappedDocumentDeleteCallback* cb) { + _cappedDeleteCallback = cb; + } + int64_t cappedMaxDocs() const; + int64_t cappedMaxSize() const; + + const std::string& GetURI() const { return _uri; } + uint64_t instanceId() const { return _instanceId; } + + void setSizeStorer( WiredTigerSizeStorer* ss ) { _sizeStorer = ss; } + + private: + + class Iterator : public RecordIterator { + public: + Iterator( const WiredTigerRecordStore& rs, + OperationContext* txn, + const DiskLoc& start, + const CollectionScanParams::Direction& dir, + bool forParallelCollectionScan ); + + virtual ~Iterator(); + + virtual bool isEOF(); + virtual DiskLoc curr(); + virtual DiskLoc getNext(); + virtual void invalidate(const DiskLoc& dl); + virtual void saveState(); + virtual bool 
restoreState(OperationContext *txn); + virtual RecordData dataFor( const DiskLoc& loc ) const; + + private: + bool _forward() const; + void _getNext(); + void _locate( const DiskLoc &loc, bool exact ); + void _checkStatus(); + DiskLoc _curr() const; // const version of public curr method + + const WiredTigerRecordStore& _rs; + OperationContext* _txn; + RecoveryUnit* _savedRecoveryUnit; // only used to sanity check between save/restore + CollectionScanParams::Direction _dir; + bool _forParallelCollectionScan; + scoped_ptr<WiredTigerCursor> _cursor; + bool _eof; + + DiskLoc _lastLoc; // the last thing returned from getNext() + }; + + class NumRecordsChange; + class DataSizeChange; + + static WiredTigerRecoveryUnit* _getRecoveryUnit( OperationContext* txn ); + + static uint64_t _makeKey(const DiskLoc &loc); + static DiskLoc _fromKey(uint64_t k); + + DiskLoc _nextId(); + void _setId(DiskLoc loc); + bool cappedAndNeedDelete(OperationContext* txn) const; + void cappedDeleteAsNeeded(OperationContext* txn); + void _changeNumRecords(OperationContext* txn, bool insert); + void _increaseDataSize(OperationContext* txn, int amount); + RecordData _getData( const WiredTigerCursor& cursor) const; + StatusWith<DiskLoc> extractAndCheckLocForOplog(const char* data, int len); + + const std::string _uri; + const uint64_t _instanceId; // not persisted + + // The capped settings should not be updated once operations have started + const bool _isCapped; + const bool _isOplog; + const int64_t _cappedMaxSize; + const int64_t _cappedMaxDocs; + CappedDocumentDeleteCallback* _cappedDeleteCallback; + + const bool _useOplogHack; + DiskLoc _highestLocForOplogHack; + + AtomicUInt64 _nextIdNum; + AtomicInt64 _dataSize; + AtomicInt64 _numRecords; + + WiredTigerSizeStorer* _sizeStorer; // not owned, can be NULL + int _sizeStorerCounter; + }; +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp new file mode 
100644 index 00000000000..0f2f0541764 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -0,0 +1,410 @@ +// wiredtiger_record_store_test.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include <sstream> +#include <string> + +#include "mongo/db/json.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/storage/record_store_test_harness.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + + using std::string; + using std::stringstream; + + class WiredTigerHarnessHelper : public HarnessHelper { + public: + WiredTigerHarnessHelper() : _dbpath( "wt_test" ), _conn( NULL ) { + + std::stringstream ss; + ss << "create,"; + ss << "statistics=(all),"; + string config = ss.str(); + int ret = wiredtiger_open( _dbpath.path().c_str(), NULL, config.c_str(), &_conn); + invariantWTOK( ret ); + + _sessionCache = new WiredTigerSessionCache( _conn ); + } + + ~WiredTigerHarnessHelper() { + delete _sessionCache; + _conn->close(_conn, NULL); + } + + virtual RecordStore* newNonCappedRecordStore() { return newNonCappedRecordStore("a.b"); } + RecordStore* newNonCappedRecordStore(const std::string& ns) { + WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit( _sessionCache ); + OperationContextNoop txn( ru ); + string uri = "table:" + ns; + + StatusWith<std::string> result = + WiredTigerRecordStore::generateCreateString(ns, CollectionOptions(), ""); + ASSERT_TRUE(result.isOK()); + std::string config = result.getValue(); + + { + WriteUnitOfWork uow(&txn); + WT_SESSION* s = ru->getSession()->getSession(); + invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) ); + uow.commit(); + } + + return new WiredTigerRecordStore( &txn, ns, uri ); + } + + virtual RecordStore* newCappedRecordStore( int64_t cappedMaxSize, + int64_t cappedMaxDocs ) { + 
std::string ns = "a.b"; + + WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit( _sessionCache ); + OperationContextNoop txn( ru ); + string uri = "table:a.b"; + + StatusWith<std::string> result = + WiredTigerRecordStore::generateCreateString("", CollectionOptions(), ""); + ASSERT_TRUE(result.isOK()); + std::string config = result.getValue(); + + { + WriteUnitOfWork uow(&txn); + WT_SESSION* s = ru->getSession()->getSession(); + invariantWTOK( s->create( s, uri.c_str(), config.c_str() ) ); + uow.commit(); + } + + return new WiredTigerRecordStore( &txn, ns, uri, true, cappedMaxSize, cappedMaxDocs ); + } + + virtual RecoveryUnit* newRecoveryUnit() { + return new WiredTigerRecoveryUnit( _sessionCache ); + } + private: + unittest::TempDir _dbpath; + WT_CONNECTION* _conn; + WiredTigerSessionCache* _sessionCache; + }; + + HarnessHelper* newHarnessHelper() { + return new WiredTigerHarnessHelper(); + } + + TEST(WiredTigerRecordStoreTest, GenerateCreateStringUnknownField) { + CollectionOptions options; + options.storageEngine = fromjson("{wiredtiger: {unknownField: 1}}"); + StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString("", options, ""); + const Status& status = result.getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::InvalidOptions, status.code()); + } + + TEST(WiredTigerRecordStoreTest, GenerateCreateStringNonStringConfig) { + CollectionOptions options; + options.storageEngine = fromjson("{wiredtiger: {configString: 12345}}"); + StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString("", options, ""); + const Status& status = result.getStatus(); + ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::TypeMismatch, status.code()); + } + + TEST(WiredTigerRecordStoreTest, Isolation1 ) { + scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() ); + scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() ); + + DiskLoc loc1; + DiskLoc loc2; + + { + scoped_ptr<OperationContext> opCtx( 
harnessHelper->newOperationContext() ); + { + WriteUnitOfWork uow( opCtx.get() ); + + StatusWith<DiskLoc> res = rs->insertRecord( opCtx.get(), "a", 2, false ); + ASSERT_OK( res.getStatus() ); + loc1 = res.getValue(); + + res = rs->insertRecord( opCtx.get(), "a", 2, false ); + ASSERT_OK( res.getStatus() ); + loc2 = res.getValue(); + + uow.commit(); + } + } + + { + scoped_ptr<OperationContext> t1( harnessHelper->newOperationContext() ); + scoped_ptr<OperationContext> t2( harnessHelper->newOperationContext() ); + + scoped_ptr<WriteUnitOfWork> w1( new WriteUnitOfWork( t1.get() ) ); + scoped_ptr<WriteUnitOfWork> w2( new WriteUnitOfWork( t2.get() ) ); + + rs->dataFor( t1.get(), loc1 ); + rs->dataFor( t2.get(), loc1 ); + + ASSERT_OK( rs->updateRecord( t1.get(), loc1, "b", 2, false, NULL ).getStatus() ); + ASSERT_OK( rs->updateRecord( t1.get(), loc2, "B", 2, false, NULL ).getStatus() ); + + try { + // this should fail + rs->updateRecord( t2.get(), loc1, "c", 2, false, NULL ); + ASSERT( 0 ); + } + catch ( WriteConflictException& dle ) { + w2.reset( NULL ); + t2.reset( NULL ); + } + + w1->commit(); // this should succeed + } + } + + TEST(WiredTigerRecordStoreTest, Isolation2 ) { + scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() ); + scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() ); + + DiskLoc loc1; + DiskLoc loc2; + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + { + WriteUnitOfWork uow( opCtx.get() ); + + StatusWith<DiskLoc> res = rs->insertRecord( opCtx.get(), "a", 2, false ); + ASSERT_OK( res.getStatus() ); + loc1 = res.getValue(); + + res = rs->insertRecord( opCtx.get(), "a", 2, false ); + ASSERT_OK( res.getStatus() ); + loc2 = res.getValue(); + + uow.commit(); + } + } + + { + scoped_ptr<OperationContext> t1( harnessHelper->newOperationContext() ); + scoped_ptr<OperationContext> t2( harnessHelper->newOperationContext() ); + + // ensure we start transactions + rs->dataFor( t1.get(), loc2 ); + 
rs->dataFor( t2.get(), loc2 ); + + { + WriteUnitOfWork w( t1.get() ); + ASSERT_OK( rs->updateRecord( t1.get(), loc1, "b", 2, false, NULL ).getStatus() ); + w.commit(); + } + + { + WriteUnitOfWork w( t2.get() ); + ASSERT_EQUALS( string("a"), rs->dataFor( t2.get(), loc1 ).data() ); + try { + // this should fail as our version of loc1 is too old + rs->updateRecord( t2.get(), loc1, "c", 2, false, NULL ); + ASSERT( 0 ); + } + catch ( WriteConflictException& dle ) { + } + + } + + } + } + + TEST(WiredTigerRecordStoreTest, SizeStorer1 ) { + scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() ); + scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() ); + + string uri = dynamic_cast<WiredTigerRecordStore*>( rs.get() )->GetURI(); + + WiredTigerSizeStorer ss; + dynamic_cast<WiredTigerRecordStore*>( rs.get() )->setSizeStorer( &ss ); + + int N = 12; + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + { + WriteUnitOfWork uow( opCtx.get() ); + for ( int i = 0; i < N; i++ ) { + StatusWith<DiskLoc> res = rs->insertRecord( opCtx.get(), "a", 2, false ); + ASSERT_OK( res.getStatus() ); + } + uow.commit(); + } + } + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + ASSERT_EQUALS( N, rs->numRecords( opCtx.get() ) ); + } + + rs.reset( NULL ); + + { + long long numRecords; + long long dataSize; + ss.load( uri, &numRecords, &dataSize ); + ASSERT_EQUALS( N, numRecords ); + } + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + rs.reset( new WiredTigerRecordStore( opCtx.get(), "a.b", uri, + false, -1, -1, NULL, &ss ) ); + } + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + ASSERT_EQUALS( N, rs->numRecords( opCtx.get() ) ); + } + + string indexUri = "table:myindex"; + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + WiredTigerRecoveryUnit* ru = + dynamic_cast<WiredTigerRecoveryUnit*>( 
opCtx->recoveryUnit() ); + + { + WriteUnitOfWork uow( opCtx.get() ); + WT_SESSION* s = ru->getSession()->getSession(); + invariantWTOK( s->create( s, indexUri.c_str(), "" ) ); + uow.commit(); + } + + { + WriteUnitOfWork uow( opCtx.get() ); + ss.storeInto( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri ); + uow.commit(); + } + } + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + WiredTigerSizeStorer ss2; + ss2.loadFrom( WiredTigerRecoveryUnit::get( opCtx.get() )->getSession(), indexUri ); + long long numRecords; + long long dataSize; + ss2.load( uri, &numRecords, &dataSize ); + ASSERT_EQUALS( N, numRecords ); + } + + rs.reset( NULL ); // this has to be deleted before ss + } + + StatusWith<DiskLoc> insertBSON(ptr<OperationContext> opCtx, ptr<RecordStore> rs, + const BSONObj& obj) { + WriteUnitOfWork wuow(opCtx); + StatusWith<DiskLoc> status = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), false); + if (status.isOK()) + wuow.commit(); + return status; + } + + // TODO make generic + TEST(WiredTigerRecordStoreTest, OplogHack) { + WiredTigerHarnessHelper harnessHelper; + scoped_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("local.oplog.foo")); + scoped_ptr<OperationContext> opCtx(harnessHelper.newOperationContext()); + + // always illegal + ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,-1))).getStatus(), + ErrorCodes::BadValue); + + ASSERT_EQ(insertBSON(opCtx, rs, BSON("not_ts" << OpTime(2,1))).getStatus(), + ErrorCodes::BadValue); + + ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << "not an OpTime")).getStatus(), + ErrorCodes::BadValue); + + // currently dasserts + // ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(-2,1))).getStatus(), + // ErrorCodes::BadValue); + + // success cases + ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(1,1))).getValue(), + DiskLoc(1,1)); + + ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(1,2))).getValue(), + DiskLoc(1,2)); + + 
ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,2))).getValue(), + DiskLoc(2,2)); + + // fails because <= highest + ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,1))).getStatus(), + ErrorCodes::BadValue); + + ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,2))).getStatus(), + ErrorCodes::BadValue); + + + // find start + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(0,1)), DiskLoc()); // nothing <= + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,1)), DiskLoc(1,2)); // between + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,2)), DiskLoc(2,2)); // == + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(2,2)); // > highest + + rs->temp_cappedTruncateAfter(opCtx.get(), DiskLoc(2,2), false); // no-op + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(2,2)); + + rs->temp_cappedTruncateAfter(opCtx.get(), DiskLoc(1,2), false); // deletes 2,2 + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(1,2)); + + rs->temp_cappedTruncateAfter(opCtx.get(), DiskLoc(1,2), true); // deletes 1,2 + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc(1,1)); + + { + WriteUnitOfWork wuow(opCtx.get()); + ASSERT_OK(rs->truncate(opCtx.get())); // deletes 1,1 and leaves collection empty + wuow.commit(); + } + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(2,3)), DiskLoc()); + } + + TEST(WiredTigerRecordStoreTest, OplogHackOnNonOplog) { + WiredTigerHarnessHelper harnessHelper; + scoped_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("local.NOT_oplog.foo")); + scoped_ptr<OperationContext> opCtx(harnessHelper.newOperationContext()); + + ASSERT_OK(insertBSON(opCtx, rs, BSON("ts" << OpTime(2,-1))).getStatus()); + ASSERT_EQ(rs->oplogStartHack(opCtx.get(), DiskLoc(0,1)), DiskLoc().setInvalid()); + } +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp new file mode 100644 index 00000000000..5f7694c8091 --- 
/dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -0,0 +1,268 @@ +// wiredtiger_recovery_unit.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include <boost/thread/condition.hpp> +#include <boost/thread/mutex.hpp> + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/log.h" +#include "mongo/util/stacktrace.h" + +namespace mongo { + + namespace { + struct AwaitCommitData { + AwaitCommitData() : + numWaitingForSync(0), + lastSyncTime(0) { + } + + void syncHappend() { + boost::mutex::scoped_lock lk( mutex ); + lastSyncTime++; + condvar.notify_all(); + } + + // return true if happened + bool awaitCommit() { + boost::mutex::scoped_lock lk( mutex ); + long long start = lastSyncTime; + numWaitingForSync.fetchAndAdd(1); + condvar.timed_wait(lk,boost::posix_time::milliseconds(50)); + numWaitingForSync.fetchAndAdd(-1); + return lastSyncTime > start; + } + + AtomicUInt32 numWaitingForSync; + + boost::mutex mutex; // this just protects lastSyncTime + boost::condition condvar; + long long lastSyncTime; + } awaitCommitData; + } + + WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc) : + _sessionCache( sc ), + _session( NULL ), + _depth(0), + _active( false ), + _everStartedWrite( false ), + _currentlySquirreled( false ), + _syncing( false ) { + } + + WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() { + invariant( _depth == 0 ); + _abort(); + if ( _session ) { + _sessionCache->releaseSession( _session ); + _session = NULL; + } + } + + void WiredTigerRecoveryUnit::reportState( BSONObjBuilder* b ) const { + b->append( "wt_depth", _depth ); + b->append( "wt_active", _active ); + b->append( "wt_everStartedWrite", _everStartedWrite ); + if ( _active ) + b->append( "wt_millisSinceCommit", _timer.millis() ); + } + + void WiredTigerRecoveryUnit::_commit() { + if ( _session && _active ) { + _txnClose( true ); + } + + 
for (Changes::iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) { + (*it)->commit(); + } + _changes.clear(); + } + + void WiredTigerRecoveryUnit::_abort() { + if ( _session && _active ) { + _txnClose( false ); + } + + for (Changes::reverse_iterator it = _changes.rbegin(), end = _changes.rend(); + it != end; ++it) { + (*it)->rollback(); + } + _changes.clear(); + } + + void WiredTigerRecoveryUnit::beginUnitOfWork() { + invariant( !_currentlySquirreled ); + _depth++; + _everStartedWrite = true; + } + + void WiredTigerRecoveryUnit::commitUnitOfWork() { + if (_depth > 1) + return; // only outermost WUOW gets committed. + _commit(); + } + + void WiredTigerRecoveryUnit::endUnitOfWork() { + _depth--; + if ( _depth == 0 ) { + _abort(); + } + } + + void WiredTigerRecoveryUnit::goingToAwaitCommit() { + if ( _active ) { + // too late, can't change config + return; + } + // yay, we've configured ourselves for sync + _syncing = true; + } + + bool WiredTigerRecoveryUnit::awaitCommit() { + if ( _syncing && _everStartedWrite ) { + // we did a sync, so we're good + return true; + } + awaitCommitData.awaitCommit(); + return true; + } + + void WiredTigerRecoveryUnit::registerChange(Change* change) { + invariant(_depth > 0); + _changes.push_back(ChangePtr(change)); + } + + WiredTigerRecoveryUnit* WiredTigerRecoveryUnit::get(OperationContext *txn) { + invariant( txn ); + return dynamic_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit()); + } + + WiredTigerSession* WiredTigerRecoveryUnit::getSession() { + if ( !_session ) { + _session = _sessionCache->getSession(); + } + + if ( !_active ) { + _txnOpen(); + } + return _session; + } + + void WiredTigerRecoveryUnit::commitAndRestart() { + invariant( _depth == 0 ); + if ( _active ) { + _txnClose( true ); + } + } + + void WiredTigerRecoveryUnit::_txnClose( bool commit ) { + invariant( _active ); + WT_SESSION *s = _session->getSession(); + if ( commit ) { + invariantWTOK( s->commit_transaction(s, NULL) ); + LOG(2) << "WT 
commit_transaction"; + if ( _syncing ) + awaitCommitData.syncHappend(); + } + else { + invariantWTOK( s->rollback_transaction(s, NULL) ); + LOG(2) << "WT rollback_transaction"; + } + _active = false; + } + + void WiredTigerRecoveryUnit::_txnOpen() { + invariant( !_active ); + WT_SESSION *s = _session->getSession(); + _syncing = _syncing || awaitCommitData.numWaitingForSync.load() > 0; + invariantWTOK( s->begin_transaction(s, _syncing ? "sync=true" : NULL) ); + LOG(2) << "WT begin_transaction"; + _timer.reset(); + _active = true; + } + + void WiredTigerRecoveryUnit::beingReleasedFromOperationContext() { + LOG(2) << "WiredTigerRecoveryUnit::beingReleased"; + _currentlySquirreled = true; + if ( !wt_keeptxnopen() ) { + _commit(); + } + } + void WiredTigerRecoveryUnit::beingSetOnOperationContext() { + LOG(2) << "WiredTigerRecoveryUnit::broughtBack"; + _currentlySquirreled = false; + } + + + // --------------------- + + WiredTigerCursor::WiredTigerCursor(const std::string& uri, uint64_t id, WiredTigerRecoveryUnit* ru) { + _init( uri, id, ru ); + } + + WiredTigerCursor::WiredTigerCursor(const std::string& uri, uint64_t id, OperationContext* txn) { + _init( uri, id, WiredTigerRecoveryUnit::get( txn ) ); + } + + void WiredTigerCursor::_init( const std::string& uri, uint64_t id, WiredTigerRecoveryUnit* ru ) { + _uriID = id; + _ru = ru; + _session = _ru->getSession(); + _cursor = _session->getCursor( uri, id ); + if ( !_cursor ) { + error() << "no cursor for uri: " << uri; + } + } + + WiredTigerCursor::~WiredTigerCursor() { + invariant( _session == _ru->getSession() ); + _session->releaseCursor( _uriID, _cursor ); + _cursor = NULL; + } + + WT_CURSOR* WiredTigerCursor::get() const { + invariant( _session == _ru->getSession() ); + return _cursor; + } + + void WiredTigerCursor::reset() { + invariantWTOK( _cursor->reset( _cursor ) ); + } + + WT_SESSION* WiredTigerCursor::getWTSession() { + return _session->getSession(); + } +} diff --git 
a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h new file mode 100644 index 00000000000..53af1d01b22 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -0,0 +1,137 @@ +// wiredtiger_recovery_unit.h + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <wiredtiger.h> + +#include <memory.h> + +#include <boost/scoped_ptr.hpp> +#include <boost/shared_ptr.hpp> + +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/recovery_unit.h" +#include "mongo/util/timer.h" + +namespace mongo { + + class BSONObjBuilder; + class WiredTigerSession; + class WiredTigerSessionCache; + + class WiredTigerRecoveryUnit : public RecoveryUnit { + public: + WiredTigerRecoveryUnit(WiredTigerSessionCache* sc); + + virtual ~WiredTigerRecoveryUnit(); + + virtual void reportState( BSONObjBuilder* b ) const; + + virtual void beginUnitOfWork(); + + virtual void commitUnitOfWork(); + + virtual void endUnitOfWork(); + + virtual bool awaitCommit(); + virtual void goingToAwaitCommit(); + + virtual void registerChange(Change *); + + virtual void beingReleasedFromOperationContext(); + virtual void beingSetOnOperationContext(); + + virtual void commitAndRestart(); + + // un-used API + virtual void* writingPtr(void* data, size_t len) { invariant(!"don't call writingPtr"); } + virtual void syncDataAndTruncateJournal() {} + + // ---- WT STUFF + + WiredTigerSession* getSession(); + WiredTigerSessionCache* getSessionCache() { return _sessionCache; } + + bool everStartedWrite() const { return _everStartedWrite; } + int depth() const { return _depth; } + + static WiredTigerRecoveryUnit* get(OperationContext *txn); + + private: + + void _abort(); + void _commit(); + + void _txnClose( bool commit ); + void _txnOpen(); + + WiredTigerSessionCache* _sessionCache; // not owned + WiredTigerSession* _session; // owned, but from pool + bool _defaultCommit; + int _depth; + bool _active; + bool _everStartedWrite; + Timer _timer; + bool _currentlySquirreled; + bool _syncing; + + typedef boost::shared_ptr<Change> ChangePtr; + typedef std::vector<ChangePtr> Changes; + Changes _changes; + }; + + /** + * This is a smart pointer that wraps a WT_CURSOR and knows how to obtain and get from pool. 
+ */ + class WiredTigerCursor { + public: + WiredTigerCursor(const std::string& uri, uint64_t uriID, OperationContext* txn); + WiredTigerCursor(const std::string& uri, uint64_t uriID, WiredTigerRecoveryUnit* ru); + ~WiredTigerCursor(); + + WT_CURSOR* get() const; + WT_CURSOR* operator->() const { return get(); } + + WiredTigerSession* getSession() { return _session; } + WT_SESSION* getWTSession(); + + void reset(); + + private: + void _init( const std::string& uri, uint64_t uriID, WiredTigerRecoveryUnit* ru ); + + uint64_t _uriID; + WiredTigerRecoveryUnit* _ru; // not owned + WiredTigerSession* _session; + WT_CURSOR* _cursor; // owned, but pulled + }; + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp new file mode 100644 index 00000000000..89370ac35b1 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.cpp @@ -0,0 +1,81 @@ +// wiredtiger_server_status.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/storage/wiredtiger/wiredtiger_server_status.h" + +#include "boost/scoped_ptr.hpp" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + + using std::string; + + WiredTigerServerStatusSection::WiredTigerServerStatusSection(WiredTigerKVEngine* engine) + : ServerStatusSection("wiredtiger"), + _engine(engine) { } + + bool WiredTigerServerStatusSection::includeByDefault() const { + return true; + } + + BSONObj WiredTigerServerStatusSection::generateSection( + const BSONElement& configElement) const { + + boost::scoped_ptr<WiredTigerRecoveryUnit> recoveryUnit( + dynamic_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit())); + WiredTigerSession* session = recoveryUnit->getSession(); + WT_SESSION* s = session->getSession(); + invariant(s); + const string uri = "statistics:"; + + BSONObjBuilder bob; + Status status = WiredTigerUtil::exportTableToBSON(s, uri, + "statistics=(fast)", &bob); + if (!status.isOK()) { + bob.append("error", "unable to retrieve statistics"); + bob.append("code", static_cast<int>(status.code())); + bob.append("reason", status.reason()); + } + + return 
bob.obj(); + } + +} // namespace mongo + diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h new file mode 100644 index 00000000000..cc0cbb1a492 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_server_status.h @@ -0,0 +1,51 @@ +// wiredtiger_server_status.h + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/commands/server_status.h" + +namespace mongo { + + class WiredTigerKVEngine; + + /** + * Adds "wiredtiger" to the results of db.serverStatus(). + */ + class WiredTigerServerStatusSection : public ServerStatusSection { + public: + WiredTigerServerStatusSection(WiredTigerKVEngine* engine); + virtual bool includeByDefault() const; + virtual BSONObj generateSection(const BSONElement& configElement) const; + private: + WiredTigerKVEngine* _engine; + }; + +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp new file mode 100644 index 00000000000..a2e861047da --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -0,0 +1,184 @@ +// wiredtiger_session_cache.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" + +#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/log.h" + +namespace mongo { + + WiredTigerSession::WiredTigerSession( WT_CONNECTION* conn, int epoch ) + : _epoch( epoch ), _session( NULL ), _cursorsOut( 0 ) { + int ret = conn->open_session(conn, NULL, "isolation=snapshot", &_session); + invariantWTOK(ret); + } + + WiredTigerSession::~WiredTigerSession() { + if (_session) { + int ret = _session->close(_session, NULL); + invariantWTOK(ret); + _session = NULL; + } + } + + WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id) { + { + Cursors& cursors = _curmap[id]; + if ( !cursors.empty() ) { + WT_CURSOR* save = cursors.back(); + cursors.pop_back(); + _cursorsOut++; + return save; + } + } + WT_CURSOR* c = NULL; + int ret = _session->open_cursor(_session, uri.c_str(), NULL, "overwrite=false", &c); + if (ret != ENOENT) + invariantWTOK(ret); + if ( c ) _cursorsOut++; + return c; + } + + void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR *cursor) { + invariant( _session ); + invariant( cursor ); + _cursorsOut--; + + Cursors& cursors = _curmap[id]; + if ( cursors.size() > 10u ) { + invariantWTOK( cursor->close(cursor) ); + } + else { + invariantWTOK( cursor->reset( cursor ) ); + cursors.push_back( cursor ); + } + } + + void WiredTigerSession::closeAllCursors() { + invariant( _session ); + for (CursorMap::iterator i = _curmap.begin(); i != 
_curmap.end(); ++i ) { + Cursors& cursors = i->second; + for ( size_t j = 0; j < cursors.size(); j++ ) { + WT_CURSOR *cursor = cursors[j]; + if (cursor) { + int ret = cursor->close(cursor); + invariantWTOK(ret); + } + } + } + _curmap.clear(); + } + + namespace { + AtomicUInt64 nextCursorId(1); + } + // static + uint64_t WiredTigerSession::genCursorId() { + return nextCursorId.fetchAndAdd(1); + } + + // ----------------------- + + WiredTigerSessionCache::WiredTigerSessionCache( WiredTigerKVEngine* engine ) + : _engine( engine ), _conn( engine->getConnection() ) { + } + + WiredTigerSessionCache::WiredTigerSessionCache( WT_CONNECTION* conn ) + : _engine( NULL ), _conn( conn ) { + } + + WiredTigerSessionCache::~WiredTigerSessionCache() { + _closeAll(); + } + + void WiredTigerSessionCache::closeAll() { + boost::mutex::scoped_lock lk( _sessionLock ); + _closeAll(); + } + + void WiredTigerSessionCache::_closeAll() { + for ( size_t i = 0; i < _sessionPool.size(); i++ ) { + delete _sessionPool[i]; + } + _sessionPool.clear(); + } + + WiredTigerSession* WiredTigerSessionCache::getSession() { + { + boost::mutex::scoped_lock lk( _sessionLock ); + if ( !_sessionPool.empty() ) { + WiredTigerSession* s = _sessionPool.back(); + _sessionPool.pop_back(); + { + WT_SESSION* ss = s->getSession(); + uint64_t range; + invariantWTOK( ss->transaction_pinned_range( ss, &range ) ); + invariant( range == 0 ); + } + return s; + } + } + return new WiredTigerSession( _conn, _engine ? 
_engine->currentEpoch() : -1 ); + } + + void WiredTigerSessionCache::releaseSession( WiredTigerSession* session ) { + invariant( session ); + invariant( session->cursorsOut() == 0 ); + + { + WT_SESSION* ss = session->getSession(); + uint64_t range; + invariantWTOK( ss->transaction_pinned_range( ss, &range ) ); + invariant( range == 0 ); + } + + if ( _engine && _engine->haveDropsQueued() && session->epoch() < _engine->currentEpoch() ) { + delete session; + _engine->dropAllQueued(); + return; + } + + boost::mutex::scoped_lock lk( _sessionLock ); + _sessionPool.push_back( session ); + } + + bool WiredTigerSessionCache::_shouldBeClosed( WiredTigerSession* session ) const { + if ( !_engine ) + return false; + if ( !_engine->haveDropsQueued() ) + return false; + return session->epoch() < _engine->currentEpoch(); + } +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h new file mode 100644 index 00000000000..9d49311a742 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -0,0 +1,108 @@ +// wiredtiger_session_cache.h + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <map> +#include <string> +#include <vector> + +#include <boost/thread/mutex.hpp> + +#include <wiredtiger.h> + +namespace mongo { + + class WiredTigerKVEngine; + + /** + * This is a structure that caches 1 cursor for each uri. + * The idea is that there is a pool of these somewhere. + * NOT THREADSAFE + */ + class WiredTigerSession { + public: + WiredTigerSession( WT_CONNECTION* conn, int epoch ); + ~WiredTigerSession(); + + WT_SESSION* getSession() const { return _session; } + + WT_CURSOR* getCursor(const std::string& uri, uint64_t id); + void releaseCursor(uint64_t id, WT_CURSOR *cursor); + + void closeAllCursors(); + + int cursorsOut() const { return _cursorsOut; } + + int epoch() const { return _epoch; } + + static uint64_t genCursorId(); + + /** + * For "metadata:" cursors. Guaranteed never to collide with genCursorId() ids. 
+ */ + static const uint64_t kMetadataCursorId = 0; + + private: + int _epoch; + WT_SESSION* _session; // owned + typedef std::vector<WT_CURSOR*> Cursors; + typedef std::map<uint64_t, Cursors> CursorMap; + CursorMap _curmap; // owned + int _cursorsOut; + }; + + class WiredTigerSessionCache { + public: + + WiredTigerSessionCache( WiredTigerKVEngine* engine ); + WiredTigerSessionCache( WT_CONNECTION* conn ); + ~WiredTigerSessionCache(); + + WiredTigerSession* getSession(); + void releaseSession( WiredTigerSession* session ); + + void closeAll(); + + private: + + bool _shouldBeClosed( WiredTigerSession* session ) const; + + void _closeAll(); // does not lock + + WiredTigerKVEngine* _engine; // not owned, might be NULL + WT_CONNECTION* _conn; // not owned + typedef std::vector<WiredTigerSession*> SessionPool; + SessionPool _sessionPool; // owned + mutable boost::mutex _sessionLock; + }; + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp new file mode 100644 index 00000000000..91a6d9342d2 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp @@ -0,0 +1,212 @@ +// wiredtiger_size_storer.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include <wiredtiger.h> + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/util/log.h" + +namespace mongo { + + namespace { + int MAGIC = 123123; + } + + WiredTigerSizeStorer::WiredTigerSizeStorer() { + _magic = MAGIC; + } + + WiredTigerSizeStorer::~WiredTigerSizeStorer() { + _magic = 11111; + } + + void WiredTigerSizeStorer::_checkMagic() const { + if ( _magic == MAGIC ) + return; + log() << "WiredTigerSizeStorer magic wrong: " << _magic; + invariant( _magic == MAGIC ); + } + + void WiredTigerSizeStorer::onCreate( WiredTigerRecordStore* rs, + long long numRecords, long long dataSize ) { + _checkMagic(); + boost::mutex::scoped_lock lk( _entriesMutex ); + Entry& entry = _entries[rs->GetURI()]; + entry.rs = rs; + entry.numRecords = numRecords; + entry.dataSize = dataSize; + entry.dirty = true; + } + + 
void WiredTigerSizeStorer::onDestroy( WiredTigerRecordStore* rs ) { + _checkMagic(); + boost::mutex::scoped_lock lk( _entriesMutex ); + Entry& entry = _entries[rs->GetURI()]; + entry.numRecords = rs->numRecords( NULL ); + entry.dataSize = rs->dataSize( NULL ); + entry.dirty = true; + entry.rs = NULL; + } + + + void WiredTigerSizeStorer::store( const StringData& uri, + long long numRecords, long long dataSize ) { + _checkMagic(); + boost::mutex::scoped_lock lk( _entriesMutex ); + Entry& entry = _entries[uri.toString()]; + entry.numRecords = numRecords; + entry.dataSize = dataSize; + entry.dirty = true; + } + + void WiredTigerSizeStorer::load( const StringData& uri, + long long* numRecords, long long* dataSize ) const { + _checkMagic(); + boost::mutex::scoped_lock lk( _entriesMutex ); + Map::const_iterator it = _entries.find( uri.toString() ); + if ( it == _entries.end() ) { + *numRecords = 0; + *dataSize = 0; + return; + } + *numRecords = it->second.numRecords; + *dataSize = it->second.dataSize; + } + + void WiredTigerSizeStorer::loadFrom( WiredTigerSession* session, + const std::string& uri ) { + _checkMagic(); + + Map m; + { + WT_SESSION* s = session->getSession(); + WT_CURSOR* c = NULL; + int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c ); + if ( ret == ENOENT ) { + // doesn't exist, we'll create later + return; + } + invariantWTOK( ret ); + + while ( c->next(c) == 0 ) { + WT_ITEM key; + WT_ITEM value; + invariantWTOK( c->get_key(c, &key ) ); + invariantWTOK( c->get_value(c, &value ) ); + string uri( reinterpret_cast<const char*>( key.data ), key.size ); + BSONObj data( reinterpret_cast<const char*>( value.data ) ); + + LOG(2) << "WiredTigerSizeStorer::loadFrom " << uri << " -> " << data; + + Entry& e = m[uri]; + e.numRecords = data["numRecords"].safeNumberLong(); + e.dataSize = data["dataSize"].safeNumberLong(); + e.dirty = false; + e.rs = NULL; + } + invariantWTOK( c->close(c) ); + } + + boost::mutex::scoped_lock lk( _entriesMutex ); + _entries = m; + } 
+ + void WiredTigerSizeStorer::storeInto( WiredTigerSession* session, + const std::string& uri ) { + Map myMap; + { + boost::mutex::scoped_lock lk( _entriesMutex ); + for ( Map::iterator it = _entries.begin(); it != _entries.end(); ++it ) { + string uri = it->first; + Entry& entry = it->second; + if ( entry.rs ) { + if ( entry.dataSize != entry.rs->dataSize( NULL ) ) { + entry.dataSize = entry.rs->dataSize( NULL ); + entry.dirty = true; + } + if ( entry.numRecords != entry.rs->numRecords( NULL ) ) { + entry.numRecords = entry.rs->numRecords( NULL ); + entry.dirty = true; + } + } + + if ( !entry.dirty ) + continue; + myMap[uri] = entry; + } + } + + WT_SESSION* s = session->getSession(); + WT_CURSOR* c = NULL; + int ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c ); + if ( ret == ENOENT ) { + invariantWTOK( s->create( s, uri.c_str(), "" ) ); + ret = s->open_cursor( s, uri.c_str(), NULL, NULL, &c ); + } + invariantWTOK( ret ); + + for ( Map::iterator it = myMap.begin(); it != myMap.end(); ++it ) { + string uri = it->first; + Entry& entry = it->second; + + BSONObj data; + { + BSONObjBuilder b; + b.append( "numRecords", entry.numRecords ); + b.append( "dataSize", entry.dataSize ); + data = b.obj(); + } + + LOG(2) << "WiredTigerSizeStorer::storeInto " << uri << " -> " << data; + + WiredTigerItem key( uri.c_str(), uri.size() ); + WiredTigerItem value( data.objdata(), data.objsize() ); + c->set_key( c, key.Get() ); + c->set_value( c, value.Get() ); + invariantWTOK( c->insert(c) ); + entry.dirty = false; + + c->reset(c); + } + + invariantWTOK( c->close(c) ); + + } + + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h new file mode 100644 index 00000000000..15df6874139 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h @@ -0,0 +1,81 @@ +// wiredtiger_size_storer.h + +/** + * Copyright (C) 2014 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <map> +#include <string> + +#include <boost/thread/mutex.hpp> + +#include "mongo/base/string_data.h" + +namespace mongo { + + class WiredTigerRecordStore; + class WiredTigerSession; + + class WiredTigerSizeStorer { + public: + WiredTigerSizeStorer(); + ~WiredTigerSizeStorer(); + + void onCreate( WiredTigerRecordStore* rs, long long nr, long long ds ); + void onDestroy( WiredTigerRecordStore* rs ); + + void store( const StringData& uri, + long long numRecords, long long dataSize ); + + void load( const StringData& uri, + long long* numRecords, long long* dataSize ) const; + + void loadFrom( WiredTigerSession* cursor, const std::string& uri ); + void storeInto( WiredTigerSession* cursor, const std::string& uri ); + + private: + void _checkMagic() const; + + struct Entry { + Entry() : numRecords(0), dataSize(0), dirty(false), rs(NULL){} + long long numRecords; + long long dataSize; + bool dirty; + WiredTigerRecordStore* rs; // not owned + }; + + int _magic; + + typedef std::map<std::string,Entry> Map; + Map _entries; + mutable boost::mutex _entriesMutex; + }; + +} diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp new file mode 100644 index 00000000000..b67e4870a8f --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp @@ -0,0 +1,137 @@ +// wiredtiger_util.cpp + +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "mongo/db/storage/wiredtiger/wiredtiger_util.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + + using std::string; + + int64_t WiredTigerUtil::getIdentSize(WT_SESSION* s, + const std::string& uri ) { + BSONObjBuilder b; + Status status = WiredTigerUtil::exportTableToBSON(s, + "statistics:" + uri, + "statistics=(fast)", + &b); + if ( !status.isOK() ) { + if ( status.code() == ErrorCodes::CursorNotFound ) { + // ident gone, so its 0 + return 0; + } + uassertStatusOK( status ); + } + + BSONObj obj = b.obj(); + BSONObj sub = obj["block manager"].Obj(); + BSONElement e = sub["file size in bytes"]; + invariant( e.type() ); + + if ( e.isNumber() ) + return e.safeNumberLong(); + + return strtoull( e.valuestrsafe(), NULL, 10 ); + } + + + Status WiredTigerUtil::exportTableToBSON(WT_SESSION* s, + const std::string& 
uri, const std::string& config, + BSONObjBuilder* bob) { + invariant(s); + invariant(bob); + WT_CURSOR* c = NULL; + const char *cursorConfig = config.empty() ? NULL : config.c_str(); + int ret = s->open_cursor(s, uri.c_str(), NULL, cursorConfig, &c); + if (ret != 0) { + return Status(ErrorCodes::CursorNotFound, str::stream() + << "unable to open cursor at URI " << uri + << ". reason: " << wiredtiger_strerror(ret)); + } + bob->append("uri", uri); + invariant(c); + ON_BLOCK_EXIT(c->close, c); + + std::map<string,BSONObjBuilder*> subs; + const char *desc, *pvalue; + uint64_t value; + while (c->next(c) == 0 && c->get_value(c, &desc, &pvalue, &value) == 0) { + StringData key( desc ); + + StringData prefix; + StringData suffix; + + size_t idx = key.find( ':' ); + if ( idx != string::npos ) { + prefix = key.substr( 0, idx ); + suffix = key.substr( idx + 1 ); + } + else { + idx = key.find( ' ' ); + } + + if ( idx != string::npos ) { + prefix = key.substr( 0, idx ); + suffix = key.substr( idx + 1 ); + } + else { + prefix = key; + suffix = "num"; + } + + if ( prefix.size() == 0 ) { + bob->append(desc, pvalue); + } + else { + BSONObjBuilder*& sub = subs[prefix.toString()]; + if ( !sub ) + sub = new BSONObjBuilder(); + sub->append( mongoutils::str::ltrim(suffix.toString()), pvalue ); + } + } + + for ( std::map<string,BSONObjBuilder*>::const_iterator it = subs.begin(); + it != subs.end(); ++it ) { + const std::string& s = it->first; + bob->append( s, it->second->obj() ); + delete it->second; + } + return Status::OK(); + } + +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.h b/src/mongo/db/storage/wiredtiger/wiredtiger_util.h new file mode 100644 index 00000000000..5de65c30b43 --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.h @@ -0,0 +1,139 @@ +// wiredtiger_util.h + +/** + * Copyright (C) 2014 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <wiredtiger.h> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/stacktrace.h" + +namespace mongo { + + class BSONObjBuilder; + + inline bool wt_keeptxnopen() { + return false; + } + + /** + * converts wiredtiger return codes to mongodb statuses. 
+ */ + inline Status wtRCToStatus(int retCode) { + /* Zero is the success code. */ + if (MONGO_likely(retCode == 0)) + return Status::OK(); + + + /* WT_ROLLBACK is not turned into a Status: it is thrown as a WriteConflictException. */ + if ( retCode == WT_ROLLBACK ) { + //printStackTrace(); + throw WriteConflictException(); + } + + // TODO convert specific codes rather than just using UNKNOWN_ERROR for everything. + return Status(ErrorCodes::UnknownError, + str::stream() << retCode << ": " << wiredtiger_strerror(retCode)); + + } + + /** + * Returns immediately when retCode is 0; otherwise passes wtRCToStatus(retCode) to + * fassertFailedWithStatus with id 28519. The Status argument is built first, so a + * WT_ROLLBACK retCode leaves this function via WriteConflictException. + */ + inline void invariantWTOK(int retCode) { + if (MONGO_likely(retCode == 0)) + return; + + fassertFailedWithStatus(28519, wtRCToStatus(retCode)); + } + + /** + * WT_ITEM whose data and size members are set from a caller-supplied buffer or std::string. + * The bytes are not copied, so the source must outlive the item; no other WT_ITEM member + * is assigned here. + */ + struct WiredTigerItem : public WT_ITEM { + WiredTigerItem(const void *d, size_t s) { + data = d; + size = s; + } + WiredTigerItem(const std::string &str) { + data = str.c_str(); + size = str.size(); + } + // NOTE: do not call Get() on a temporary. + // The pointer returned by Get() must not be allowed to live longer than *this. + WT_ITEM *Get() { return this; } + const WT_ITEM *Get() const { return this; } + }; + + /** + * Groups static WiredTiger helper functions; the constructor is private and copying + * is disallowed. + */ + class WiredTigerUtil { + MONGO_DISALLOW_COPYING(WiredTigerUtil); + private: + WiredTigerUtil(); + + public: + + /** + * Reads contents of table using URI and exports all keys to BSON as string elements. + * Additionally, adds 'uri' field to output document.
+ */ + static Status exportTableToBSON(WT_SESSION* s, + const std::string& uri, const std::string& config, + BSONObjBuilder* bob); + + /** + * NOTE(review): defined outside this header; presumably reports the size of the + * object identified by 'uri' using session 's' - confirm against the implementation. + */ + static int64_t getIdentSize(WT_SESSION* s, + const std::string& uri ); + }; + + /** + * Owns a WT_CONFIG_PARSER: each constructor opens one with wiredtiger_config_parser_open + * and the destructor closes it. Open and close results are checked with invariantWTOK. + */ + class WiredTigerConfigParser { + MONGO_DISALLOW_COPYING(WiredTigerConfigParser); + public: + /** + * Opens a parser over the config.size() bytes starting at config.rawData(). + */ + WiredTigerConfigParser(const StringData& config) { + invariantWTOK(wiredtiger_config_parser_open(NULL, config.rawData(), config.size(), + &_parser)); + } + + /** + * Opens a parser over the nested.len bytes at nested.str; invariant-fails unless + * nested.type is WT_CONFIG_ITEM_STRUCT. + */ + WiredTigerConfigParser(const WT_CONFIG_ITEM& nested) { + invariant(nested.type == WT_CONFIG_ITEM::WT_CONFIG_ITEM_STRUCT); + invariantWTOK(wiredtiger_config_parser_open(NULL, nested.str, nested.len, &_parser)); + } + + ~WiredTigerConfigParser() { + invariantWTOK(_parser->close(_parser)); + } + + /** + * Forwards to the parser's next() and returns its int result unchanged. + */ + int next(WT_CONFIG_ITEM* key, WT_CONFIG_ITEM* value) { + return _parser->next(_parser, key, value); + } + + /** + * Forwards to the parser's get() for 'key' and returns its int result unchanged. + */ + int get(const char* key, WT_CONFIG_ITEM* value) { + return _parser->get(_parser, key, value); + } + + private: + WT_CONFIG_PARSER* _parser; + }; + +} diff --git a/src/third_party/SConscript b/src/third_party/SConscript index 7ad9fae9f32..91a74add572 100644 --- a/src/third_party/SConscript +++ b/src/third_party/SConscript @@ -1,6 +1,7 @@ # -*- mode: python -*- Import("env use_system_version_of_library windows darwin usev8 v8suffix solaris boostSuffix") +Import("wiredtiger") snappySuffix = '-1.1.2' @@ -33,6 +34,13 @@ if not use_system_version_of_library('stemmer'): thirdPartyIncludePathList.append( ('stemmer', '#/src/third_party/libstemmer_c/include')) +# Note that the wiredtiger header is generated, so +# we want to look for it in the build directory not +# the source directory.
+if wiredtiger and not use_system_version_of_library('wiredtiger'): + thirdPartyIncludePathList.append( + ('wiredtiger', '$BUILD_DIR/third_party/wiredtiger')) + if not use_system_version_of_library('yaml'): thirdPartyIncludePathList.append( ('yaml', '#/src/third_party/yaml-cpp-0.5.1/include')) @@ -236,3 +244,26 @@ tzEnv.Library( source=[ 'shim_tz.cpp', ]) + + +# Define the shim_wiredtiger library only when wiredtiger is enabled. With a +# system wiredtiger the environment gets LIBDEPS_WIREDTIGER_SYSLIBDEP as a +# SYSLIBDEP; otherwise wiredtiger/SConscript is run and the environment is +# re-cloned with 'wiredtiger/wiredtiger' as a LIBDEP. +if wiredtiger: + if use_system_version_of_library("wiredtiger"): + wiredtigerEnv = env.Clone( + SYSLIBDEPS=[ + env['LIBDEPS_WIREDTIGER_SYSLIBDEP'], + ]) + else: + wiredtigerEnv = env.Clone() + wiredtigerEnv.InjectThirdPartyIncludePaths(libraries=['wiredtiger']) + wiredtigerEnv.SConscript('wiredtiger/SConscript', exports={ 'env' : wiredtigerEnv }) + wiredtigerEnv = wiredtigerEnv.Clone( + LIBDEPS=[ + 'wiredtiger/wiredtiger', + ]) + + wiredtigerEnv.Library( + target="shim_wiredtiger", + source=[ + 'shim_wiredtiger.cpp' + ]) + diff --git a/src/third_party/shim_wiredtiger.cpp b/src/third_party/shim_wiredtiger.cpp new file mode 100644 index 00000000000..ab1cc40aea1 --- /dev/null +++ b/src/third_party/shim_wiredtiger.cpp @@ -0,0 +1,3 @@ +// This file intentionally blank. shim_wiredtiger.cpp is part of the +// third_party/wiredtiger library, which is just a placeholder for forwarding +// library dependencies. |