From cb9472afc30d32d1c18691d64899c1aa72cdc43d Mon Sep 17 00:00:00 2001 From: Justin Seyster Date: Wed, 29 Jun 2022 12:32:12 -0400 Subject: SERVER-65481 Bulk shredding and loading for column store indexes --- jstests/noPassthrough/column_store_index_load.js | 190 +++++++++++++ src/mongo/db/catalog/multi_index_block.cpp | 11 +- src/mongo/db/exec/sbe/SConscript | 2 +- src/mongo/db/index/SConscript | 43 +-- src/mongo/db/index/column_store_sorter.cpp | 297 +++++++++++++++++++++ src/mongo/db/index/column_store_sorter.h | 134 ++++++++++ src/mongo/db/index/column_store_sorter_test.cpp | 140 ++++++++++ src/mongo/db/index/columns_access_method.cpp | 119 +++++++-- src/mongo/db/index/index_access_method.cpp | 110 ++++---- src/mongo/db/index/index_access_method.h | 14 + src/mongo/db/sorter/sorter.cpp | 6 +- src/mongo/db/sorter/sorter.h | 13 +- .../storage/wiredtiger/wiredtiger_column_store.cpp | 28 +- .../db/storage/wiredtiger/wiredtiger_cursor.cpp | 28 ++ .../db/storage/wiredtiger/wiredtiger_cursor.h | 27 ++ .../db/storage/wiredtiger/wiredtiger_index.cpp | 60 +---- src/mongo/util/bufreader.h | 8 + 17 files changed, 1065 insertions(+), 165 deletions(-) create mode 100644 jstests/noPassthrough/column_store_index_load.js create mode 100644 src/mongo/db/index/column_store_sorter.cpp create mode 100644 src/mongo/db/index/column_store_sorter.h create mode 100644 src/mongo/db/index/column_store_sorter_test.cpp diff --git a/jstests/noPassthrough/column_store_index_load.js b/jstests/noPassthrough/column_store_index_load.js new file mode 100644 index 00000000000..757a2eeb73d --- /dev/null +++ b/jstests/noPassthrough/column_store_index_load.js @@ -0,0 +1,190 @@ + +/** + * Test that different methods of loading a column store index all produce the same valid results. + * Indexes are validated by comparing query results that use the index with results from a control + * query that uses a collection scan. + * @tags: [ + * # columnstore indexes are new in 6.1. + * requires_fcv_61, + * # We could potentially need to resume an index build in the event of a stepdown, which is not + * # yet implemented. + * does_not_support_stepdowns, + * # Columnstore indexes are incompatible with clustered collections. + * incompatible_with_clustered_collection, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/analyze_plan.js"); +load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. + +const mongod = MongoRunner.runMongod({}); +const db = mongod.getDB("test"); + +const columnStoreEnabled = + checkSBEEnabled(db, ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"]); +if (!columnStoreEnabled) { + jsTestLog("Skipping column store bulk load test test since the feature flag is not enabled."); + MongoRunner.stopMongod(mongod); + return; +} + +// +// Create test documents. +// + +const testValues = [ + {foo: 1, bar: 2}, + {bar: 2, baz: 3}, + {foo: 3, baz: 4}, + {foo: 5, bar: 6}, + {bar: 7, baz: [7, 8]}, +]; + +// We create our test documents by choosing k-permutations from the 'testValues' array. The +// kPermutations() function returns an array of all possible permutations with 'k' choices from all +// values in the 'arr' input. The 'choices' input stores the values chosen at previous levels of +// recursion. 
+function kPermutations(arr, n, choices = []) { + if (n == 0) { + return [choices]; + } + + const permutations = []; + for (let i = 0; i < arr.length; ++i) { + const subSequence = arr.slice(0, i).concat(arr.slice(i + 1)); + permutations.push(...kPermutations(subSequence, n - 1, choices.concat([arr[i]]))); + } + return permutations; +} + +const testDocs = kPermutations(testValues, 4).map((permutation, idx) => ({ + idx: idx, + foo: [permutation[0], permutation[1]], + bar: [permutation[2], permutation[3]] + })); + +// Test queries use a projection that includes every possible leaf field. Projections on fields that +// have sub-documents fall back to the row store, which would not serve to validate the contents of +// the index. +const testProjection = { + _id: 0, + idx: 1, + "foo.foo": 1, + "foo.bar": 1, + "foo.baz": 1, + "bar.foo": 1, + "bar.bar": 1, + "bar.baz": 1, +}; + +// The test query would normally not qualify for a column store index plan, because it projects a +// large number of fields. We raise the limit on the number of fields to allow column store plans +// for the purposes of this test. +db.adminCommand({ + setParameter: 1, + internalQueryMaxNumberOfFieldsToChooseUnfilteredColumnScan: Object.keys(testProjection).length +}); + +function loadDocs(coll, documents) { + const bulk = coll.initializeUnorderedBulkOp(); + for (const doc of documents) { + bulk.insert(doc); + } + assert.commandWorked(bulk.execute()); +} + +// +// We load the same documents into 4 collections: +// +// 1. a control collection with no index, +const noIndexColl = db.column_store_index_load_no_index; + +// 2. a collection whose column store index is populated with an in-memory bulk load, +const bulkLoadInMemoryColl = db.column_store_index_load_in_memory; + +// 3. a collection whose column store index is populated with a bulk load that uses an external +// merge sort (i.e., one that "spills" to disk), and +const bulkLoadExternalColl = db.column_store_index_load_external; + +// 4. a collection whose column store index is populated as documents are inserted. +const onlineLoadColl = db.column_store_index_online_load; + +// Load the control collection. +noIndexColl.drop(); +loadDocs(noIndexColl, testDocs); + +// Perform the in-memory bulk load. +bulkLoadInMemoryColl.drop(); +loadDocs(bulkLoadInMemoryColl, testDocs); +assert.commandWorked(bulkLoadInMemoryColl.createIndex({"$**": "columnstore"})); + +const statsAfterInMemoryBuild = assert.commandWorked(db.runCommand({serverStatus: 1})); +assert.docEq({ + count: NumberLong(1), + resumed: NumberLong(0), + filesOpenedForExternalSort: NumberLong(0), + filesClosedForExternalSort: NumberLong(0) +}, + statsAfterInMemoryBuild.indexBulkBuilder); + +// Perform the external bulk load. The server config won't allow a memory limit lower than 50MB, so +// we use a failpoint to set it lower than that for the purposes of this test. +bulkLoadExternalColl.drop(); +assert.commandWorked(db.adminCommand({ + configureFailPoint: "constrainMemoryForBulkBuild", + mode: "alwaysOn", + data: {maxBytes: 20000} +})); +loadDocs(bulkLoadExternalColl, testDocs); +assert.commandWorked(bulkLoadExternalColl.createIndex({"$**": "columnstore"})); + +const statsAfterExternalLoad = assert.commandWorked(db.runCommand({serverStatus: 1})); +assert.docEq({ + count: NumberLong(2), + resumed: NumberLong(0), + filesOpenedForExternalSort: NumberLong(1), + filesClosedForExternalSort: NumberLong(1) +}, + statsAfterExternalLoad.indexBulkBuilder); + +// Perfom the online load. 
+onlineLoadColl.drop(); +onlineLoadColl.createIndex({"$**": "columnstore"}); +loadDocs(onlineLoadColl, testDocs); + +// +// Verify that our test query uses the column store. +// +[bulkLoadInMemoryColl, bulkLoadExternalColl, onlineLoadColl].forEach(function(coll) { + const explain = coll.find({}, testProjection).sort({idx: 1}).explain(); + assert(planHasStage(db, explain, "COLUMN_SCAN"), explain); +}); + +// +// Run a query on each of the test collections, including the "no index" control collection. +// +const noIndexResults = noIndexColl.find({}, testProjection).sort({idx: 1}).toArray(); +const bulkLoadInMemoryResults = + bulkLoadInMemoryColl.find({}, testProjection).sort({idx: 1}).toArray(); +const bulkLoadExternalResults = + bulkLoadExternalColl.find({}, testProjection).sort({idx: 1}).toArray(); +const onlineLoadResults = onlineLoadColl.find({}, testProjection).sort({idx: 1}).toArray(); + +// +// Verify that the test query produces the same results in all test configurations. +// +assert.eq(testDocs.length, noIndexResults.length); +assert.eq(testDocs.length, bulkLoadInMemoryResults.length); +assert.eq(testDocs.length, bulkLoadExternalResults.length); +assert.eq(testDocs.length, onlineLoadResults.length); + +for (let i = 0; i < noIndexResults.length; ++i) { + assert.docEq(noIndexResults[i], bulkLoadInMemoryResults[i]); + assert.docEq(noIndexResults[i], bulkLoadExternalResults[i]); + assert.docEq(noIndexResults[i], onlineLoadResults[i]); +} + +MongoRunner.stopMongod(mongod); +})(); diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp index 8a66be4f602..bce71c5496b 100644 --- a/src/mongo/db/catalog/multi_index_block.cpp +++ b/src/mongo/db/catalog/multi_index_block.cpp @@ -68,6 +68,7 @@ namespace mongo { +MONGO_FAIL_POINT_DEFINE(constrainMemoryForBulkBuild); MONGO_FAIL_POINT_DEFINE(hangAfterSettingUpIndexBuild); MONGO_FAIL_POINT_DEFINE(hangAfterSettingUpIndexBuildUnlocked); MONGO_FAIL_POINT_DEFINE(hangAfterStartingIndexBuild); @@ -83,8 +84,16 @@ size_t getEachIndexBuildMaxMemoryUsageBytes(size_t numIndexSpecs) { return 0; } - return static_cast(maxIndexBuildMemoryUsageMegabytes.load()) * 1024 * 1024 / + auto result = static_cast(maxIndexBuildMemoryUsageMegabytes.load()) * 1024 * 1024 / numIndexSpecs; + + // When enabled by a test, this failpoint allows the test to set the maximum allowed memory for + // an index build to an unreasonably low value that is below what the user configuration will + // allow. 
+ constrainMemoryForBulkBuild.execute( + [&](const BSONObj& data) { result = data["maxBytes"].numberLong(); }); + + return result; } Status timeseriesMixedSchemaDataFailure(const Collection* collection) { diff --git a/src/mongo/db/exec/sbe/SConscript b/src/mongo/db/exec/sbe/SConscript index 485ab55fa82..859cf09da2f 100644 --- a/src/mongo/db/exec/sbe/SConscript +++ b/src/mongo/db/exec/sbe/SConscript @@ -219,7 +219,7 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/concurrency/lock_manager', - '$BUILD_DIR/mongo/db/index/columnar_index', + '$BUILD_DIR/mongo/db/index/column_store_index', '$BUILD_DIR/mongo/db/query/collation/collator_interface_mock', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/db/service_context_test_fixture', diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 9ccff04ff89..f71d78da857 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -84,9 +84,9 @@ env.Library( ], ) -serveronlyEnv = env.Clone() -serveronlyEnv.InjectThirdParty(libraries=['snappy']) -serveronlyEnv.Library( +iamEnv = env.Clone() +iamEnv.InjectThirdParty(libraries=['snappy']) +iamEnv.Library( target="index_access_method", source=[ 'duplicate_key_tracker.cpp', @@ -119,6 +119,22 @@ serveronlyEnv.Library( ], ) +iamEnv.Library( + target="column_store_index", + source=[ + 'column_cell.cpp', + "column_store_sorter.cpp", + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/sorter/sorter_idl', + '$BUILD_DIR/mongo/db/storage/encryption_hooks', + '$BUILD_DIR/mongo/db/storage/storage_options', + '$BUILD_DIR/mongo/s/is_mongos', + '$BUILD_DIR/third_party/shim_snappy', + ], +) + env.Library( target="index_access_methods", source=[ @@ -137,31 +153,26 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/concurrency/exception_util', + '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/fts/base_fts', '$BUILD_DIR/mongo/db/index_names', - 'columnar_index', + 'column_store_index', 'expression_params', 'key_generator', ], ) -env.Library( - target="columnar_index", - source=[ - 'column_cell.cpp', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/base', - ], -) - -env.CppUnitTest( +indexTestEnv = env.Clone() +indexTestEnv.InjectThirdParty(libraries=['snappy']) +indexTestEnv.CppUnitTest( target='db_index_test', source=[ '2d_key_generator_test.cpp', 'btree_key_generator_test.cpp', 'column_cell_test.cpp', 'column_key_generator_test.cpp', + 'column_store_sorter_test.cpp', 'hash_key_generator_test.cpp', 's2_key_generator_test.cpp', 's2_bucket_key_generator_test.cpp', @@ -177,7 +188,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/query/sort_pattern', '$BUILD_DIR/mongo/db/record_id_helpers', - 'columnar_index', + 'column_store_index', 'expression_params', 'key_generator', ], diff --git a/src/mongo/db/index/column_store_sorter.cpp b/src/mongo/db/index/column_store_sorter.cpp new file mode 100644 index 00000000000..f708518a560 --- /dev/null +++ b/src/mongo/db/index/column_store_sorter.cpp @@ -0,0 +1,297 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/index/column_store_sorter.h"
+
+namespace mongo {
+struct ComparisonForPathAndRid {
+    int operator()(const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& left,
+                   const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& right) const {
+        auto stringComparison = left.first.path.compare(right.first.path);
+        return (stringComparison != 0) ? stringComparison
+                                       : left.first.recordId.compare(right.first.recordId);
+    }
+};
+
+bool ColumnStoreSorter::Key::operator<(const Key& other) const {
+    if (auto cmp = path.compare(other.path); cmp != 0) {
+        return cmp < 0;
+    } else {
+        return recordId < other.recordId;
+    }
+}
+
+void ColumnStoreSorter::Key::serializeForSorter(BufBuilder& buf) const {
+    buf.appendStr(path);
+    recordId.serializeToken(buf);
+}
+
+ColumnStoreSorter::Key ColumnStoreSorter::Key::deserializeForSorter(
+    BufReader& buf, ColumnStoreSorter::Key::SorterDeserializeSettings) {
+    // Note: unlike function call parameters, the order of evaluation for initializer
+    // parameters is defined.
+    return {buf.readCStr(), RecordId::deserializeToken(buf)};
+}
+
+void ColumnStoreSorter::Value::serializeForSorter(BufBuilder& buf) const {
+    buf.appendNum(uint32_t(cell.size()));  // Little-endian write
+    buf.appendBuf(cell.rawData(), cell.size());
+}
+
+ColumnStoreSorter::Value ColumnStoreSorter::Value::deserializeForSorter(
+    BufReader& buf, ColumnStoreSorter::Value::SorterDeserializeSettings) {
+    size_t cellSize = buf.read<LittleEndian<uint32_t>>();
+    return Value{buf.readBytes(cellSize)};
+}
+
+ColumnStoreSorter::ColumnStoreSorter(size_t maxMemoryUsageBytes,
+                                     StringData dbName,
+                                     SorterFileStats* stats)
+    : _dbName(dbName.toString()),
+      _stats(stats),
+      _maxMemoryUsageBytes(maxMemoryUsageBytes),
+      _spillFile(std::make_shared<Sorter<Key, Value>::File>(pathForNewSpillFile(), _stats)) {}
+
+void ColumnStoreSorter::add(PathView path, RecordId recordId, CellView cellContents) {
+    auto& cellListAtPath = _dataByPath[path];
+    if (cellListAtPath.empty()) {
+        // Track memory usage of this new path.
+        _memUsed += sizeof(StringMap<CellVector>::value_type) + path.size();
+    }
+
+    // The sorter assumes that RecordIds are added in sorted order.
+    tassert(6548102,
+            "Out-of-order record during columnar index build",
+            cellListAtPath.empty() || cellListAtPath.back().first < recordId);
+
+    cellListAtPath.emplace_back(recordId, CellValue(cellContents.rawData(), cellContents.size()));
+    _memUsed += cellListAtPath.back().first.memUsage() + sizeof(CellValue) +
+        cellListAtPath.back().second.size();
+    if (_memUsed > _maxMemoryUsageBytes) {
+        spill();
+    }
+}
+
+namespace {
+std::string tempDir() {
+    return str::stream() << storageGlobalParams.dbpath << "/_tmp";
+}
+}  // namespace
+
+SortOptions ColumnStoreSorter::makeSortOptions(const std::string& dbName, SorterFileStats* stats) {
+    return SortOptions().TempDir(tempDir()).ExtSortAllowed().FileStats(stats).DBName(dbName);
+}
+
+std::string ColumnStoreSorter::pathForNewSpillFile() {
+    static AtomicWord<unsigned> fileNameCounter;
+    static const uint64_t randomSuffix = static_cast<uint64_t>(SecureRandom().nextInt64());
+    return str::stream() << tempDir() << "/ext-sort-column-store-index."
+                         << fileNameCounter.fetchAndAdd(1) << "-" << randomSuffix;
+}
+
+void ColumnStoreSorter::spill() {
+    if (_dataByPath.empty()) {
+        return;
+    }
+    ++_numSpills;
+
+    SortedFileWriter<Key, Value> writer(makeSortOptions(_dbName, _stats), _spillFile, {});
+
+    // Cells loaded into memory are sorted by record id but not yet sorted by path. We perform that
+    // sort now, so that we can output cells sorted by (path, rid) for later consumption by our
+    // standard external merge implementation: SortIteratorInterface::merge().
+    std::vector<StringMap<CellVector>::value_type*> sortedPathList;
+    sortedPathList.reserve(_dataByPath.size());
+    for (auto&& pathWithCellVector : _dataByPath) {
+        sortedPathList.push_back(&pathWithCellVector);
+    }
+    std::sort(sortedPathList.begin(), sortedPathList.end(), [](auto left, auto right) {
+        return left->first < right->first;
+    });
+
+    size_t currentChunkSize = 0;
+    for (auto&& pathWithCellVector : sortedPathList) {
+        auto& [path, cellVector] = *pathWithCellVector;
+
+        size_t cellVectorSize = std::accumulate(
+            cellVector.begin(), cellVector.end(), 0, [& path = path](size_t sum, auto& ridAndCell) {
+                return sum + path.size() + ridAndCell.first.memUsage() + ridAndCell.second.size();
+            });
+
+        // Add (path, rid, cell) records to the spill file so that the first cell in each
+        // contiguous run of cells with the same path lives in its own chunk. E.g.:
+        //   Path1, rid1, Cell contents
+        //   CHUNK BOUNDARY
+        //   Path1, rid2, Cell Contents
+        //   ...
+        //   Path1, ridN, Cell Contents
+        //   CHUNK BOUNDARY
+        //   Path2, rid1, Cell Contents
+        //   CHUNK BOUNDARY
+        //   Path2, rid2, Cell Contents
+        //   ...
+        //
+        // During merging, file readers will hold one chunk from each spill file in memory, so
+        // optimizing chunk size can reduce memory usage during the merge. Merging for a column
+        // store index is a special case: because the sorter is loaded in RecordId order, all the
+        // cells from this spill are guaranteed to merge together, with no interleaving cells from
+        // other spill files.
+        //
+        // This layout will result in a merger that holds a single cell from each leg of the merge
+        // representing the first in a large contiguous range. Once that cell gets picked, the
+        // merge will consume all chunks at that path in that file before moving on to the next
+        // file or the next path.
+        //
+        // To avoid the pathological case where runs are very short, we don't force a chunk
+        // boundary when a run of cells would not result in a chunk greater than 1024 bytes.
+        const size_t kShortChunkThreshold = 1024;
+        bool writeBoundaryAfterAdd = (currentChunkSize + cellVectorSize) > kShortChunkThreshold;
+        if (writeBoundaryAfterAdd) {
+            // Add the chunk boundary just before the first cell with this path name.
+            writer.writeChunk();
+            currentChunkSize = 0;
+        }
+        for (auto ridAndCell : cellVector) {
+            const auto& cell = ridAndCell.second;
+            currentChunkSize += path.size() + ridAndCell.first.memUsage() + cell.size();
+            writer.addAlreadySorted(Key{path, ridAndCell.first},
+                                    Value{CellView{cell.c_str(), cell.size()}});
+
+            if (writeBoundaryAfterAdd) {
+                // Add the chunk boundary just after the first cell with this path name, giving it
+                // its own chunk.
+                writer.writeChunk();
+                writeBoundaryAfterAdd = false;
+                currentChunkSize = 0;
+            }
+        }
+    }
+
+    _spilledFileIterators.emplace_back(writer.done());
+
+    _dataByPath.clear();
+    _memUsed = 0;
+}
+
+ColumnStoreSorter::Iterator* ColumnStoreSorter::done() {
+    invariant(!std::exchange(_done, true));
+
+    if (_spilledFileIterators.size() == 0) {
+        return inMemoryIterator();
+    }
+
+    spill();
+    return SortIteratorInterface<Key, Value>::merge(
+        _spilledFileIterators, makeSortOptions(_dbName, _stats), ComparisonForPathAndRid());
+}
+
+/**
+ * This iterator "unwinds" our path -> CellVector mapping into sorted tuples of (path name,
+ * recordId, cell), with the path name and recordId bundled into a single "key." The unwinding
+ * proceeds using an outer iterator over the paths and an inner iterator for the current CellVector.
+ * The outer iterator uses a separate path list that gets sorted when the 'InMemoryIterator' is
+ * initialized. The inner iterator directly traverses the CellVector, which is already sorted.
+ */
+class ColumnStoreSorter::InMemoryIterator final : public ColumnStoreSorter::Iterator {
+public:
+    InMemoryIterator(const StringMap<CellVector>& dataByPath) {
+        // Cells loaded into memory are sorted by record id but not yet by path. Sorting by path
+        // finalizes the sort algorithm.
+        _sortedPathList.reserve(dataByPath.size());
+        for (const auto& pathWithCellVector : dataByPath) {
+            _sortedPathList.push_back(&pathWithCellVector);
+        }
+        std::sort(_sortedPathList.begin(), _sortedPathList.end(), [](auto left, auto right) {
+            return left->first < right->first;
+        });
+
+        _pathIt = _sortedPathList.begin();
+        if (_pathIt != _sortedPathList.end()) {
+            _cellVectorIt = (*_pathIt)->second.begin();
+        }
+    }
+
+    bool more() final {
+        return _pathIt != _sortedPathList.end();
+    }
+
+    std::pair<Key, Value> next() final {
+        Key key{(*_pathIt)->first, _cellVectorIt->first};
+
+        Value contents{_cellVectorIt->second};
+
+        ++_cellVectorIt;
+        while (_cellVectorIt == (*_pathIt)->second.end() && ++_pathIt != _sortedPathList.end()) {
+            _cellVectorIt = (*_pathIt)->second.begin();
+        }
+
+        return {key, contents};
+    }
+
+    const std::pair<Key, Value>& current() final {
+        tasserted(ErrorCodes::NotImplemented,
+                  "current() not implemented for ColumnStoreSorter::Iterator");
+    }
+
+    void openSource() final {}
+
+    void closeSource() final {}
+
+private:
+    std::vector<const StringMap<CellVector>::value_type*> _sortedPathList;
+
+    decltype(_sortedPathList)::const_iterator _pathIt;
+    CellVector::const_iterator _cellVectorIt;
+};
+
+ColumnStoreSorter::Iterator* ColumnStoreSorter::inMemoryIterator() const {
+    return new InMemoryIterator(_dataByPath);
+}
+}  // namespace mongo
+
+namespace {
+/**
+ * A 'nextFileName()' is required for the below "sorter.cpp" include to compile, but this file does
+ * not use any of the 'Sorter' classes that call it.
+ */ +std::string nextFileName() { + MONGO_UNREACHABLE; +} +} // namespace + +#undef MONGO_LOGV2_DEFAULT_COMPONENT +#include "mongo/db/sorter/sorter.cpp" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex +MONGO_CREATE_SORTER(mongo::ColumnStoreSorter::Key, + mongo::ColumnStoreSorter::Value, + mongo::ComparisonForPathAndRid); diff --git a/src/mongo/db/index/column_store_sorter.h b/src/mongo/db/index/column_store_sorter.h new file mode 100644 index 00000000000..833213b3646 --- /dev/null +++ b/src/mongo/db/index/column_store_sorter.h @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/record_id.h" +#include "mongo/db/sorter/sorter.h" +#include "mongo/db/storage/column_store.h" + +namespace mongo { +/** + * Performs the organizing and sorting steps of a column store index bulk build, presenting an + * interface similar to the 'Sorter' interface. The client can add cells with the 'add()' method + * until none remain and then call 'done()' to get an iterator that returns the cells in sorted + * order. + * + * This class assumes that inputs are _already sorted_ by RecordId. Adding out-of-orders cells will + * result in undefined behavior. + * + * Internally, this class maintains a hash table that maps each path to a sorted list of + * (RecordId, CellValue) pairs. Because we use a hash table and not a sorted data structure (like + * std::map), we need to sort the list of paths when finalizing the output or when writing a spill + * file. The total number of cells inserted into this mapping is potentially very large, making it + * preferable to defer the cost of sorting to the end in order to avoid the cost of a binary tree + * traversal for each inserted cell. 
+ */
+class ColumnStoreSorter {
+public:
+    ColumnStoreSorter(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats);
+
+    void add(PathView path, RecordId recordId, CellView cellContents);
+
+    size_t numSpills() const {
+        return _numSpills;
+    }
+
+    struct Key {
+        PathView path;
+        RecordId recordId;
+
+        struct SorterDeserializeSettings {};
+
+        bool operator<(const Key& other) const;
+        void serializeForSorter(BufBuilder& buf) const;
+
+        // Assumes that the source buffer will remain valid for the lifetime of the returned
+        // ColumnStoreSorter::Key object.
+        static Key deserializeForSorter(BufReader& buf, SorterDeserializeSettings);
+
+        size_t memUsageForSorter() const {
+            return sizeof(path) + path.size() + recordId.memUsage();
+        }
+
+        Key getOwned() const {
+            MONGO_UNREACHABLE;
+        }
+    };
+
+    struct Value {
+        CellView cell;
+
+        struct SorterDeserializeSettings {};
+
+        void serializeForSorter(BufBuilder& buf) const;
+        static Value deserializeForSorter(BufReader& buf, SorterDeserializeSettings);
+
+        size_t memUsageForSorter() const {
+            return sizeof(cell) + cell.size();
+        }
+
+        Value getOwned() const {
+            MONGO_UNREACHABLE;
+        }
+    };
+
+    using Iterator = SortIteratorInterface<Key, Value>;
+
+    Iterator* done();
+
+private:
+    class InMemoryIterator;
+
+    static SortOptions makeSortOptions(const std::string& dbName, SorterFileStats* stats);
+    static std::string pathForNewSpillFile();
+
+    void spill();
+
+    Iterator* inMemoryIterator() const;
+
+    const std::string _dbName;
+    SorterFileStats* _stats;  // Unowned
+
+    const size_t _maxMemoryUsageBytes;
+    size_t _memUsed = 0;
+    size_t _numSpills = 0;
+
+    /**
+     * Mapping from path name to the sorted list of (RecordId, Cell) pairs.
+     */
+    using CellVector = std::vector<std::pair<RecordId, CellValue>>;
+    StringMap<CellVector> _dataByPath;
+
+    std::shared_ptr<Sorter<Key, Value>::File> _spillFile;
+    std::vector<std::shared_ptr<Iterator>> _spilledFileIterators;
+
+    bool _done = false;
+};
+}  // namespace mongo
diff --git a/src/mongo/db/index/column_store_sorter_test.cpp b/src/mongo/db/index/column_store_sorter_test.cpp
new file mode 100644
index 00000000000..4b407bc1211
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter_test.cpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */ + +#include "mongo/db/index/column_store_sorter.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +TEST(ColumnStoreSorter, SortTest) { + // Each entry of the top-level vector contains the field names of a sample document whose + // RecordId is the entry index. No field values are included in this sample data. + std::vector> sampleData = {{"foo", "bar", "foo.bar", "bar.bar"}, + {"bar", "foo", "bar.bar"}, + {"bar.bar", "foo.bar", "bar", "foo"}, + {"bar", "foo.bar", "baz"}, + {"foo.bar", "bar", "foo"}}; + + // The output of sorting the 'sampleData' field names by (Field name, RecordId). + std::vector> sortedData = {{"bar", 0}, + {"bar", 1}, + {"bar", 2}, + {"bar", 3}, + {"bar", 4}, + {"bar.bar", 0}, + {"bar.bar", 1}, + {"bar.bar", 2}, + {"baz", 3}, + {"foo", 0}, + {"foo", 1}, + {"foo", 2}, + {"foo", 4}, + {"foo.bar", 0}, + {"foo.bar", 2}, + {"foo.bar", 3}, + {"foo.bar", 4}}; + + // ColumnStoreSorter uses the dbpath to store its spill files. + ON_BLOCK_EXIT( + [oldDbPath = storageGlobalParams.dbpath]() { storageGlobalParams.dbpath = oldDbPath; }); + unittest::TempDir tempDir("columnStoreSorterTests"); + storageGlobalParams.dbpath = tempDir.path(); + + // We test two sorters: one that can perform the sort in memory and one that is constrained so + // that it must spill to disk. + + SorterFileStats statsForInMemorySorter; + auto inMemorySorter = std::make_unique( + 1000000 /* maxMemoryUsageBytes */, "dbName", &statsForInMemorySorter); + + SorterFileStats statsForExternalSorter; + auto externalSorter = std::make_unique( + 500 /* maxMemoryUsageBytes */, "dbName", &statsForExternalSorter); + + // First, load documents into each sorter. + for (size_t i = 0; i < sampleData.size(); ++i) { + for (auto&& fieldName : sampleData[i]) { + // Synthesize cell contents based on the field name and RecordId, so that we can test + // that cell contents travel with the (Field name, RecordId) key. The null-byte + // delimiter tests that the sorter correctly stores cells with internal null bytes. + std::string cell = str::stream() << fieldName << "\0" << i; + inMemorySorter->add(fieldName, RecordId(i), cell); + externalSorter->add(fieldName, RecordId(i), cell); + } + } + + // Now sort, iterate the sorted output, and ensure it matches the expected output. + std::unique_ptr sortedItInMemory(inMemorySorter->done()); + std::unique_ptr sortedItExternal(externalSorter->done()); + for (auto&& expected : sortedData) { + std::string expectedCell = str::stream() << expected.first << "\0" << expected.second; + + { + ASSERT(sortedItInMemory->more()); + auto [columnKey, columnValue] = sortedItInMemory->next(); + + ASSERT_EQ(expected.first, columnKey.path); + ASSERT_EQ(RecordId(expected.second), columnKey.recordId); + ASSERT_EQ(expectedCell, columnValue.cell); + } + + { + ASSERT(sortedItExternal->more()); + auto [columnKey, columnValue] = sortedItExternal->next(); + + ASSERT_EQ(expected.first, columnKey.path); + ASSERT_EQ(RecordId(expected.second), columnKey.recordId); + ASSERT_EQ(expectedCell, columnValue.cell); + } + } + ASSERT(!sortedItInMemory->more()); + ASSERT(!sortedItExternal->more()); + + sortedItInMemory.reset(); + sortedItExternal.reset(); + + // Ensure that statistics for spills and file accesses are as expected. 
+ ASSERT_EQ(0, inMemorySorter->numSpills()); + ASSERT_EQ(4, externalSorter->numSpills()); + + ASSERT_EQ(0, statsForInMemorySorter.opened.load()); + ASSERT_EQ(0, statsForInMemorySorter.closed.load()); + + // The external sorter has opened its spill file but will not close and delete it until it is + // destroyed. + ASSERT_EQ(1, statsForExternalSorter.opened.load()); + ASSERT_EQ(0, statsForExternalSorter.closed.load()); + + inMemorySorter.reset(); + externalSorter.reset(); + + ASSERT_EQ(0, statsForInMemorySorter.closed.load()); + ASSERT_EQ(1, statsForExternalSorter.closed.load()); +} +} // namespace mongo diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp index 87501f61cd5..71d4c996aa7 100644 --- a/src/mongo/db/index/columns_access_method.cpp +++ b/src/mongo/db/index/columns_access_method.cpp @@ -35,9 +35,11 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" #include "mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" +#include "mongo/db/index/column_store_sorter.h" #include "mongo/logv2/log.h" #include "mongo/util/progress_meter.h" @@ -96,23 +98,29 @@ public: private: ColumnStoreAccessMethod* const _columnsAccess; - // For now we'll just collect all the docs to insert before inserting them. - // TODO SERVER-65481 Do an actual optimized bulk insert with sorting. - std::list _ownedObjects; - std::vector _deferredInserts; + + ColumnStoreSorter _sorter; + BufBuilder _cellBuilder; + int64_t _keysInserted = 0; }; ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName) - : _columnsAccess(index) {} + : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) { + countNewBuildInStats(); +} ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, const IndexStateInfo& stateInfo, StringData dbName) - : _columnsAccess(index) {} + : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) { + countResumedBuildInStats(); + // TODO SERVER-66925: Add this support. + tasserted(6548103, "No support for resuming interrupted columnstore index builds."); +} Status ColumnStoreAccessMethod::BulkBuilder::insert( OperationContext* opCtx, @@ -123,15 +131,20 @@ Status ColumnStoreAccessMethod::BulkBuilder::insert( const InsertDeleteOptions& options, const std::function& saveCursorBeforeWrite, const std::function& restoreCursorAfterWrite) { - // TODO SERVER-65481 Do an actual optimized bulk insert with sorting. 
- _ownedObjects.push_back(obj.getOwned()); - BsonRecord record; - record.docPtr = &_ownedObjects.back(); - record.id = rid; - _deferredInserts.push_back(record); + column_keygen::visitCellsForInsert( + obj, [&](PathView path, const column_keygen::UnencodedCellView& cell) { + _cellBuilder.reset(); + writeEncodedCell(cell, &_cellBuilder); + _sorter.add(path, rid, CellView(_cellBuilder.buf(), _cellBuilder.len())); + + ++_keysInserted; + }); + return Status::OK(); } +// The "multikey" property does not apply to columnstore indexes, because the array key does not +// represent a field in a document and const MultikeyPaths& ColumnStoreAccessMethod::BulkBuilder::getMultikeyPaths() const { const static MultikeyPaths empty; return empty; @@ -156,21 +169,73 @@ Status ColumnStoreAccessMethod::BulkBuilder::commit(OperationContext* opCtx, int32_t yieldIterations, const KeyHandlerFn& onDuplicateKeyInserted, const RecordIdHandlerFn& onDuplicateRecord) { - static constexpr size_t kBufferBlockSize = 1024; - SharedBufferFragmentBuilder pooledBufferBuilder(kBufferBlockSize); - - WriteUnitOfWork wunit(opCtx); - auto status = _columnsAccess->insert(opCtx, - pooledBufferBuilder, - collection, - _deferredInserts, - InsertDeleteOptions{}, - &_keysInserted); - if (!status.isOK()) { - return status; + Timer timer; + + auto ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx); + + static constexpr char message[] = + "Index Build: inserting keys from external sorter into columnstore index"; + ProgressMeterHolder pm; + { + stdx::unique_lock lk(*opCtx->getClient()); + pm.set( + CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */)); + } + + auto builder = _columnsAccess->_store->makeBulkBuilder(opCtx); + + int64_t iterations = 0; + boost::optional previousKey; + std::unique_ptr it(_sorter.done()); + while (it->more()) { + opCtx->checkForInterrupt(); + + auto columnStoreKeyWithValue = it->next(); + + // In debug mode only, assert that keys are retrieved from the sorter in strictly increasing + // order. + if (kDebugBuild) { + if (previousKey && !(*previousKey < columnStoreKeyWithValue.first)) { + LOGV2_FATAL_NOTRACE(6548100, + "Out-of-order result from sorter for column store bulk loader", + "prevPathName"_attr = previousKey->path, + "prevRecordId"_attr = previousKey->recordId, + "nextPathName"_attr = columnStoreKeyWithValue.first.path, + "nextRecordId"_attr = columnStoreKeyWithValue.first.recordId, + "index"_attr = _columnsAccess->_descriptor->indexName()); + } + previousKey = columnStoreKeyWithValue.first; + } + + try { + writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] { + WriteUnitOfWork wunit(opCtx); + auto& [columnStoreKey, columnStoreValue] = columnStoreKeyWithValue; + builder->addCell( + columnStoreKey.path, columnStoreKey.recordId, columnStoreValue.cell); + wunit.commit(); + }); + } catch (DBException& e) { + return e.toStatus(); + } + + // Yield locks every 'yieldIterations' key insertions. 
+ if (++iterations % yieldIterations == 0) { + yield(opCtx, &collection, ns); + } + + pm.hit(); } - wunit.commit(); + pm.finished(); + + LOGV2(6548101, + "Index build: bulk sorter inserted {keysInserted} keys into index {index} on namespace " + "{namespace} in {duration} seconds", + "keysInserted"_attr = _keysInserted, + "index"_attr = _columnsAccess->_descriptor->indexName(), + logAttrs(ns), + "duration"_attr = Seconds(timer.seconds())); return Status::OK(); } @@ -185,7 +250,7 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, auto cursor = _store->newWriteCursor(opCtx); column_keygen::visitCellsForInsert( bsonRecords, - [&](StringData path, + [&](PathView path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) { if (!rec.ts.isNull()) { @@ -214,7 +279,7 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx, int64_t* keysDeletedOut, CheckRecordId checkRecordId) { auto cursor = _store->newWriteCursor(opCtx); - column_keygen::visitPathsForDelete(obj, [&](StringData path) { + column_keygen::visitPathsForDelete(obj, [&](PathView path) { cursor->remove(path, rid); inc(keysDeletedOut); }); diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 150558bfb69..0144e00d973 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -50,7 +50,6 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/timestamp_block.h" -#include "mongo/db/sorter/sorter.h" #include "mongo/db/storage/execution_context.h" #include "mongo/db/storage/storage_options.h" #include "mongo/logv2/log.h" @@ -128,12 +127,12 @@ bool isMultikeyFromPaths(const MultikeyPaths& multikeyPaths) { [](const MultikeyComponents& components) { return !components.empty(); }); } -SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName) { +SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats) { return SortOptions() .TempDir(storageGlobalParams.dbpath + "/_tmp") .ExtSortAllowed() .MaxMemoryUsageBytes(maxMemoryUsageBytes) - .FileStats(&indexBulkBuilderSSS.sorterFileStats) + .FileStats(stats) .DBName(dbName.toString()); } @@ -610,6 +609,51 @@ Ident* SortedDataIndexAccessMethod::getIdentPtr() const { return this->_newInterface.get(); } +void IndexAccessMethod::BulkBuilder::countNewBuildInStats() { + indexBulkBuilderSSS.count.addAndFetch(1); +} + +void IndexAccessMethod::BulkBuilder::countResumedBuildInStats() { + indexBulkBuilderSSS.count.addAndFetch(1); + indexBulkBuilderSSS.resumed.addAndFetch(1); +} + +SorterFileStats* IndexAccessMethod::BulkBuilder::bulkBuilderFileStats() { + return &indexBulkBuilderSSS.sorterFileStats; +} + +void IndexAccessMethod::BulkBuilder::yield(OperationContext* opCtx, + const Yieldable* yieldable, + const NamespaceString& ns) { + // Releasing locks means a new snapshot should be acquired when restored. + opCtx->recoveryUnit()->abandonSnapshot(); + yieldable->yield(); + + auto locker = opCtx->lockState(); + Locker::LockSnapshot snapshot; + if (locker->saveLockStateAndUnlock(&snapshot)) { + + // Track the number of yields in CurOp. 
+ CurOp::get(opCtx)->yielded(); + + auto failPointHang = [opCtx, &ns](FailPoint* fp) { + fp->executeIf( + [fp](auto&&) { + LOGV2(5180600, "Hanging index build during bulk load yield"); + fp->pauseWhileSet(); + }, + [opCtx, &ns](auto&& config) { + return config.getStringField("namespace") == ns.ns(); + }); + }; + failPointHang(&hangDuringIndexBuildBulkLoadYield); + failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond); + + locker->restoreLockState(opCtx, snapshot); + } + yieldable->restore(); +} + class SortedDataIndexAccessMethod::BulkBuilderImpl final : public IndexAccessMethod::BulkBuilder { public: using Sorter = mongo::Sorter; @@ -646,9 +690,6 @@ public: IndexStateInfo persistDataForShutdown() final; private: - void _yield(OperationContext* opCtx, - const Yieldable* yieldable, - const NamespaceString& ns) const; void _insertMultikeyMetadataKeysIntoSorter(); Sorter* _makeSorter( @@ -689,7 +730,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc size_t maxMemoryUsageBytes, StringData dbName) : _iam(iam), _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) { - indexBulkBuilderSSS.count.addAndFetch(1); + countNewBuildInStats(); } SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAccessMethod* iam, @@ -702,8 +743,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc _keysInserted(stateInfo.getNumKeys().value_or(0)), _isMultiKey(stateInfo.getIsMultikey()), _indexMultikeyPaths(createMultikeyPaths(stateInfo.getMultikeyPaths())) { - indexBulkBuilderSSS.count.addAndFetch(1); - indexBulkBuilderSSS.resumed.addAndFetch(1); + countResumedBuildInStats(); } Status SortedDataIndexAccessMethod::BulkBuilderImpl::insert( @@ -825,46 +865,16 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::_makeSorter( StringData dbName, boost::optional fileName, const boost::optional>& ranges) const { - return fileName ? Sorter::makeFromExistingRanges(fileName->toString(), - *ranges, - makeSortOptions(maxMemoryUsageBytes, dbName), - BtreeExternalSortComparison(), - _makeSorterSettings()) - : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName), - BtreeExternalSortComparison(), - _makeSorterSettings()); -} - -void SortedDataIndexAccessMethod::BulkBuilderImpl::_yield(OperationContext* opCtx, - const Yieldable* yieldable, - const NamespaceString& ns) const { - // Releasing locks means a new snapshot should be acquired when restored. - opCtx->recoveryUnit()->abandonSnapshot(); - yieldable->yield(); - - auto locker = opCtx->lockState(); - Locker::LockSnapshot snapshot; - if (locker->saveLockStateAndUnlock(&snapshot)) { - - // Track the number of yields in CurOp. - CurOp::get(opCtx)->yielded(); - - auto failPointHang = [opCtx, &ns](FailPoint* fp) { - fp->executeIf( - [fp](auto&&) { - LOGV2(5180600, "Hanging index build during bulk load yield"); - fp->pauseWhileSet(); - }, - [opCtx, &ns](auto&& config) { - return config.getStringField("namespace") == ns.ns(); - }); - }; - failPointHang(&hangDuringIndexBuildBulkLoadYield); - failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond); - - locker->restoreLockState(opCtx, snapshot); - } - yieldable->restore(); + return fileName + ? 
Sorter::makeFromExistingRanges( + fileName->toString(), + *ranges, + makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()), + BtreeExternalSortComparison(), + _makeSorterSettings()) + : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()), + BtreeExternalSortComparison(), + _makeSorterSettings()); } Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit( @@ -979,7 +989,7 @@ Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit( // Starts yielding locks after the first non-zero 'yieldIterations' inserts. if (yieldIterations && (i + 1) % yieldIterations == 0) { - _yield(opCtx, &collection, ns); + yield(opCtx, &collection, ns); } // If we're here either it's a dup and we're cool with it or the addKey went just fine. diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index 3718040885d..7e3a4dd504a 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -40,6 +40,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/record_id.h" +#include "mongo/db/sorter/sorter.h" #include "mongo/db/storage/sorted_data_interface.h" #include "mongo/db/yieldable.h" @@ -219,6 +220,19 @@ public: * Persists on disk the keys that have been inserted using this BulkBuilder. */ virtual IndexStateInfo persistDataForShutdown() = 0; + + protected: + static void countNewBuildInStats(); + static void countResumedBuildInStats(); + static SorterFileStats* bulkBuilderFileStats(); + + /** + * Abandon the current snapshot and release then reacquire locks. Tests that target the + * behavior of bulk index builds that yield can use failpoints to stall this yield. + */ + static void yield(OperationContext* opCtx, + const Yieldable* yieldable, + const NamespaceString& ns); }; /** diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index c23c2695afd..ecb9e10118e 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -1291,11 +1291,11 @@ void SortedFileWriter::addAlreadySorted(const Key& key, const Value& addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum); if (_buffer.len() > static_cast(kSortedFileBufferSize)) - spill(); + writeChunk(); } template -void SortedFileWriter::spill() { +void SortedFileWriter::writeChunk() { int32_t size = _buffer.len(); char* outBuffer = _buffer.buf(); @@ -1340,7 +1340,7 @@ void SortedFileWriter::spill() { template SortIteratorInterface* SortedFileWriter::done() { - spill(); + writeChunk(); return new sorter::FileIterator( _file, _fileStartOffset, _file->currentOffset(), _settings, _dbName, _checksum); diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 12bccee1178..85c0f279ac2 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -592,16 +592,23 @@ public: void addAlreadySorted(const Key&, const Value&); /** - * Spills any data remaining in the buffer to disk and then closes the file to which data was + * Writes any data remaining in the buffer to disk and then closes the file to which data was * written. * * No more data can be added via addAlreadySorted() after calling done(). */ Iterator* done(); -private: - void spill(); + /** + * The SortedFileWriter organizes data into chunks, with a chunk getting written to the output + * file when it exceends a maximum chunks size. A SortedFilerWriter client can produce a short + * chunk by manually calling this function. 
+ * + * If no new data has been added since the last chunk was written, this function is a no-op. + */ + void writeChunk(); +private: const Settings _settings; std::shared_ptr::File> _file; BufBuilder _buffer; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp index e9bc642dfb8..e9000044126 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp @@ -393,28 +393,26 @@ std::unique_ptr WiredTigerColumnStore::newCursor( class WiredTigerColumnStore::BulkBuilder final : public ColumnStore::BulkBuilder { public: BulkBuilder(WiredTigerColumnStore* idx, OperationContext* opCtx) - : _opCtx(opCtx), - _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()), - _cursor(openBulkCursor(idx)) {} - - ~BulkBuilder() { - _cursor->close(_cursor); - } + : _opCtx(opCtx), _cursor(idx->uri(), opCtx) {} void addCell(PathView path, const RecordId& rid, CellView cell) override { - uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder"); - } + const std::string& key = makeKey(_buffer, path, rid); + WiredTigerItem keyItem(key.c_str(), key.size()); + _cursor->set_key(_cursor.get(), keyItem.Get()); -private: - WT_CURSOR* openBulkCursor(WiredTigerColumnStore* idx) { - // TODO SERVER-65484: Much of this logic can be shared with standard WT index. - uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder"); + WiredTigerItem cellItem(cell.rawData(), cell.size()); + _cursor->set_value(_cursor.get(), cellItem.Get()); + + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); + + ResourceConsumption::MetricsCollector::get(_opCtx).incrementOneIdxEntryWritten( + std::string(_cursor->uri), keyItem.size); } +private: std::string _buffer; OperationContext* const _opCtx; - UniqueWiredTigerSession const _session; - WT_CURSOR* const _cursor; + WiredTigerBulkLoadCursor _cursor; }; std::unique_ptr WiredTigerColumnStore::makeBulkBuilder( diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp index 188983bf071..d5704989b20 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp @@ -82,4 +82,32 @@ WiredTigerCursor::~WiredTigerCursor() { void WiredTigerCursor::reset() { invariantWTOK(_cursor->reset(_cursor), _cursor->session); } + +WiredTigerBulkLoadCursor::WiredTigerBulkLoadCursor(const std::string& indexUri, + OperationContext* opCtx) + : _session(WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->getSession()) { + // Open cursors can cause bulk open_cursor to fail with EBUSY. + // TODO any other cases that could cause EBUSY? + WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(opCtx)->getSession(); + outerSession->closeAllCursors(indexUri); + + // The 'checkpoint_wait=false' option is set to prefer falling back on the "non-bulk" cursor + // over waiting a potentially long time for a checkpoint. 
+ WT_SESSION* sessionPtr = _session->getSession(); + int err = sessionPtr->open_cursor( + sessionPtr, indexUri.c_str(), nullptr, "bulk,checkpoint_wait=false", &_cursor); + if (!err) { + return; // Success + } + + LOGV2_WARNING(51783, + "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk " + "cursor for index {index}", + "Failed to create WiredTiger bulk cursor, falling back to non-bulk", + "error"_attr = wiredtiger_strerror(err), + "index"_attr = indexUri); + + invariantWTOK(sessionPtr->open_cursor(sessionPtr, indexUri.c_str(), nullptr, nullptr, &_cursor), + sessionPtr); +} } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h index b31f91465cf..716226e45a9 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h @@ -81,4 +81,31 @@ protected: WT_CURSOR* _cursor = nullptr; // Owned }; + +/** + * An owning object wrapper for a WT_SESSION and WT_CURSOR configured for bulk loading when + * possible. The cursor is created and closed independently of the cursor cache, which does not + * store bulk cursors. It uses its own session to avoid hijacking an existing transaction in the + * current session. + */ +class WiredTigerBulkLoadCursor { +public: + WiredTigerBulkLoadCursor(const std::string& indexUri, OperationContext* opCtx); + + ~WiredTigerBulkLoadCursor() { + _cursor->close(_cursor); + } + + WT_CURSOR* get() const { + return _cursor; + } + + WT_CURSOR* operator->() const { + return get(); + } + +private: + UniqueWiredTigerSession const _session; + WT_CURSOR* _cursor = nullptr; // Owned +}; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index 69c422c27d5..16870b4c7d7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -692,54 +692,16 @@ RecordId WiredTigerIndex::_decodeRecordIdAtEnd(const void* buffer, size_t size) class WiredTigerIndex::BulkBuilder : public SortedDataBuilderInterface { public: BulkBuilder(WiredTigerIndex* idx, OperationContext* opCtx) - : _ordering(idx->_ordering), - _opCtx(opCtx), - _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()), - _cursor(openBulkCursor(idx)) {} - - ~BulkBuilder() { - _cursor->close(_cursor); - } + : _ordering(idx->_ordering), _opCtx(opCtx), _cursor(idx->uri(), _opCtx) {} protected: - WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) { - // Open cursors can cause bulk open_cursor to fail with EBUSY. - // TODO any other cases that could cause EBUSY? - WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_opCtx)->getSession(); - outerSession->closeAllCursors(idx->uri()); - - // Not using cursor cache since we need to set "bulk". - WT_CURSOR* cursor; - // Use a different session to ensure we don't hijack an existing transaction. - // Configure the bulk cursor open to fail quickly if it would wait on a checkpoint - // completing - since checkpoints can take a long time, and waiting can result in - // an unexpected pause in building an index. 
- WT_SESSION* session = _session->getSession(); - int err = session->open_cursor( - session, idx->uri().c_str(), nullptr, "bulk,checkpoint_wait=false", &cursor); - if (!err) - return cursor; - - LOGV2_WARNING(51783, - "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk " - "cursor for index {index}", - "Failed to create WiredTiger bulk cursor, falling back to non-bulk", - "error"_attr = wiredtiger_strerror(err), - "index"_attr = idx->uri()); - - invariantWTOK(session->open_cursor(session, idx->uri().c_str(), nullptr, nullptr, &cursor), - session); - return cursor; - } - void setKey(WT_CURSOR* cursor, const WT_ITEM* item) { cursor->set_key(cursor, item); } const Ordering _ordering; OperationContext* const _opCtx; - UniqueWiredTigerSession const _session; - WT_CURSOR* const _cursor; + WiredTigerBulkLoadCursor _cursor; }; @@ -756,16 +718,16 @@ public: // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem item(keyString.getBuffer(), keyString.getSize()); - setKey(_cursor, item.Get()); + setKey(_cursor.get(), item.Get()); const KeyString::TypeBits typeBits = keyString.getTypeBits(); WiredTigerItem valueItem = typeBits.isAllZeros() ? emptyItem : WiredTigerItem(typeBits.getBuffer(), typeBits.getSize()); - _cursor->set_value(_cursor, valueItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), item.size); @@ -824,16 +786,16 @@ public: // Can't use WiredTigerCursor since we aren't using the cache. WiredTigerItem keyItem(newKeyString.getBuffer(), newKeyString.getSize()); - setKey(_cursor, keyItem.Get()); + setKey(_cursor.get(), keyItem.Get()); const KeyString::TypeBits typeBits = newKeyString.getTypeBits(); WiredTigerItem valueItem = typeBits.isAllZeros() ? emptyItem : WiredTigerItem(typeBits.getBuffer(), typeBits.getSize()); - _cursor->set_value(_cursor, valueItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size); @@ -882,10 +844,10 @@ public: WiredTigerItem keyItem(newKeyString.getBuffer(), sizeWithoutRecordId); WiredTigerItem valueItem(value.getBuffer(), value.getSize()); - setKey(_cursor, keyItem.Get()); - _cursor->set_value(_cursor, valueItem.Get()); + setKey(_cursor.get(), keyItem.Get()); + _cursor->set_value(_cursor.get(), valueItem.Get()); - invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session); + invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session); auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size); diff --git a/src/mongo/util/bufreader.h b/src/mongo/util/bufreader.h index 8c30070bada..7b9f8a1c5a9 100644 --- a/src/mongo/util/bufreader.h +++ b/src/mongo/util/bufreader.h @@ -124,6 +124,14 @@ public: s = readCStr().toString(); } + /** + * Return a view of the next len bytes and advance by len. 
+     */
+    StringData readBytes(size_t len) {
+        // Note: the call to skip() includes a check that at least 'len' bytes remain in the
+        // buffer.
+        return StringData(reinterpret_cast<const char*>(skip(len)), len);
+    }
+
     const void* pos() {
         return _pos;
     }
-- 
cgit v1.2.1
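The following is an illustrative sketch, not part of the patch itself, showing how the
ColumnStoreSorter API introduced above is intended to be driven; it is distilled from
column_store_sorter_test.cpp and the bulk-commit loop in columns_access_method.cpp. The function
name, database name, memory budget, paths, and cell bytes are invented for the example, and
storageGlobalParams.dbpath must be set to a writable location for any spill files, as the unit
test arranges with a TempDir.

#include <memory>

#include "mongo/db/index/column_store_sorter.h"
#include "mongo/db/record_id.h"

namespace mongo {
void sortCellsExample(SorterFileStats* stats) {
    // 100 MB in-memory budget; beyond this the sorter spills sorted chunks to '<dbpath>/_tmp'.
    ColumnStoreSorter sorter(100 * 1024 * 1024 /* maxMemoryUsageBytes */, "testDb", stats);

    // Cells must be fed in RecordId order; the sorter tasserts on out-of-order records.
    sorter.add("a.b" /* path */, RecordId(1), "cell-bytes-1");
    sorter.add("a.b" /* path */, RecordId(2), "cell-bytes-2");
    sorter.add("c" /* path */, RecordId(2), "cell-bytes-3");

    // done() finalizes the sort (merging any spill files) and yields (path, rid) keys with their
    // cells, ordered by path first and RecordId second.
    std::unique_ptr<ColumnStoreSorter::Iterator> it(sorter.done());
    while (it->more()) {
        auto [key, value] = it->next();
        // key.path, key.recordId, and value.cell are ready to hand to the column store's bulk
        // builder, e.g. builder->addCell(key.path, key.recordId, value.cell).
    }
}
}  // namespace mongo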