summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2022-06-29 12:32:12 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-29 20:02:02 +0000
commitcb9472afc30d32d1c18691d64899c1aa72cdc43d (patch)
tree924020b6f50ccf365926a2420c64d92e5497a34a
parentf684488a8509f0a3764fa48221499b9cd508c0e1 (diff)
downloadmongo-cb9472afc30d32d1c18691d64899c1aa72cdc43d.tar.gz
SERVER-65481 Bulk shredding and loading for column store indexes
-rw-r--r--jstests/noPassthrough/column_store_index_load.js190
-rw-r--r--src/mongo/db/catalog/multi_index_block.cpp11
-rw-r--r--src/mongo/db/exec/sbe/SConscript2
-rw-r--r--src/mongo/db/index/SConscript43
-rw-r--r--src/mongo/db/index/column_store_sorter.cpp297
-rw-r--r--src/mongo/db/index/column_store_sorter.h134
-rw-r--r--src/mongo/db/index/column_store_sorter_test.cpp140
-rw-r--r--src/mongo/db/index/columns_access_method.cpp119
-rw-r--r--src/mongo/db/index/index_access_method.cpp110
-rw-r--r--src/mongo/db/index/index_access_method.h14
-rw-r--r--src/mongo/db/sorter/sorter.cpp6
-rw-r--r--src/mongo/db/sorter/sorter.h13
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp28
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp28
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h27
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp60
-rw-r--r--src/mongo/util/bufreader.h8
17 files changed, 1065 insertions, 165 deletions
diff --git a/jstests/noPassthrough/column_store_index_load.js b/jstests/noPassthrough/column_store_index_load.js
new file mode 100644
index 00000000000..757a2eeb73d
--- /dev/null
+++ b/jstests/noPassthrough/column_store_index_load.js
@@ -0,0 +1,190 @@
+
+/**
+ * Test that different methods of loading a column store index all produce the same valid results.
+ * Indexes are validated by comparing query results that use the index with results from a control
+ * query that uses a collection scan.
+ * @tags: [
+ * # columnstore indexes are new in 6.1.
+ * requires_fcv_61,
+ * # We could potentially need to resume an index build in the event of a stepdown, which is not
+ * # yet implemented.
+ * does_not_support_stepdowns,
+ * # Columnstore indexes are incompatible with clustered collections.
+ * incompatible_with_clustered_collection,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/analyze_plan.js");
+load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
+
+const mongod = MongoRunner.runMongod({});
+const db = mongod.getDB("test");
+
+const columnStoreEnabled =
+ checkSBEEnabled(db, ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"]);
+if (!columnStoreEnabled) {
+ jsTestLog("Skipping column store bulk load test test since the feature flag is not enabled.");
+ MongoRunner.stopMongod(mongod);
+ return;
+}
+
+//
+// Create test documents.
+//
+
+const testValues = [
+ {foo: 1, bar: 2},
+ {bar: 2, baz: 3},
+ {foo: 3, baz: 4},
+ {foo: 5, bar: 6},
+ {bar: 7, baz: [7, 8]},
+];
+
+// We create our test documents by choosing k-permutations from the 'testValues' array. The
+// kPermutations() function returns an array of all possible permutations with 'k' choices from all
+// values in the 'arr' input. The 'choices' input stores the values chosen at previous levels of
+// recursion.
+function kPermutations(arr, n, choices = []) {
+ if (n == 0) {
+ return [choices];
+ }
+
+ const permutations = [];
+ for (let i = 0; i < arr.length; ++i) {
+ const subSequence = arr.slice(0, i).concat(arr.slice(i + 1));
+ permutations.push(...kPermutations(subSequence, n - 1, choices.concat([arr[i]])));
+ }
+ return permutations;
+}
+
+const testDocs = kPermutations(testValues, 4).map((permutation, idx) => ({
+ idx: idx,
+ foo: [permutation[0], permutation[1]],
+ bar: [permutation[2], permutation[3]]
+ }));
+
+// Test queries use a projection that includes every possible leaf field. Projections on fields that
+// have sub-documents fall back to the row store, which would not serve to validate the contents of
+// the index.
+const testProjection = {
+ _id: 0,
+ idx: 1,
+ "foo.foo": 1,
+ "foo.bar": 1,
+ "foo.baz": 1,
+ "bar.foo": 1,
+ "bar.bar": 1,
+ "bar.baz": 1,
+};
+
+// The test query would normally not qualify for a column store index plan, because it projects a
+// large number of fields. We raise the limit on the number of fields to allow column store plans
+// for the purposes of this test.
+db.adminCommand({
+ setParameter: 1,
+ internalQueryMaxNumberOfFieldsToChooseUnfilteredColumnScan: Object.keys(testProjection).length
+});
+
+function loadDocs(coll, documents) {
+ const bulk = coll.initializeUnorderedBulkOp();
+ for (const doc of documents) {
+ bulk.insert(doc);
+ }
+ assert.commandWorked(bulk.execute());
+}
+
+//
+// We load the same documents into 4 collections:
+//
+// 1. a control collection with no index,
+const noIndexColl = db.column_store_index_load_no_index;
+
+// 2. a collection whose column store index is populated with an in-memory bulk load,
+const bulkLoadInMemoryColl = db.column_store_index_load_in_memory;
+
+// 3. a collection whose column store index is populated with a bulk load that uses an external
+// merge sort (i.e., one that "spills" to disk), and
+const bulkLoadExternalColl = db.column_store_index_load_external;
+
+// 4. a collection whose column store index is populated as documents are inserted.
+const onlineLoadColl = db.column_store_index_online_load;
+
+// Load the control collection.
+noIndexColl.drop();
+loadDocs(noIndexColl, testDocs);
+
+// Perform the in-memory bulk load.
+bulkLoadInMemoryColl.drop();
+loadDocs(bulkLoadInMemoryColl, testDocs);
+assert.commandWorked(bulkLoadInMemoryColl.createIndex({"$**": "columnstore"}));
+
+const statsAfterInMemoryBuild = assert.commandWorked(db.runCommand({serverStatus: 1}));
+assert.docEq({
+ count: NumberLong(1),
+ resumed: NumberLong(0),
+ filesOpenedForExternalSort: NumberLong(0),
+ filesClosedForExternalSort: NumberLong(0)
+},
+ statsAfterInMemoryBuild.indexBulkBuilder);
+
+// Perform the external bulk load. The server config won't allow a memory limit lower than 50MB, so
+// we use a failpoint to set it lower than that for the purposes of this test.
+bulkLoadExternalColl.drop();
+assert.commandWorked(db.adminCommand({
+ configureFailPoint: "constrainMemoryForBulkBuild",
+ mode: "alwaysOn",
+ data: {maxBytes: 20000}
+}));
+loadDocs(bulkLoadExternalColl, testDocs);
+assert.commandWorked(bulkLoadExternalColl.createIndex({"$**": "columnstore"}));
+
+const statsAfterExternalLoad = assert.commandWorked(db.runCommand({serverStatus: 1}));
+assert.docEq({
+ count: NumberLong(2),
+ resumed: NumberLong(0),
+ filesOpenedForExternalSort: NumberLong(1),
+ filesClosedForExternalSort: NumberLong(1)
+},
+ statsAfterExternalLoad.indexBulkBuilder);
+
+// Perfom the online load.
+onlineLoadColl.drop();
+onlineLoadColl.createIndex({"$**": "columnstore"});
+loadDocs(onlineLoadColl, testDocs);
+
+//
+// Verify that our test query uses the column store.
+//
+[bulkLoadInMemoryColl, bulkLoadExternalColl, onlineLoadColl].forEach(function(coll) {
+ const explain = coll.find({}, testProjection).sort({idx: 1}).explain();
+ assert(planHasStage(db, explain, "COLUMN_SCAN"), explain);
+});
+
+//
+// Run a query on each of the test collections, including the "no index" control collection.
+//
+const noIndexResults = noIndexColl.find({}, testProjection).sort({idx: 1}).toArray();
+const bulkLoadInMemoryResults =
+ bulkLoadInMemoryColl.find({}, testProjection).sort({idx: 1}).toArray();
+const bulkLoadExternalResults =
+ bulkLoadExternalColl.find({}, testProjection).sort({idx: 1}).toArray();
+const onlineLoadResults = onlineLoadColl.find({}, testProjection).sort({idx: 1}).toArray();
+
+//
+// Verify that the test query produces the same results in all test configurations.
+//
+assert.eq(testDocs.length, noIndexResults.length);
+assert.eq(testDocs.length, bulkLoadInMemoryResults.length);
+assert.eq(testDocs.length, bulkLoadExternalResults.length);
+assert.eq(testDocs.length, onlineLoadResults.length);
+
+for (let i = 0; i < noIndexResults.length; ++i) {
+ assert.docEq(noIndexResults[i], bulkLoadInMemoryResults[i]);
+ assert.docEq(noIndexResults[i], bulkLoadExternalResults[i]);
+ assert.docEq(noIndexResults[i], onlineLoadResults[i]);
+}
+
+MongoRunner.stopMongod(mongod);
+})();
diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp
index 8a66be4f602..bce71c5496b 100644
--- a/src/mongo/db/catalog/multi_index_block.cpp
+++ b/src/mongo/db/catalog/multi_index_block.cpp
@@ -68,6 +68,7 @@
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(constrainMemoryForBulkBuild);
MONGO_FAIL_POINT_DEFINE(hangAfterSettingUpIndexBuild);
MONGO_FAIL_POINT_DEFINE(hangAfterSettingUpIndexBuildUnlocked);
MONGO_FAIL_POINT_DEFINE(hangAfterStartingIndexBuild);
@@ -83,8 +84,16 @@ size_t getEachIndexBuildMaxMemoryUsageBytes(size_t numIndexSpecs) {
return 0;
}
- return static_cast<std::size_t>(maxIndexBuildMemoryUsageMegabytes.load()) * 1024 * 1024 /
+ auto result = static_cast<std::size_t>(maxIndexBuildMemoryUsageMegabytes.load()) * 1024 * 1024 /
numIndexSpecs;
+
+ // When enabled by a test, this failpoint allows the test to set the maximum allowed memory for
+ // an index build to an unreasonably low value that is below what the user configuration will
+ // allow.
+ constrainMemoryForBulkBuild.execute(
+ [&](const BSONObj& data) { result = data["maxBytes"].numberLong(); });
+
+ return result;
}
Status timeseriesMixedSchemaDataFailure(const Collection* collection) {
diff --git a/src/mongo/db/exec/sbe/SConscript b/src/mongo/db/exec/sbe/SConscript
index 485ab55fa82..859cf09da2f 100644
--- a/src/mongo/db/exec/sbe/SConscript
+++ b/src/mongo/db/exec/sbe/SConscript
@@ -219,7 +219,7 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
- '$BUILD_DIR/mongo/db/index/columnar_index',
+ '$BUILD_DIR/mongo/db/index/column_store_index',
'$BUILD_DIR/mongo/db/query/collation/collator_interface_mock',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 9ccff04ff89..f71d78da857 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -84,9 +84,9 @@ env.Library(
],
)
-serveronlyEnv = env.Clone()
-serveronlyEnv.InjectThirdParty(libraries=['snappy'])
-serveronlyEnv.Library(
+iamEnv = env.Clone()
+iamEnv.InjectThirdParty(libraries=['snappy'])
+iamEnv.Library(
target="index_access_method",
source=[
'duplicate_key_tracker.cpp',
@@ -119,6 +119,22 @@ serveronlyEnv.Library(
],
)
+iamEnv.Library(
+ target="column_store_index",
+ source=[
+ 'column_cell.cpp',
+ "column_store_sorter.cpp",
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/sorter/sorter_idl',
+ '$BUILD_DIR/mongo/db/storage/encryption_hooks',
+ '$BUILD_DIR/mongo/db/storage/storage_options',
+ '$BUILD_DIR/mongo/s/is_mongos',
+ '$BUILD_DIR/third_party/shim_snappy',
+ ],
+)
+
env.Library(
target="index_access_methods",
source=[
@@ -137,31 +153,26 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
+ '$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/fts/base_fts',
'$BUILD_DIR/mongo/db/index_names',
- 'columnar_index',
+ 'column_store_index',
'expression_params',
'key_generator',
],
)
-env.Library(
- target="columnar_index",
- source=[
- 'column_cell.cpp',
- ],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/base',
- ],
-)
-
-env.CppUnitTest(
+indexTestEnv = env.Clone()
+indexTestEnv.InjectThirdParty(libraries=['snappy'])
+indexTestEnv.CppUnitTest(
target='db_index_test',
source=[
'2d_key_generator_test.cpp',
'btree_key_generator_test.cpp',
'column_cell_test.cpp',
'column_key_generator_test.cpp',
+ 'column_store_sorter_test.cpp',
'hash_key_generator_test.cpp',
's2_key_generator_test.cpp',
's2_bucket_key_generator_test.cpp',
@@ -177,7 +188,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/query/sort_pattern',
'$BUILD_DIR/mongo/db/record_id_helpers',
- 'columnar_index',
+ 'column_store_index',
'expression_params',
'key_generator',
],
diff --git a/src/mongo/db/index/column_store_sorter.cpp b/src/mongo/db/index/column_store_sorter.cpp
new file mode 100644
index 00000000000..f708518a560
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter.cpp
@@ -0,0 +1,297 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/index/column_store_sorter.h"
+
+namespace mongo {
+struct ComparisonForPathAndRid {
+ int operator()(const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& left,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& right) const {
+ auto stringComparison = left.first.path.compare(right.first.path);
+ return (stringComparison != 0) ? stringComparison
+ : left.first.recordId.compare(right.first.recordId);
+ }
+};
+
+bool ColumnStoreSorter::Key::operator<(const Key& other) const {
+ if (auto cmp = path.compare(other.path); cmp != 0) {
+ return cmp < 0;
+ } else {
+ return recordId < other.recordId;
+ }
+}
+
+void ColumnStoreSorter::Key::serializeForSorter(BufBuilder& buf) const {
+ buf.appendStr(path);
+ recordId.serializeToken(buf);
+}
+
+ColumnStoreSorter::Key ColumnStoreSorter::Key::deserializeForSorter(
+ BufReader& buf, ColumnStoreSorter::Key::SorterDeserializeSettings) {
+ // Note: unlike function call parameters, the order of evaluation for initializer
+ // parameters is defined.
+ return {buf.readCStr(), RecordId::deserializeToken(buf)};
+}
+
+void ColumnStoreSorter::Value::serializeForSorter(BufBuilder& buf) const {
+ buf.appendNum(uint32_t(cell.size())); // Little-endian write
+ buf.appendBuf(cell.rawData(), cell.size());
+}
+
+ColumnStoreSorter::Value ColumnStoreSorter::Value::deserializeForSorter(
+ BufReader& buf, ColumnStoreSorter::Value::SorterDeserializeSettings) {
+ size_t cellSize = buf.read<LittleEndian<uint32_t>>();
+ return Value{buf.readBytes(cellSize)};
+}
+
+ColumnStoreSorter::ColumnStoreSorter(size_t maxMemoryUsageBytes,
+ StringData dbName,
+ SorterFileStats* stats)
+ : _dbName(dbName.toString()),
+ _stats(stats),
+ _maxMemoryUsageBytes(maxMemoryUsageBytes),
+ _spillFile(std::make_shared<Sorter<Key, Value>::File>(pathForNewSpillFile(), _stats)) {}
+
+void ColumnStoreSorter::add(PathView path, RecordId recordId, CellView cellContents) {
+ auto& cellListAtPath = _dataByPath[path];
+ if (cellListAtPath.empty()) {
+ // Track memory usage of this new path.
+ _memUsed += sizeof(StringMap<CellVector>::value_type) + path.size();
+ }
+
+ // The sorter assumes that RecordIds are added in sorted order.
+ tassert(6548102,
+ "Out-of-order record during columnar index build",
+ cellListAtPath.empty() || cellListAtPath.back().first < recordId);
+
+ cellListAtPath.emplace_back(recordId, CellValue(cellContents.rawData(), cellContents.size()));
+ _memUsed += cellListAtPath.back().first.memUsage() + sizeof(CellValue) +
+ cellListAtPath.back().second.size();
+ if (_memUsed > _maxMemoryUsageBytes) {
+ spill();
+ }
+}
+
+namespace {
+std::string tempDir() {
+ return str::stream() << storageGlobalParams.dbpath << "/_tmp";
+}
+} // namespace
+
+SortOptions ColumnStoreSorter::makeSortOptions(const std::string& dbName, SorterFileStats* stats) {
+ return SortOptions().TempDir(tempDir()).ExtSortAllowed().FileStats(stats).DBName(dbName);
+}
+
+std::string ColumnStoreSorter::pathForNewSpillFile() {
+ static AtomicWord<unsigned> fileNameCounter;
+ static const uint64_t randomSuffix = static_cast<uint64_t>(SecureRandom().nextInt64());
+ return str::stream() << tempDir() << "/ext-sort-column-store-index."
+ << fileNameCounter.fetchAndAdd(1) << "-" << randomSuffix;
+}
+
+void ColumnStoreSorter::spill() {
+ if (_dataByPath.empty()) {
+ return;
+ }
+ ++_numSpills;
+
+ SortedFileWriter<Key, Value> writer(makeSortOptions(_dbName, _stats), _spillFile, {});
+
+ // Cells loaded into memory are sorted by record id but not yet sorted by path. We perform that
+ // sort now, so that we can output cells sorted by (path, rid) for later consumption by our
+ // standard external merge implementation: SortIteratorInterface<Key, Value>::merge().
+ std::vector<const StringMap<CellVector>::value_type*> sortedPathList;
+ sortedPathList.reserve(_dataByPath.size());
+ for (auto&& pathWithCellVector : _dataByPath) {
+ sortedPathList.push_back(&pathWithCellVector);
+ }
+ std::sort(sortedPathList.begin(), sortedPathList.end(), [](auto left, auto right) {
+ return left->first < right->first;
+ });
+
+ size_t currentChunkSize = 0;
+ for (auto&& pathWithCellVector : sortedPathList) {
+ auto& [path, cellVector] = *pathWithCellVector;
+
+ size_t cellVectorSize = std::accumulate(
+ cellVector.begin(), cellVector.end(), 0, [& path = path](size_t sum, auto& ridAndCell) {
+ return sum + path.size() + ridAndCell.first.memUsage() + ridAndCell.second.size();
+ });
+
+ // Add (path, rid, cell) records to the spill file so that the first cell in each contiguous
+ // run of cells with the same path lives in its own chunk. E.g.:
+ // Path1, rid1, Cell contents
+ // CHUNK BOUNDARY
+ // Path1, rid2, Cell Contents
+ // ...
+ // Path1, ridN, Cell Contents
+ // CHUNK BOUNDARY
+ // Path2, rid1, Cell Contents
+ // CHUNK BOUNDARY
+ // Path2, rid2, Cell Contents
+ // ...
+ //
+ // During merging, file readers will hold one chunk from each spill file in memory, so
+ // optimizing chunk size can reduce memory usage during the merge. Merging for a column
+ // store index is a special case: because the sorter is loaded in RecordId order, all the
+ // cells from this spill are guaranteed to merge together, with no interleaving cells from
+ // other spill files.
+ //
+ // This layout will result in a merger that holds a single cell from each leg of the merge
+ // representing the first in a large contiguous range. Once that cell gets picked, the merge
+ // will consume all chunks at that path in that file before moving on to the next file or
+ // the next path.
+ //
+ // To avoid the pathological case where runs are very short, we don't force a chunk boundary
+ // when a run of cells would not result in a chunk greater than 1024 bytes.
+ const size_t kShortChunkThreshold = 1024;
+ bool writeBoundaryAfterAdd = (currentChunkSize + cellVectorSize) > kShortChunkThreshold;
+ if (writeBoundaryAfterAdd) {
+ // Add the chunk boundary just before the first cell with this path name.
+ writer.writeChunk();
+ currentChunkSize = 0;
+ }
+ for (auto ridAndCell : cellVector) {
+ const auto& cell = ridAndCell.second;
+ currentChunkSize += path.size() + ridAndCell.first.memUsage() + cell.size();
+ writer.addAlreadySorted(Key{path, ridAndCell.first},
+ Value{CellView{cell.c_str(), cell.size()}});
+
+ if (writeBoundaryAfterAdd) {
+ // Add the chunk boundary just after the first cell with this path name, giving it
+ // its own chunk.
+ writer.writeChunk();
+ writeBoundaryAfterAdd = false;
+ currentChunkSize = 0;
+ }
+ }
+ }
+
+ _spilledFileIterators.emplace_back(writer.done());
+
+ _dataByPath.clear();
+ _memUsed = 0;
+}
+
+ColumnStoreSorter::Iterator* ColumnStoreSorter::done() {
+ invariant(!std::exchange(_done, true));
+
+ if (_spilledFileIterators.size() == 0) {
+ return inMemoryIterator();
+ }
+
+ spill();
+ return SortIteratorInterface<Key, Value>::merge(
+ _spilledFileIterators, makeSortOptions(_dbName, _stats), ComparisonForPathAndRid());
+}
+
+/**
+ * This iterator "unwinds" our path -> CellVector mapping into sorted tuples of (path name,
+ * recordId, cell), with the path name and recordId bundled into a single "key." The unwinding
+ * proceeds using an outer iterator over the paths and an inner iterator for the current CellVector.
+ * The outer iterator uses a separate path list that gets sorted when the 'InMemoryIterator' is
+ * initialized. The inner iterator directly traverses the CellVector, which is already sorted.
+ */
+class ColumnStoreSorter::InMemoryIterator final : public ColumnStoreSorter::Iterator {
+public:
+ InMemoryIterator(const StringMap<CellVector>& dataByPath) {
+ // Cells loaded into memory are sorted by record id but now yet by path. Sorting by path
+ // finalizes the sort algorithm.
+ _sortedPathList.reserve(dataByPath.size());
+ for (const auto& pathWithCellVector : dataByPath) {
+ _sortedPathList.push_back(&pathWithCellVector);
+ }
+ std::sort(_sortedPathList.begin(), _sortedPathList.end(), [](auto left, auto right) {
+ return left->first < right->first;
+ });
+
+ _pathIt = _sortedPathList.begin();
+ if (_pathIt != _sortedPathList.end()) {
+ _cellVectorIt = (*_pathIt)->second.begin();
+ }
+ }
+
+ bool more() final {
+ return _pathIt != _sortedPathList.end();
+ }
+
+ std::pair<Key, Value> next() final {
+ Key key{(*_pathIt)->first, _cellVectorIt->first};
+
+ Value contents{_cellVectorIt->second};
+
+ ++_cellVectorIt;
+ while (_cellVectorIt == (*_pathIt)->second.end() && ++_pathIt != _sortedPathList.end()) {
+ _cellVectorIt = (*_pathIt)->second.begin();
+ }
+
+ return {key, contents};
+ }
+
+ const std::pair<Key, Value>& current() final {
+ tasserted(ErrorCodes::NotImplemented,
+ "current() not implemented for ColumnStoreSorter::Iterator");
+ }
+
+ void openSource() final {}
+
+ void closeSource() final {}
+
+private:
+ std::vector<const StringMap<CellVector>::value_type*> _sortedPathList;
+
+ decltype(_sortedPathList)::const_iterator _pathIt;
+ CellVector::const_iterator _cellVectorIt;
+};
+
+ColumnStoreSorter::Iterator* ColumnStoreSorter::inMemoryIterator() const {
+ return new InMemoryIterator(_dataByPath);
+}
+} // namespace mongo
+
+namespace {
+/**
+ * A 'nextFilename()' is required for the below "sorter.cpp" include to compile, but this file does
+ * not use any of the 'Sorter' classes that call it.
+ */
+std::string nextFileName() {
+ MONGO_UNREACHABLE;
+}
+} // namespace
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT
+#include "mongo/db/sorter/sorter.cpp"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex
+MONGO_CREATE_SORTER(mongo::ColumnStoreSorter::Key,
+ mongo::ColumnStoreSorter::Value,
+ mongo::ComparisonForPathAndRid);
diff --git a/src/mongo/db/index/column_store_sorter.h b/src/mongo/db/index/column_store_sorter.h
new file mode 100644
index 00000000000..833213b3646
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter.h
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/record_id.h"
+#include "mongo/db/sorter/sorter.h"
+#include "mongo/db/storage/column_store.h"
+
+namespace mongo {
+/**
+ * Performs the organizing and sorting steps of a column store index bulk build, presenting an
+ * interface similar to the 'Sorter' interface. The client can add cells with the 'add()' method
+ * until none remain and then call 'done()' to get an iterator that returns the cells in sorted
+ * order.
+ *
+ * This class assumes that inputs are _already sorted_ by RecordId. Adding out-of-orders cells will
+ * result in undefined behavior.
+ *
+ * Internally, this class maintains a hash table that maps each path to a sorted list of
+ * (RecordId, CellValue) pairs. Because we use a hash table and not a sorted data structure (like
+ * std::map), we need to sort the list of paths when finalizing the output or when writing a spill
+ * file. The total number of cells inserted into this mapping is potentially very large, making it
+ * preferable to defer the cost of sorting to the end in order to avoid the cost of a binary tree
+ * traversal for each inserted cell.
+ */
+class ColumnStoreSorter {
+public:
+ ColumnStoreSorter(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats);
+
+ void add(PathView path, RecordId recordId, CellView cellContents);
+
+ size_t numSpills() const {
+ return _numSpills;
+ }
+
+ struct Key {
+ PathView path;
+ RecordId recordId;
+
+ struct SorterDeserializeSettings {};
+
+ bool operator<(const Key& other) const;
+ void serializeForSorter(BufBuilder& buf) const;
+
+ // Assumes that the source buffer will remain valid for the lifetime of the returned
+ // ColumnStoreSorter::Key object.
+ static Key deserializeForSorter(BufReader& buf, SorterDeserializeSettings);
+
+ size_t memUsageForSorter() const {
+ return sizeof(path) + path.size() + recordId.memUsage();
+ }
+
+ Key getOwned() const {
+ MONGO_UNREACHABLE;
+ }
+ };
+
+ struct Value {
+ CellView cell;
+
+ struct SorterDeserializeSettings {};
+
+ void serializeForSorter(BufBuilder& buf) const;
+ static Value deserializeForSorter(BufReader& buf, SorterDeserializeSettings);
+
+ size_t memUsageForSorter() const {
+ return sizeof(cell) + cell.size();
+ }
+
+ Value getOwned() const {
+ MONGO_UNREACHABLE;
+ }
+ };
+
+ using Iterator = SortIteratorInterface<Key, Value>;
+
+ Iterator* done();
+
+private:
+ class InMemoryIterator;
+
+ static SortOptions makeSortOptions(const std::string& dbName, SorterFileStats* stats);
+ static std::string pathForNewSpillFile();
+
+ void spill();
+
+ Iterator* inMemoryIterator() const;
+
+ const std::string _dbName;
+ SorterFileStats* _stats; // Unowned
+
+ const size_t _maxMemoryUsageBytes;
+ size_t _memUsed = 0;
+ size_t _numSpills = 0;
+
+ /**
+ * Mapping from path name to the sorted list of (RecordId, Cell) pairs.
+ */
+ using CellVector = std::vector<std::pair<RecordId, CellValue>>;
+ StringMap<CellVector> _dataByPath;
+
+ std::shared_ptr<Sorter<Key, Value>::File> _spillFile;
+ std::vector<std::shared_ptr<Iterator>> _spilledFileIterators;
+
+ bool _done = false;
+};
+} // namespace mongo
diff --git a/src/mongo/db/index/column_store_sorter_test.cpp b/src/mongo/db/index/column_store_sorter_test.cpp
new file mode 100644
index 00000000000..4b407bc1211
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter_test.cpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/index/column_store_sorter.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+TEST(ColumnStoreSorter, SortTest) {
+ // Each entry of the top-level vector contains the field names of a sample document whose
+ // RecordId is the entry index. No field values are included in this sample data.
+ std::vector<std::vector<std::string>> sampleData = {{"foo", "bar", "foo.bar", "bar.bar"},
+ {"bar", "foo", "bar.bar"},
+ {"bar.bar", "foo.bar", "bar", "foo"},
+ {"bar", "foo.bar", "baz"},
+ {"foo.bar", "bar", "foo"}};
+
+ // The output of sorting the 'sampleData' field names by (Field name, RecordId).
+ std::vector<std::pair<StringData, int64_t>> sortedData = {{"bar", 0},
+ {"bar", 1},
+ {"bar", 2},
+ {"bar", 3},
+ {"bar", 4},
+ {"bar.bar", 0},
+ {"bar.bar", 1},
+ {"bar.bar", 2},
+ {"baz", 3},
+ {"foo", 0},
+ {"foo", 1},
+ {"foo", 2},
+ {"foo", 4},
+ {"foo.bar", 0},
+ {"foo.bar", 2},
+ {"foo.bar", 3},
+ {"foo.bar", 4}};
+
+ // ColumnStoreSorter uses the dbpath to store its spill files.
+ ON_BLOCK_EXIT(
+ [oldDbPath = storageGlobalParams.dbpath]() { storageGlobalParams.dbpath = oldDbPath; });
+ unittest::TempDir tempDir("columnStoreSorterTests");
+ storageGlobalParams.dbpath = tempDir.path();
+
+ // We test two sorters: one that can perform the sort in memory and one that is constrained so
+ // that it must spill to disk.
+
+ SorterFileStats statsForInMemorySorter;
+ auto inMemorySorter = std::make_unique<ColumnStoreSorter>(
+ 1000000 /* maxMemoryUsageBytes */, "dbName", &statsForInMemorySorter);
+
+ SorterFileStats statsForExternalSorter;
+ auto externalSorter = std::make_unique<ColumnStoreSorter>(
+ 500 /* maxMemoryUsageBytes */, "dbName", &statsForExternalSorter);
+
+ // First, load documents into each sorter.
+ for (size_t i = 0; i < sampleData.size(); ++i) {
+ for (auto&& fieldName : sampleData[i]) {
+ // Synthesize cell contents based on the field name and RecordId, so that we can test
+ // that cell contents travel with the (Field name, RecordId) key. The null-byte
+ // delimiter tests that the sorter correctly stores cells with internal null bytes.
+ std::string cell = str::stream() << fieldName << "\0" << i;
+ inMemorySorter->add(fieldName, RecordId(i), cell);
+ externalSorter->add(fieldName, RecordId(i), cell);
+ }
+ }
+
+ // Now sort, iterate the sorted output, and ensure it matches the expected output.
+ std::unique_ptr<ColumnStoreSorter::Iterator> sortedItInMemory(inMemorySorter->done());
+ std::unique_ptr<ColumnStoreSorter::Iterator> sortedItExternal(externalSorter->done());
+ for (auto&& expected : sortedData) {
+ std::string expectedCell = str::stream() << expected.first << "\0" << expected.second;
+
+ {
+ ASSERT(sortedItInMemory->more());
+ auto [columnKey, columnValue] = sortedItInMemory->next();
+
+ ASSERT_EQ(expected.first, columnKey.path);
+ ASSERT_EQ(RecordId(expected.second), columnKey.recordId);
+ ASSERT_EQ(expectedCell, columnValue.cell);
+ }
+
+ {
+ ASSERT(sortedItExternal->more());
+ auto [columnKey, columnValue] = sortedItExternal->next();
+
+ ASSERT_EQ(expected.first, columnKey.path);
+ ASSERT_EQ(RecordId(expected.second), columnKey.recordId);
+ ASSERT_EQ(expectedCell, columnValue.cell);
+ }
+ }
+ ASSERT(!sortedItInMemory->more());
+ ASSERT(!sortedItExternal->more());
+
+ sortedItInMemory.reset();
+ sortedItExternal.reset();
+
+ // Ensure that statistics for spills and file accesses are as expected.
+ ASSERT_EQ(0, inMemorySorter->numSpills());
+ ASSERT_EQ(4, externalSorter->numSpills());
+
+ ASSERT_EQ(0, statsForInMemorySorter.opened.load());
+ ASSERT_EQ(0, statsForInMemorySorter.closed.load());
+
+ // The external sorter has opened its spill file but will not close and delete it until it is
+ // destroyed.
+ ASSERT_EQ(1, statsForExternalSorter.opened.load());
+ ASSERT_EQ(0, statsForExternalSorter.closed.load());
+
+ inMemorySorter.reset();
+ externalSorter.reset();
+
+ ASSERT_EQ(0, statsForInMemorySorter.closed.load());
+ ASSERT_EQ(1, statsForExternalSorter.closed.load());
+}
+} // namespace mongo
diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp
index 87501f61cd5..71d4c996aa7 100644
--- a/src/mongo/db/index/columns_access_method.cpp
+++ b/src/mongo/db/index/columns_access_method.cpp
@@ -35,9 +35,11 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/column_cell.h"
#include "mongo/db/index/column_key_generator.h"
+#include "mongo/db/index/column_store_sorter.h"
#include "mongo/logv2/log.h"
#include "mongo/util/progress_meter.h"
@@ -96,23 +98,29 @@ public:
private:
ColumnStoreAccessMethod* const _columnsAccess;
- // For now we'll just collect all the docs to insert before inserting them.
- // TODO SERVER-65481 Do an actual optimized bulk insert with sorting.
- std::list<BSONObj> _ownedObjects;
- std::vector<BsonRecord> _deferredInserts;
+
+ ColumnStoreSorter _sorter;
+ BufBuilder _cellBuilder;
+
int64_t _keysInserted = 0;
};
ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index,
size_t maxMemoryUsageBytes,
StringData dbName)
- : _columnsAccess(index) {}
+ : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) {
+ countNewBuildInStats();
+}
ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index,
size_t maxMemoryUsageBytes,
const IndexStateInfo& stateInfo,
StringData dbName)
- : _columnsAccess(index) {}
+ : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) {
+ countResumedBuildInStats();
+ // TODO SERVER-66925: Add this support.
+ tasserted(6548103, "No support for resuming interrupted columnstore index builds.");
+}
Status ColumnStoreAccessMethod::BulkBuilder::insert(
OperationContext* opCtx,
@@ -123,15 +131,20 @@ Status ColumnStoreAccessMethod::BulkBuilder::insert(
const InsertDeleteOptions& options,
const std::function<void()>& saveCursorBeforeWrite,
const std::function<void()>& restoreCursorAfterWrite) {
- // TODO SERVER-65481 Do an actual optimized bulk insert with sorting.
- _ownedObjects.push_back(obj.getOwned());
- BsonRecord record;
- record.docPtr = &_ownedObjects.back();
- record.id = rid;
- _deferredInserts.push_back(record);
+ column_keygen::visitCellsForInsert(
+ obj, [&](PathView path, const column_keygen::UnencodedCellView& cell) {
+ _cellBuilder.reset();
+ writeEncodedCell(cell, &_cellBuilder);
+ _sorter.add(path, rid, CellView(_cellBuilder.buf(), _cellBuilder.len()));
+
+ ++_keysInserted;
+ });
+
return Status::OK();
}
+// The "multikey" property does not apply to columnstore indexes, because the array key does not
+// represent a field in a document and
const MultikeyPaths& ColumnStoreAccessMethod::BulkBuilder::getMultikeyPaths() const {
const static MultikeyPaths empty;
return empty;
@@ -156,21 +169,73 @@ Status ColumnStoreAccessMethod::BulkBuilder::commit(OperationContext* opCtx,
int32_t yieldIterations,
const KeyHandlerFn& onDuplicateKeyInserted,
const RecordIdHandlerFn& onDuplicateRecord) {
- static constexpr size_t kBufferBlockSize = 1024;
- SharedBufferFragmentBuilder pooledBufferBuilder(kBufferBlockSize);
-
- WriteUnitOfWork wunit(opCtx);
- auto status = _columnsAccess->insert(opCtx,
- pooledBufferBuilder,
- collection,
- _deferredInserts,
- InsertDeleteOptions{},
- &_keysInserted);
- if (!status.isOK()) {
- return status;
+ Timer timer;
+
+ auto ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx);
+
+ static constexpr char message[] =
+ "Index Build: inserting keys from external sorter into columnstore index";
+ ProgressMeterHolder pm;
+ {
+ stdx::unique_lock<Client> lk(*opCtx->getClient());
+ pm.set(
+ CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */));
+ }
+
+ auto builder = _columnsAccess->_store->makeBulkBuilder(opCtx);
+
+ int64_t iterations = 0;
+ boost::optional<ColumnStoreSorter::Key> previousKey;
+ std::unique_ptr<ColumnStoreSorter::Iterator> it(_sorter.done());
+ while (it->more()) {
+ opCtx->checkForInterrupt();
+
+ auto columnStoreKeyWithValue = it->next();
+
+ // In debug mode only, assert that keys are retrieved from the sorter in strictly increasing
+ // order.
+ if (kDebugBuild) {
+ if (previousKey && !(*previousKey < columnStoreKeyWithValue.first)) {
+ LOGV2_FATAL_NOTRACE(6548100,
+ "Out-of-order result from sorter for column store bulk loader",
+ "prevPathName"_attr = previousKey->path,
+ "prevRecordId"_attr = previousKey->recordId,
+ "nextPathName"_attr = columnStoreKeyWithValue.first.path,
+ "nextRecordId"_attr = columnStoreKeyWithValue.first.recordId,
+ "index"_attr = _columnsAccess->_descriptor->indexName());
+ }
+ previousKey = columnStoreKeyWithValue.first;
+ }
+
+ try {
+ writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] {
+ WriteUnitOfWork wunit(opCtx);
+ auto& [columnStoreKey, columnStoreValue] = columnStoreKeyWithValue;
+ builder->addCell(
+ columnStoreKey.path, columnStoreKey.recordId, columnStoreValue.cell);
+ wunit.commit();
+ });
+ } catch (DBException& e) {
+ return e.toStatus();
+ }
+
+ // Yield locks every 'yieldIterations' key insertions.
+ if (++iterations % yieldIterations == 0) {
+ yield(opCtx, &collection, ns);
+ }
+
+ pm.hit();
}
- wunit.commit();
+ pm.finished();
+
+ LOGV2(6548101,
+ "Index build: bulk sorter inserted {keysInserted} keys into index {index} on namespace "
+ "{namespace} in {duration} seconds",
+ "keysInserted"_attr = _keysInserted,
+ "index"_attr = _columnsAccess->_descriptor->indexName(),
+ logAttrs(ns),
+ "duration"_attr = Seconds(timer.seconds()));
return Status::OK();
}
@@ -185,7 +250,7 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx,
auto cursor = _store->newWriteCursor(opCtx);
column_keygen::visitCellsForInsert(
bsonRecords,
- [&](StringData path,
+ [&](PathView path,
const BsonRecord& rec,
const column_keygen::UnencodedCellView& cell) {
if (!rec.ts.isNull()) {
@@ -214,7 +279,7 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx,
int64_t* keysDeletedOut,
CheckRecordId checkRecordId) {
auto cursor = _store->newWriteCursor(opCtx);
- column_keygen::visitPathsForDelete(obj, [&](StringData path) {
+ column_keygen::visitPathsForDelete(obj, [&](PathView path) {
cursor->remove(path, rid);
inc(keysDeletedOut);
});
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 150558bfb69..0144e00d973 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -50,7 +50,6 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/timestamp_block.h"
-#include "mongo/db/sorter/sorter.h"
#include "mongo/db/storage/execution_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
@@ -128,12 +127,12 @@ bool isMultikeyFromPaths(const MultikeyPaths& multikeyPaths) {
[](const MultikeyComponents& components) { return !components.empty(); });
}
-SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName) {
+SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats) {
return SortOptions()
.TempDir(storageGlobalParams.dbpath + "/_tmp")
.ExtSortAllowed()
.MaxMemoryUsageBytes(maxMemoryUsageBytes)
- .FileStats(&indexBulkBuilderSSS.sorterFileStats)
+ .FileStats(stats)
.DBName(dbName.toString());
}
@@ -610,6 +609,51 @@ Ident* SortedDataIndexAccessMethod::getIdentPtr() const {
return this->_newInterface.get();
}
+void IndexAccessMethod::BulkBuilder::countNewBuildInStats() {
+ indexBulkBuilderSSS.count.addAndFetch(1);
+}
+
+void IndexAccessMethod::BulkBuilder::countResumedBuildInStats() {
+ indexBulkBuilderSSS.count.addAndFetch(1);
+ indexBulkBuilderSSS.resumed.addAndFetch(1);
+}
+
+SorterFileStats* IndexAccessMethod::BulkBuilder::bulkBuilderFileStats() {
+ return &indexBulkBuilderSSS.sorterFileStats;
+}
+
+void IndexAccessMethod::BulkBuilder::yield(OperationContext* opCtx,
+ const Yieldable* yieldable,
+ const NamespaceString& ns) {
+ // Releasing locks means a new snapshot should be acquired when restored.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ yieldable->yield();
+
+ auto locker = opCtx->lockState();
+ Locker::LockSnapshot snapshot;
+ if (locker->saveLockStateAndUnlock(&snapshot)) {
+
+ // Track the number of yields in CurOp.
+ CurOp::get(opCtx)->yielded();
+
+ auto failPointHang = [opCtx, &ns](FailPoint* fp) {
+ fp->executeIf(
+ [fp](auto&&) {
+ LOGV2(5180600, "Hanging index build during bulk load yield");
+ fp->pauseWhileSet();
+ },
+ [opCtx, &ns](auto&& config) {
+ return config.getStringField("namespace") == ns.ns();
+ });
+ };
+ failPointHang(&hangDuringIndexBuildBulkLoadYield);
+ failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond);
+
+ locker->restoreLockState(opCtx, snapshot);
+ }
+ yieldable->restore();
+}
+
class SortedDataIndexAccessMethod::BulkBuilderImpl final : public IndexAccessMethod::BulkBuilder {
public:
using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>;
@@ -646,9 +690,6 @@ public:
IndexStateInfo persistDataForShutdown() final;
private:
- void _yield(OperationContext* opCtx,
- const Yieldable* yieldable,
- const NamespaceString& ns) const;
void _insertMultikeyMetadataKeysIntoSorter();
Sorter* _makeSorter(
@@ -689,7 +730,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc
size_t maxMemoryUsageBytes,
StringData dbName)
: _iam(iam), _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) {
- indexBulkBuilderSSS.count.addAndFetch(1);
+ countNewBuildInStats();
}
SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAccessMethod* iam,
@@ -702,8 +743,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc
_keysInserted(stateInfo.getNumKeys().value_or(0)),
_isMultiKey(stateInfo.getIsMultikey()),
_indexMultikeyPaths(createMultikeyPaths(stateInfo.getMultikeyPaths())) {
- indexBulkBuilderSSS.count.addAndFetch(1);
- indexBulkBuilderSSS.resumed.addAndFetch(1);
+ countResumedBuildInStats();
}
Status SortedDataIndexAccessMethod::BulkBuilderImpl::insert(
@@ -825,46 +865,16 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::_makeSorter(
StringData dbName,
boost::optional<StringData> fileName,
const boost::optional<std::vector<SorterRange>>& ranges) const {
- return fileName ? Sorter::makeFromExistingRanges(fileName->toString(),
- *ranges,
- makeSortOptions(maxMemoryUsageBytes, dbName),
- BtreeExternalSortComparison(),
- _makeSorterSettings())
- : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName),
- BtreeExternalSortComparison(),
- _makeSorterSettings());
-}
-
-void SortedDataIndexAccessMethod::BulkBuilderImpl::_yield(OperationContext* opCtx,
- const Yieldable* yieldable,
- const NamespaceString& ns) const {
- // Releasing locks means a new snapshot should be acquired when restored.
- opCtx->recoveryUnit()->abandonSnapshot();
- yieldable->yield();
-
- auto locker = opCtx->lockState();
- Locker::LockSnapshot snapshot;
- if (locker->saveLockStateAndUnlock(&snapshot)) {
-
- // Track the number of yields in CurOp.
- CurOp::get(opCtx)->yielded();
-
- auto failPointHang = [opCtx, &ns](FailPoint* fp) {
- fp->executeIf(
- [fp](auto&&) {
- LOGV2(5180600, "Hanging index build during bulk load yield");
- fp->pauseWhileSet();
- },
- [opCtx, &ns](auto&& config) {
- return config.getStringField("namespace") == ns.ns();
- });
- };
- failPointHang(&hangDuringIndexBuildBulkLoadYield);
- failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond);
-
- locker->restoreLockState(opCtx, snapshot);
- }
- yieldable->restore();
+ return fileName
+ ? Sorter::makeFromExistingRanges(
+ fileName->toString(),
+ *ranges,
+ makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings())
+ : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings());
}
Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit(
@@ -979,7 +989,7 @@ Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit(
// Starts yielding locks after the first non-zero 'yieldIterations' inserts.
if (yieldIterations && (i + 1) % yieldIterations == 0) {
- _yield(opCtx, &collection, ns);
+ yield(opCtx, &collection, ns);
}
// If we're here either it's a dup and we're cool with it or the addKey went just fine.
diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h
index 3718040885d..7e3a4dd504a 100644
--- a/src/mongo/db/index/index_access_method.h
+++ b/src/mongo/db/index/index_access_method.h
@@ -40,6 +40,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
+#include "mongo/db/sorter/sorter.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/db/yieldable.h"
@@ -219,6 +220,19 @@ public:
* Persists on disk the keys that have been inserted using this BulkBuilder.
*/
virtual IndexStateInfo persistDataForShutdown() = 0;
+
+ protected:
+ static void countNewBuildInStats();
+ static void countResumedBuildInStats();
+ static SorterFileStats* bulkBuilderFileStats();
+
+ /**
+ * Abandon the current snapshot and release then reacquire locks. Tests that target the
+ * behavior of bulk index builds that yield can use failpoints to stall this yield.
+ */
+ static void yield(OperationContext* opCtx,
+ const Yieldable* yieldable,
+ const NamespaceString& ns);
};
/**
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index c23c2695afd..ecb9e10118e 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -1291,11 +1291,11 @@ void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value&
addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum);
if (_buffer.len() > static_cast<int>(kSortedFileBufferSize))
- spill();
+ writeChunk();
}
template <typename Key, typename Value>
-void SortedFileWriter<Key, Value>::spill() {
+void SortedFileWriter<Key, Value>::writeChunk() {
int32_t size = _buffer.len();
char* outBuffer = _buffer.buf();
@@ -1340,7 +1340,7 @@ void SortedFileWriter<Key, Value>::spill() {
template <typename Key, typename Value>
SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
- spill();
+ writeChunk();
return new sorter::FileIterator<Key, Value>(
_file, _fileStartOffset, _file->currentOffset(), _settings, _dbName, _checksum);
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 12bccee1178..85c0f279ac2 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -592,16 +592,23 @@ public:
void addAlreadySorted(const Key&, const Value&);
/**
- * Spills any data remaining in the buffer to disk and then closes the file to which data was
+ * Writes any data remaining in the buffer to disk and then closes the file to which data was
* written.
*
* No more data can be added via addAlreadySorted() after calling done().
*/
Iterator* done();
-private:
- void spill();
+ /**
+ * The SortedFileWriter organizes data into chunks, with a chunk getting written to the output
+ * file when it exceends a maximum chunks size. A SortedFilerWriter client can produce a short
+ * chunk by manually calling this function.
+ *
+ * If no new data has been added since the last chunk was written, this function is a no-op.
+ */
+ void writeChunk();
+private:
const Settings _settings;
std::shared_ptr<typename Sorter<Key, Value>::File> _file;
BufBuilder _buffer;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
index e9bc642dfb8..e9000044126 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
@@ -393,28 +393,26 @@ std::unique_ptr<ColumnStore::Cursor> WiredTigerColumnStore::newCursor(
class WiredTigerColumnStore::BulkBuilder final : public ColumnStore::BulkBuilder {
public:
BulkBuilder(WiredTigerColumnStore* idx, OperationContext* opCtx)
- : _opCtx(opCtx),
- _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()),
- _cursor(openBulkCursor(idx)) {}
-
- ~BulkBuilder() {
- _cursor->close(_cursor);
- }
+ : _opCtx(opCtx), _cursor(idx->uri(), opCtx) {}
void addCell(PathView path, const RecordId& rid, CellView cell) override {
- uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder");
- }
+ const std::string& key = makeKey(_buffer, path, rid);
+ WiredTigerItem keyItem(key.c_str(), key.size());
+ _cursor->set_key(_cursor.get(), keyItem.Get());
-private:
- WT_CURSOR* openBulkCursor(WiredTigerColumnStore* idx) {
- // TODO SERVER-65484: Much of this logic can be shared with standard WT index.
- uasserted(ErrorCodes::NotImplemented, "WiredTigerColumnStore bulk builder");
+ WiredTigerItem cellItem(cell.rawData(), cell.size());
+ _cursor->set_value(_cursor.get(), cellItem.Get());
+
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
+
+ ResourceConsumption::MetricsCollector::get(_opCtx).incrementOneIdxEntryWritten(
+ std::string(_cursor->uri), keyItem.size);
}
+private:
std::string _buffer;
OperationContext* const _opCtx;
- UniqueWiredTigerSession const _session;
- WT_CURSOR* const _cursor;
+ WiredTigerBulkLoadCursor _cursor;
};
std::unique_ptr<ColumnStore::BulkBuilder> WiredTigerColumnStore::makeBulkBuilder(
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp
index 188983bf071..d5704989b20 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp
@@ -82,4 +82,32 @@ WiredTigerCursor::~WiredTigerCursor() {
void WiredTigerCursor::reset() {
invariantWTOK(_cursor->reset(_cursor), _cursor->session);
}
+
+WiredTigerBulkLoadCursor::WiredTigerBulkLoadCursor(const std::string& indexUri,
+ OperationContext* opCtx)
+ : _session(WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->getSession()) {
+ // Open cursors can cause bulk open_cursor to fail with EBUSY.
+ // TODO any other cases that could cause EBUSY?
+ WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(opCtx)->getSession();
+ outerSession->closeAllCursors(indexUri);
+
+ // The 'checkpoint_wait=false' option is set to prefer falling back on the "non-bulk" cursor
+ // over waiting a potentially long time for a checkpoint.
+ WT_SESSION* sessionPtr = _session->getSession();
+ int err = sessionPtr->open_cursor(
+ sessionPtr, indexUri.c_str(), nullptr, "bulk,checkpoint_wait=false", &_cursor);
+ if (!err) {
+ return; // Success
+ }
+
+ LOGV2_WARNING(51783,
+ "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk "
+ "cursor for index {index}",
+ "Failed to create WiredTiger bulk cursor, falling back to non-bulk",
+ "error"_attr = wiredtiger_strerror(err),
+ "index"_attr = indexUri);
+
+ invariantWTOK(sessionPtr->open_cursor(sessionPtr, indexUri.c_str(), nullptr, nullptr, &_cursor),
+ sessionPtr);
+}
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h
index b31f91465cf..716226e45a9 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h
@@ -81,4 +81,31 @@ protected:
WT_CURSOR* _cursor = nullptr; // Owned
};
+
+/**
+ * An owning object wrapper for a WT_SESSION and WT_CURSOR configured for bulk loading when
+ * possible. The cursor is created and closed independently of the cursor cache, which does not
+ * store bulk cursors. It uses its own session to avoid hijacking an existing transaction in the
+ * current session.
+ */
+class WiredTigerBulkLoadCursor {
+public:
+ WiredTigerBulkLoadCursor(const std::string& indexUri, OperationContext* opCtx);
+
+ ~WiredTigerBulkLoadCursor() {
+ _cursor->close(_cursor);
+ }
+
+ WT_CURSOR* get() const {
+ return _cursor;
+ }
+
+ WT_CURSOR* operator->() const {
+ return get();
+ }
+
+private:
+ UniqueWiredTigerSession const _session;
+ WT_CURSOR* _cursor = nullptr; // Owned
+};
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
index 69c422c27d5..16870b4c7d7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -692,54 +692,16 @@ RecordId WiredTigerIndex::_decodeRecordIdAtEnd(const void* buffer, size_t size)
class WiredTigerIndex::BulkBuilder : public SortedDataBuilderInterface {
public:
BulkBuilder(WiredTigerIndex* idx, OperationContext* opCtx)
- : _ordering(idx->_ordering),
- _opCtx(opCtx),
- _session(WiredTigerRecoveryUnit::get(_opCtx)->getSessionCache()->getSession()),
- _cursor(openBulkCursor(idx)) {}
-
- ~BulkBuilder() {
- _cursor->close(_cursor);
- }
+ : _ordering(idx->_ordering), _opCtx(opCtx), _cursor(idx->uri(), _opCtx) {}
protected:
- WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) {
- // Open cursors can cause bulk open_cursor to fail with EBUSY.
- // TODO any other cases that could cause EBUSY?
- WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_opCtx)->getSession();
- outerSession->closeAllCursors(idx->uri());
-
- // Not using cursor cache since we need to set "bulk".
- WT_CURSOR* cursor;
- // Use a different session to ensure we don't hijack an existing transaction.
- // Configure the bulk cursor open to fail quickly if it would wait on a checkpoint
- // completing - since checkpoints can take a long time, and waiting can result in
- // an unexpected pause in building an index.
- WT_SESSION* session = _session->getSession();
- int err = session->open_cursor(
- session, idx->uri().c_str(), nullptr, "bulk,checkpoint_wait=false", &cursor);
- if (!err)
- return cursor;
-
- LOGV2_WARNING(51783,
- "failed to create WiredTiger bulk cursor: {error} falling back to non-bulk "
- "cursor for index {index}",
- "Failed to create WiredTiger bulk cursor, falling back to non-bulk",
- "error"_attr = wiredtiger_strerror(err),
- "index"_attr = idx->uri());
-
- invariantWTOK(session->open_cursor(session, idx->uri().c_str(), nullptr, nullptr, &cursor),
- session);
- return cursor;
- }
-
void setKey(WT_CURSOR* cursor, const WT_ITEM* item) {
cursor->set_key(cursor, item);
}
const Ordering _ordering;
OperationContext* const _opCtx;
- UniqueWiredTigerSession const _session;
- WT_CURSOR* const _cursor;
+ WiredTigerBulkLoadCursor _cursor;
};
@@ -756,16 +718,16 @@ public:
// Can't use WiredTigerCursor since we aren't using the cache.
WiredTigerItem item(keyString.getBuffer(), keyString.getSize());
- setKey(_cursor, item.Get());
+ setKey(_cursor.get(), item.Get());
const KeyString::TypeBits typeBits = keyString.getTypeBits();
WiredTigerItem valueItem = typeBits.isAllZeros()
? emptyItem
: WiredTigerItem(typeBits.getBuffer(), typeBits.getSize());
- _cursor->set_value(_cursor, valueItem.Get());
+ _cursor->set_value(_cursor.get(), valueItem.Get());
- invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session);
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), item.size);
@@ -824,16 +786,16 @@ public:
// Can't use WiredTigerCursor since we aren't using the cache.
WiredTigerItem keyItem(newKeyString.getBuffer(), newKeyString.getSize());
- setKey(_cursor, keyItem.Get());
+ setKey(_cursor.get(), keyItem.Get());
const KeyString::TypeBits typeBits = newKeyString.getTypeBits();
WiredTigerItem valueItem = typeBits.isAllZeros()
? emptyItem
: WiredTigerItem(typeBits.getBuffer(), typeBits.getSize());
- _cursor->set_value(_cursor, valueItem.Get());
+ _cursor->set_value(_cursor.get(), valueItem.Get());
- invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session);
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size);
@@ -882,10 +844,10 @@ public:
WiredTigerItem keyItem(newKeyString.getBuffer(), sizeWithoutRecordId);
WiredTigerItem valueItem(value.getBuffer(), value.getSize());
- setKey(_cursor, keyItem.Get());
- _cursor->set_value(_cursor, valueItem.Get());
+ setKey(_cursor.get(), keyItem.Get());
+ _cursor->set_value(_cursor.get(), valueItem.Get());
- invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor), _cursor->session);
+ invariantWTOK(wiredTigerCursorInsert(_opCtx, _cursor.get()), _cursor->session);
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(std::string(_cursor->uri), keyItem.size);
diff --git a/src/mongo/util/bufreader.h b/src/mongo/util/bufreader.h
index 8c30070bada..7b9f8a1c5a9 100644
--- a/src/mongo/util/bufreader.h
+++ b/src/mongo/util/bufreader.h
@@ -124,6 +124,14 @@ public:
s = readCStr().toString();
}
+ /**
+ * Return a view of the next len bytes and advance by len.
+ */
+ StringData readBytes(size_t len) {
+ // Note: the call to skip() includes a check that at least 'len' bytes remain in the buffer.
+ return StringData(reinterpret_cast<const char*>(skip(len)), len);
+ }
+
const void* pos() {
return _pos;
}