summaryrefslogtreecommitdiff
path: root/src/mongo/db/index
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2022-06-29 12:32:12 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-29 20:02:02 +0000
commitcb9472afc30d32d1c18691d64899c1aa72cdc43d (patch)
tree924020b6f50ccf365926a2420c64d92e5497a34a /src/mongo/db/index
parentf684488a8509f0a3764fa48221499b9cd508c0e1 (diff)
downloadmongo-cb9472afc30d32d1c18691d64899c1aa72cdc43d.tar.gz
SERVER-65481 Bulk shredding and loading for column store indexes
Diffstat (limited to 'src/mongo/db/index')
-rw-r--r--src/mongo/db/index/SConscript43
-rw-r--r--src/mongo/db/index/column_store_sorter.cpp297
-rw-r--r--src/mongo/db/index/column_store_sorter.h134
-rw-r--r--src/mongo/db/index/column_store_sorter_test.cpp140
-rw-r--r--src/mongo/db/index/columns_access_method.cpp119
-rw-r--r--src/mongo/db/index/index_access_method.cpp110
-rw-r--r--src/mongo/db/index/index_access_method.h14
7 files changed, 764 insertions, 93 deletions
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 9ccff04ff89..f71d78da857 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -84,9 +84,9 @@ env.Library(
],
)
-serveronlyEnv = env.Clone()
-serveronlyEnv.InjectThirdParty(libraries=['snappy'])
-serveronlyEnv.Library(
+iamEnv = env.Clone()
+iamEnv.InjectThirdParty(libraries=['snappy'])
+iamEnv.Library(
target="index_access_method",
source=[
'duplicate_key_tracker.cpp',
@@ -119,6 +119,22 @@ serveronlyEnv.Library(
],
)
+iamEnv.Library(
+ target="column_store_index",
+ source=[
+ 'column_cell.cpp',
+ "column_store_sorter.cpp",
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/sorter/sorter_idl',
+ '$BUILD_DIR/mongo/db/storage/encryption_hooks',
+ '$BUILD_DIR/mongo/db/storage/storage_options',
+ '$BUILD_DIR/mongo/s/is_mongos',
+ '$BUILD_DIR/third_party/shim_snappy',
+ ],
+)
+
env.Library(
target="index_access_methods",
source=[
@@ -137,31 +153,26 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
+ '$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/fts/base_fts',
'$BUILD_DIR/mongo/db/index_names',
- 'columnar_index',
+ 'column_store_index',
'expression_params',
'key_generator',
],
)
-env.Library(
- target="columnar_index",
- source=[
- 'column_cell.cpp',
- ],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/base',
- ],
-)
-
-env.CppUnitTest(
+indexTestEnv = env.Clone()
+indexTestEnv.InjectThirdParty(libraries=['snappy'])
+indexTestEnv.CppUnitTest(
target='db_index_test',
source=[
'2d_key_generator_test.cpp',
'btree_key_generator_test.cpp',
'column_cell_test.cpp',
'column_key_generator_test.cpp',
+ 'column_store_sorter_test.cpp',
'hash_key_generator_test.cpp',
's2_key_generator_test.cpp',
's2_bucket_key_generator_test.cpp',
@@ -177,7 +188,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/query/sort_pattern',
'$BUILD_DIR/mongo/db/record_id_helpers',
- 'columnar_index',
+ 'column_store_index',
'expression_params',
'key_generator',
],
diff --git a/src/mongo/db/index/column_store_sorter.cpp b/src/mongo/db/index/column_store_sorter.cpp
new file mode 100644
index 00000000000..f708518a560
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter.cpp
@@ -0,0 +1,297 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/index/column_store_sorter.h"
+
+namespace mongo {
+struct ComparisonForPathAndRid {
+ int operator()(const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& left,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& right) const {
+ auto stringComparison = left.first.path.compare(right.first.path);
+ return (stringComparison != 0) ? stringComparison
+ : left.first.recordId.compare(right.first.recordId);
+ }
+};
+
+bool ColumnStoreSorter::Key::operator<(const Key& other) const {
+ if (auto cmp = path.compare(other.path); cmp != 0) {
+ return cmp < 0;
+ } else {
+ return recordId < other.recordId;
+ }
+}
+
+void ColumnStoreSorter::Key::serializeForSorter(BufBuilder& buf) const {
+ buf.appendStr(path);
+ recordId.serializeToken(buf);
+}
+
+ColumnStoreSorter::Key ColumnStoreSorter::Key::deserializeForSorter(
+ BufReader& buf, ColumnStoreSorter::Key::SorterDeserializeSettings) {
+ // Note: unlike function call parameters, the order of evaluation for initializer
+ // parameters is defined.
+ return {buf.readCStr(), RecordId::deserializeToken(buf)};
+}
+
+void ColumnStoreSorter::Value::serializeForSorter(BufBuilder& buf) const {
+ buf.appendNum(uint32_t(cell.size())); // Little-endian write
+ buf.appendBuf(cell.rawData(), cell.size());
+}
+
+ColumnStoreSorter::Value ColumnStoreSorter::Value::deserializeForSorter(
+ BufReader& buf, ColumnStoreSorter::Value::SorterDeserializeSettings) {
+ size_t cellSize = buf.read<LittleEndian<uint32_t>>();
+ return Value{buf.readBytes(cellSize)};
+}
+
+ColumnStoreSorter::ColumnStoreSorter(size_t maxMemoryUsageBytes,
+ StringData dbName,
+ SorterFileStats* stats)
+ : _dbName(dbName.toString()),
+ _stats(stats),
+ _maxMemoryUsageBytes(maxMemoryUsageBytes),
+ _spillFile(std::make_shared<Sorter<Key, Value>::File>(pathForNewSpillFile(), _stats)) {}
+
+void ColumnStoreSorter::add(PathView path, RecordId recordId, CellView cellContents) {
+ auto& cellListAtPath = _dataByPath[path];
+ if (cellListAtPath.empty()) {
+ // Track memory usage of this new path.
+ _memUsed += sizeof(StringMap<CellVector>::value_type) + path.size();
+ }
+
+ // The sorter assumes that RecordIds are added in sorted order.
+ tassert(6548102,
+ "Out-of-order record during columnar index build",
+ cellListAtPath.empty() || cellListAtPath.back().first < recordId);
+
+ cellListAtPath.emplace_back(recordId, CellValue(cellContents.rawData(), cellContents.size()));
+ _memUsed += cellListAtPath.back().first.memUsage() + sizeof(CellValue) +
+ cellListAtPath.back().second.size();
+ if (_memUsed > _maxMemoryUsageBytes) {
+ spill();
+ }
+}
+
+namespace {
+std::string tempDir() {
+ return str::stream() << storageGlobalParams.dbpath << "/_tmp";
+}
+} // namespace
+
+SortOptions ColumnStoreSorter::makeSortOptions(const std::string& dbName, SorterFileStats* stats) {
+ return SortOptions().TempDir(tempDir()).ExtSortAllowed().FileStats(stats).DBName(dbName);
+}
+
+std::string ColumnStoreSorter::pathForNewSpillFile() {
+ static AtomicWord<unsigned> fileNameCounter;
+ static const uint64_t randomSuffix = static_cast<uint64_t>(SecureRandom().nextInt64());
+ return str::stream() << tempDir() << "/ext-sort-column-store-index."
+ << fileNameCounter.fetchAndAdd(1) << "-" << randomSuffix;
+}
+
+void ColumnStoreSorter::spill() {
+ if (_dataByPath.empty()) {
+ return;
+ }
+ ++_numSpills;
+
+ SortedFileWriter<Key, Value> writer(makeSortOptions(_dbName, _stats), _spillFile, {});
+
+ // Cells loaded into memory are sorted by record id but not yet sorted by path. We perform that
+ // sort now, so that we can output cells sorted by (path, rid) for later consumption by our
+ // standard external merge implementation: SortIteratorInterface<Key, Value>::merge().
+ std::vector<const StringMap<CellVector>::value_type*> sortedPathList;
+ sortedPathList.reserve(_dataByPath.size());
+ for (auto&& pathWithCellVector : _dataByPath) {
+ sortedPathList.push_back(&pathWithCellVector);
+ }
+ std::sort(sortedPathList.begin(), sortedPathList.end(), [](auto left, auto right) {
+ return left->first < right->first;
+ });
+
+ size_t currentChunkSize = 0;
+ for (auto&& pathWithCellVector : sortedPathList) {
+ auto& [path, cellVector] = *pathWithCellVector;
+
+ size_t cellVectorSize = std::accumulate(
+ cellVector.begin(), cellVector.end(), 0, [& path = path](size_t sum, auto& ridAndCell) {
+ return sum + path.size() + ridAndCell.first.memUsage() + ridAndCell.second.size();
+ });
+
+ // Add (path, rid, cell) records to the spill file so that the first cell in each contiguous
+ // run of cells with the same path lives in its own chunk. E.g.:
+ // Path1, rid1, Cell contents
+ // CHUNK BOUNDARY
+ // Path1, rid2, Cell Contents
+ // ...
+ // Path1, ridN, Cell Contents
+ // CHUNK BOUNDARY
+ // Path2, rid1, Cell Contents
+ // CHUNK BOUNDARY
+ // Path2, rid2, Cell Contents
+ // ...
+ //
+ // During merging, file readers will hold one chunk from each spill file in memory, so
+ // optimizing chunk size can reduce memory usage during the merge. Merging for a column
+ // store index is a special case: because the sorter is loaded in RecordId order, all the
+ // cells from this spill are guaranteed to merge together, with no interleaving cells from
+ // other spill files.
+ //
+ // This layout will result in a merger that holds a single cell from each leg of the merge
+ // representing the first in a large contiguous range. Once that cell gets picked, the merge
+ // will consume all chunks at that path in that file before moving on to the next file or
+ // the next path.
+ //
+ // To avoid the pathological case where runs are very short, we don't force a chunk boundary
+ // when a run of cells would not result in a chunk greater than 1024 bytes.
+ const size_t kShortChunkThreshold = 1024;
+ bool writeBoundaryAfterAdd = (currentChunkSize + cellVectorSize) > kShortChunkThreshold;
+ if (writeBoundaryAfterAdd) {
+ // Add the chunk boundary just before the first cell with this path name.
+ writer.writeChunk();
+ currentChunkSize = 0;
+ }
+ for (auto ridAndCell : cellVector) {
+ const auto& cell = ridAndCell.second;
+ currentChunkSize += path.size() + ridAndCell.first.memUsage() + cell.size();
+ writer.addAlreadySorted(Key{path, ridAndCell.first},
+ Value{CellView{cell.c_str(), cell.size()}});
+
+ if (writeBoundaryAfterAdd) {
+ // Add the chunk boundary just after the first cell with this path name, giving it
+ // its own chunk.
+ writer.writeChunk();
+ writeBoundaryAfterAdd = false;
+ currentChunkSize = 0;
+ }
+ }
+ }
+
+ _spilledFileIterators.emplace_back(writer.done());
+
+ _dataByPath.clear();
+ _memUsed = 0;
+}
+
+ColumnStoreSorter::Iterator* ColumnStoreSorter::done() {
+ invariant(!std::exchange(_done, true));
+
+ if (_spilledFileIterators.size() == 0) {
+ return inMemoryIterator();
+ }
+
+ spill();
+ return SortIteratorInterface<Key, Value>::merge(
+ _spilledFileIterators, makeSortOptions(_dbName, _stats), ComparisonForPathAndRid());
+}
+
+/**
+ * This iterator "unwinds" our path -> CellVector mapping into sorted tuples of (path name,
+ * recordId, cell), with the path name and recordId bundled into a single "key." The unwinding
+ * proceeds using an outer iterator over the paths and an inner iterator for the current CellVector.
+ * The outer iterator uses a separate path list that gets sorted when the 'InMemoryIterator' is
+ * initialized. The inner iterator directly traverses the CellVector, which is already sorted.
+ */
+class ColumnStoreSorter::InMemoryIterator final : public ColumnStoreSorter::Iterator {
+public:
+ InMemoryIterator(const StringMap<CellVector>& dataByPath) {
+ // Cells loaded into memory are sorted by record id but now yet by path. Sorting by path
+ // finalizes the sort algorithm.
+ _sortedPathList.reserve(dataByPath.size());
+ for (const auto& pathWithCellVector : dataByPath) {
+ _sortedPathList.push_back(&pathWithCellVector);
+ }
+ std::sort(_sortedPathList.begin(), _sortedPathList.end(), [](auto left, auto right) {
+ return left->first < right->first;
+ });
+
+ _pathIt = _sortedPathList.begin();
+ if (_pathIt != _sortedPathList.end()) {
+ _cellVectorIt = (*_pathIt)->second.begin();
+ }
+ }
+
+ bool more() final {
+ return _pathIt != _sortedPathList.end();
+ }
+
+ std::pair<Key, Value> next() final {
+ Key key{(*_pathIt)->first, _cellVectorIt->first};
+
+ Value contents{_cellVectorIt->second};
+
+ ++_cellVectorIt;
+ while (_cellVectorIt == (*_pathIt)->second.end() && ++_pathIt != _sortedPathList.end()) {
+ _cellVectorIt = (*_pathIt)->second.begin();
+ }
+
+ return {key, contents};
+ }
+
+ const std::pair<Key, Value>& current() final {
+ tasserted(ErrorCodes::NotImplemented,
+ "current() not implemented for ColumnStoreSorter::Iterator");
+ }
+
+ void openSource() final {}
+
+ void closeSource() final {}
+
+private:
+ std::vector<const StringMap<CellVector>::value_type*> _sortedPathList;
+
+ decltype(_sortedPathList)::const_iterator _pathIt;
+ CellVector::const_iterator _cellVectorIt;
+};
+
+ColumnStoreSorter::Iterator* ColumnStoreSorter::inMemoryIterator() const {
+ return new InMemoryIterator(_dataByPath);
+}
+} // namespace mongo
+
+namespace {
+/**
+ * A 'nextFilename()' is required for the below "sorter.cpp" include to compile, but this file does
+ * not use any of the 'Sorter' classes that call it.
+ */
+std::string nextFileName() {
+ MONGO_UNREACHABLE;
+}
+} // namespace
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT
+#include "mongo/db/sorter/sorter.cpp"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex
+MONGO_CREATE_SORTER(mongo::ColumnStoreSorter::Key,
+ mongo::ColumnStoreSorter::Value,
+ mongo::ComparisonForPathAndRid);
diff --git a/src/mongo/db/index/column_store_sorter.h b/src/mongo/db/index/column_store_sorter.h
new file mode 100644
index 00000000000..833213b3646
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter.h
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/record_id.h"
+#include "mongo/db/sorter/sorter.h"
+#include "mongo/db/storage/column_store.h"
+
+namespace mongo {
+/**
+ * Performs the organizing and sorting steps of a column store index bulk build, presenting an
+ * interface similar to the 'Sorter' interface. The client can add cells with the 'add()' method
+ * until none remain and then call 'done()' to get an iterator that returns the cells in sorted
+ * order.
+ *
+ * This class assumes that inputs are _already sorted_ by RecordId. Adding out-of-orders cells will
+ * result in undefined behavior.
+ *
+ * Internally, this class maintains a hash table that maps each path to a sorted list of
+ * (RecordId, CellValue) pairs. Because we use a hash table and not a sorted data structure (like
+ * std::map), we need to sort the list of paths when finalizing the output or when writing a spill
+ * file. The total number of cells inserted into this mapping is potentially very large, making it
+ * preferable to defer the cost of sorting to the end in order to avoid the cost of a binary tree
+ * traversal for each inserted cell.
+ */
+class ColumnStoreSorter {
+public:
+ ColumnStoreSorter(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats);
+
+ void add(PathView path, RecordId recordId, CellView cellContents);
+
+ size_t numSpills() const {
+ return _numSpills;
+ }
+
+ struct Key {
+ PathView path;
+ RecordId recordId;
+
+ struct SorterDeserializeSettings {};
+
+ bool operator<(const Key& other) const;
+ void serializeForSorter(BufBuilder& buf) const;
+
+ // Assumes that the source buffer will remain valid for the lifetime of the returned
+ // ColumnStoreSorter::Key object.
+ static Key deserializeForSorter(BufReader& buf, SorterDeserializeSettings);
+
+ size_t memUsageForSorter() const {
+ return sizeof(path) + path.size() + recordId.memUsage();
+ }
+
+ Key getOwned() const {
+ MONGO_UNREACHABLE;
+ }
+ };
+
+ struct Value {
+ CellView cell;
+
+ struct SorterDeserializeSettings {};
+
+ void serializeForSorter(BufBuilder& buf) const;
+ static Value deserializeForSorter(BufReader& buf, SorterDeserializeSettings);
+
+ size_t memUsageForSorter() const {
+ return sizeof(cell) + cell.size();
+ }
+
+ Value getOwned() const {
+ MONGO_UNREACHABLE;
+ }
+ };
+
+ using Iterator = SortIteratorInterface<Key, Value>;
+
+ Iterator* done();
+
+private:
+ class InMemoryIterator;
+
+ static SortOptions makeSortOptions(const std::string& dbName, SorterFileStats* stats);
+ static std::string pathForNewSpillFile();
+
+ void spill();
+
+ Iterator* inMemoryIterator() const;
+
+ const std::string _dbName;
+ SorterFileStats* _stats; // Unowned
+
+ const size_t _maxMemoryUsageBytes;
+ size_t _memUsed = 0;
+ size_t _numSpills = 0;
+
+ /**
+ * Mapping from path name to the sorted list of (RecordId, Cell) pairs.
+ */
+ using CellVector = std::vector<std::pair<RecordId, CellValue>>;
+ StringMap<CellVector> _dataByPath;
+
+ std::shared_ptr<Sorter<Key, Value>::File> _spillFile;
+ std::vector<std::shared_ptr<Iterator>> _spilledFileIterators;
+
+ bool _done = false;
+};
+} // namespace mongo
diff --git a/src/mongo/db/index/column_store_sorter_test.cpp b/src/mongo/db/index/column_store_sorter_test.cpp
new file mode 100644
index 00000000000..4b407bc1211
--- /dev/null
+++ b/src/mongo/db/index/column_store_sorter_test.cpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/index/column_store_sorter.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+TEST(ColumnStoreSorter, SortTest) {
+ // Each entry of the top-level vector contains the field names of a sample document whose
+ // RecordId is the entry index. No field values are included in this sample data.
+ std::vector<std::vector<std::string>> sampleData = {{"foo", "bar", "foo.bar", "bar.bar"},
+ {"bar", "foo", "bar.bar"},
+ {"bar.bar", "foo.bar", "bar", "foo"},
+ {"bar", "foo.bar", "baz"},
+ {"foo.bar", "bar", "foo"}};
+
+ // The output of sorting the 'sampleData' field names by (Field name, RecordId).
+ std::vector<std::pair<StringData, int64_t>> sortedData = {{"bar", 0},
+ {"bar", 1},
+ {"bar", 2},
+ {"bar", 3},
+ {"bar", 4},
+ {"bar.bar", 0},
+ {"bar.bar", 1},
+ {"bar.bar", 2},
+ {"baz", 3},
+ {"foo", 0},
+ {"foo", 1},
+ {"foo", 2},
+ {"foo", 4},
+ {"foo.bar", 0},
+ {"foo.bar", 2},
+ {"foo.bar", 3},
+ {"foo.bar", 4}};
+
+ // ColumnStoreSorter uses the dbpath to store its spill files.
+ ON_BLOCK_EXIT(
+ [oldDbPath = storageGlobalParams.dbpath]() { storageGlobalParams.dbpath = oldDbPath; });
+ unittest::TempDir tempDir("columnStoreSorterTests");
+ storageGlobalParams.dbpath = tempDir.path();
+
+ // We test two sorters: one that can perform the sort in memory and one that is constrained so
+ // that it must spill to disk.
+
+ SorterFileStats statsForInMemorySorter;
+ auto inMemorySorter = std::make_unique<ColumnStoreSorter>(
+ 1000000 /* maxMemoryUsageBytes */, "dbName", &statsForInMemorySorter);
+
+ SorterFileStats statsForExternalSorter;
+ auto externalSorter = std::make_unique<ColumnStoreSorter>(
+ 500 /* maxMemoryUsageBytes */, "dbName", &statsForExternalSorter);
+
+ // First, load documents into each sorter.
+ for (size_t i = 0; i < sampleData.size(); ++i) {
+ for (auto&& fieldName : sampleData[i]) {
+ // Synthesize cell contents based on the field name and RecordId, so that we can test
+ // that cell contents travel with the (Field name, RecordId) key. The null-byte
+ // delimiter tests that the sorter correctly stores cells with internal null bytes.
+ std::string cell = str::stream() << fieldName << "\0" << i;
+ inMemorySorter->add(fieldName, RecordId(i), cell);
+ externalSorter->add(fieldName, RecordId(i), cell);
+ }
+ }
+
+ // Now sort, iterate the sorted output, and ensure it matches the expected output.
+ std::unique_ptr<ColumnStoreSorter::Iterator> sortedItInMemory(inMemorySorter->done());
+ std::unique_ptr<ColumnStoreSorter::Iterator> sortedItExternal(externalSorter->done());
+ for (auto&& expected : sortedData) {
+ std::string expectedCell = str::stream() << expected.first << "\0" << expected.second;
+
+ {
+ ASSERT(sortedItInMemory->more());
+ auto [columnKey, columnValue] = sortedItInMemory->next();
+
+ ASSERT_EQ(expected.first, columnKey.path);
+ ASSERT_EQ(RecordId(expected.second), columnKey.recordId);
+ ASSERT_EQ(expectedCell, columnValue.cell);
+ }
+
+ {
+ ASSERT(sortedItExternal->more());
+ auto [columnKey, columnValue] = sortedItExternal->next();
+
+ ASSERT_EQ(expected.first, columnKey.path);
+ ASSERT_EQ(RecordId(expected.second), columnKey.recordId);
+ ASSERT_EQ(expectedCell, columnValue.cell);
+ }
+ }
+ ASSERT(!sortedItInMemory->more());
+ ASSERT(!sortedItExternal->more());
+
+ sortedItInMemory.reset();
+ sortedItExternal.reset();
+
+ // Ensure that statistics for spills and file accesses are as expected.
+ ASSERT_EQ(0, inMemorySorter->numSpills());
+ ASSERT_EQ(4, externalSorter->numSpills());
+
+ ASSERT_EQ(0, statsForInMemorySorter.opened.load());
+ ASSERT_EQ(0, statsForInMemorySorter.closed.load());
+
+ // The external sorter has opened its spill file but will not close and delete it until it is
+ // destroyed.
+ ASSERT_EQ(1, statsForExternalSorter.opened.load());
+ ASSERT_EQ(0, statsForExternalSorter.closed.load());
+
+ inMemorySorter.reset();
+ externalSorter.reset();
+
+ ASSERT_EQ(0, statsForInMemorySorter.closed.load());
+ ASSERT_EQ(1, statsForExternalSorter.closed.load());
+}
+} // namespace mongo
diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp
index 87501f61cd5..71d4c996aa7 100644
--- a/src/mongo/db/index/columns_access_method.cpp
+++ b/src/mongo/db/index/columns_access_method.cpp
@@ -35,9 +35,11 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/column_cell.h"
#include "mongo/db/index/column_key_generator.h"
+#include "mongo/db/index/column_store_sorter.h"
#include "mongo/logv2/log.h"
#include "mongo/util/progress_meter.h"
@@ -96,23 +98,29 @@ public:
private:
ColumnStoreAccessMethod* const _columnsAccess;
- // For now we'll just collect all the docs to insert before inserting them.
- // TODO SERVER-65481 Do an actual optimized bulk insert with sorting.
- std::list<BSONObj> _ownedObjects;
- std::vector<BsonRecord> _deferredInserts;
+
+ ColumnStoreSorter _sorter;
+ BufBuilder _cellBuilder;
+
int64_t _keysInserted = 0;
};
ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index,
size_t maxMemoryUsageBytes,
StringData dbName)
- : _columnsAccess(index) {}
+ : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) {
+ countNewBuildInStats();
+}
ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index,
size_t maxMemoryUsageBytes,
const IndexStateInfo& stateInfo,
StringData dbName)
- : _columnsAccess(index) {}
+ : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) {
+ countResumedBuildInStats();
+ // TODO SERVER-66925: Add this support.
+ tasserted(6548103, "No support for resuming interrupted columnstore index builds.");
+}
Status ColumnStoreAccessMethod::BulkBuilder::insert(
OperationContext* opCtx,
@@ -123,15 +131,20 @@ Status ColumnStoreAccessMethod::BulkBuilder::insert(
const InsertDeleteOptions& options,
const std::function<void()>& saveCursorBeforeWrite,
const std::function<void()>& restoreCursorAfterWrite) {
- // TODO SERVER-65481 Do an actual optimized bulk insert with sorting.
- _ownedObjects.push_back(obj.getOwned());
- BsonRecord record;
- record.docPtr = &_ownedObjects.back();
- record.id = rid;
- _deferredInserts.push_back(record);
+ column_keygen::visitCellsForInsert(
+ obj, [&](PathView path, const column_keygen::UnencodedCellView& cell) {
+ _cellBuilder.reset();
+ writeEncodedCell(cell, &_cellBuilder);
+ _sorter.add(path, rid, CellView(_cellBuilder.buf(), _cellBuilder.len()));
+
+ ++_keysInserted;
+ });
+
return Status::OK();
}
+// The "multikey" property does not apply to columnstore indexes, because the array key does not
+// represent a field in a document and
const MultikeyPaths& ColumnStoreAccessMethod::BulkBuilder::getMultikeyPaths() const {
const static MultikeyPaths empty;
return empty;
@@ -156,21 +169,73 @@ Status ColumnStoreAccessMethod::BulkBuilder::commit(OperationContext* opCtx,
int32_t yieldIterations,
const KeyHandlerFn& onDuplicateKeyInserted,
const RecordIdHandlerFn& onDuplicateRecord) {
- static constexpr size_t kBufferBlockSize = 1024;
- SharedBufferFragmentBuilder pooledBufferBuilder(kBufferBlockSize);
-
- WriteUnitOfWork wunit(opCtx);
- auto status = _columnsAccess->insert(opCtx,
- pooledBufferBuilder,
- collection,
- _deferredInserts,
- InsertDeleteOptions{},
- &_keysInserted);
- if (!status.isOK()) {
- return status;
+ Timer timer;
+
+ auto ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx);
+
+ static constexpr char message[] =
+ "Index Build: inserting keys from external sorter into columnstore index";
+ ProgressMeterHolder pm;
+ {
+ stdx::unique_lock<Client> lk(*opCtx->getClient());
+ pm.set(
+ CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */));
+ }
+
+ auto builder = _columnsAccess->_store->makeBulkBuilder(opCtx);
+
+ int64_t iterations = 0;
+ boost::optional<ColumnStoreSorter::Key> previousKey;
+ std::unique_ptr<ColumnStoreSorter::Iterator> it(_sorter.done());
+ while (it->more()) {
+ opCtx->checkForInterrupt();
+
+ auto columnStoreKeyWithValue = it->next();
+
+ // In debug mode only, assert that keys are retrieved from the sorter in strictly increasing
+ // order.
+ if (kDebugBuild) {
+ if (previousKey && !(*previousKey < columnStoreKeyWithValue.first)) {
+ LOGV2_FATAL_NOTRACE(6548100,
+ "Out-of-order result from sorter for column store bulk loader",
+ "prevPathName"_attr = previousKey->path,
+ "prevRecordId"_attr = previousKey->recordId,
+ "nextPathName"_attr = columnStoreKeyWithValue.first.path,
+ "nextRecordId"_attr = columnStoreKeyWithValue.first.recordId,
+ "index"_attr = _columnsAccess->_descriptor->indexName());
+ }
+ previousKey = columnStoreKeyWithValue.first;
+ }
+
+ try {
+ writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] {
+ WriteUnitOfWork wunit(opCtx);
+ auto& [columnStoreKey, columnStoreValue] = columnStoreKeyWithValue;
+ builder->addCell(
+ columnStoreKey.path, columnStoreKey.recordId, columnStoreValue.cell);
+ wunit.commit();
+ });
+ } catch (DBException& e) {
+ return e.toStatus();
+ }
+
+ // Yield locks every 'yieldIterations' key insertions.
+ if (++iterations % yieldIterations == 0) {
+ yield(opCtx, &collection, ns);
+ }
+
+ pm.hit();
}
- wunit.commit();
+ pm.finished();
+
+ LOGV2(6548101,
+ "Index build: bulk sorter inserted {keysInserted} keys into index {index} on namespace "
+ "{namespace} in {duration} seconds",
+ "keysInserted"_attr = _keysInserted,
+ "index"_attr = _columnsAccess->_descriptor->indexName(),
+ logAttrs(ns),
+ "duration"_attr = Seconds(timer.seconds()));
return Status::OK();
}
@@ -185,7 +250,7 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx,
auto cursor = _store->newWriteCursor(opCtx);
column_keygen::visitCellsForInsert(
bsonRecords,
- [&](StringData path,
+ [&](PathView path,
const BsonRecord& rec,
const column_keygen::UnencodedCellView& cell) {
if (!rec.ts.isNull()) {
@@ -214,7 +279,7 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx,
int64_t* keysDeletedOut,
CheckRecordId checkRecordId) {
auto cursor = _store->newWriteCursor(opCtx);
- column_keygen::visitPathsForDelete(obj, [&](StringData path) {
+ column_keygen::visitPathsForDelete(obj, [&](PathView path) {
cursor->remove(path, rid);
inc(keysDeletedOut);
});
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 150558bfb69..0144e00d973 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -50,7 +50,6 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/timestamp_block.h"
-#include "mongo/db/sorter/sorter.h"
#include "mongo/db/storage/execution_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
@@ -128,12 +127,12 @@ bool isMultikeyFromPaths(const MultikeyPaths& multikeyPaths) {
[](const MultikeyComponents& components) { return !components.empty(); });
}
-SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName) {
+SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats) {
return SortOptions()
.TempDir(storageGlobalParams.dbpath + "/_tmp")
.ExtSortAllowed()
.MaxMemoryUsageBytes(maxMemoryUsageBytes)
- .FileStats(&indexBulkBuilderSSS.sorterFileStats)
+ .FileStats(stats)
.DBName(dbName.toString());
}
@@ -610,6 +609,51 @@ Ident* SortedDataIndexAccessMethod::getIdentPtr() const {
return this->_newInterface.get();
}
+void IndexAccessMethod::BulkBuilder::countNewBuildInStats() {
+ indexBulkBuilderSSS.count.addAndFetch(1);
+}
+
+void IndexAccessMethod::BulkBuilder::countResumedBuildInStats() {
+ indexBulkBuilderSSS.count.addAndFetch(1);
+ indexBulkBuilderSSS.resumed.addAndFetch(1);
+}
+
+SorterFileStats* IndexAccessMethod::BulkBuilder::bulkBuilderFileStats() {
+ return &indexBulkBuilderSSS.sorterFileStats;
+}
+
+void IndexAccessMethod::BulkBuilder::yield(OperationContext* opCtx,
+ const Yieldable* yieldable,
+ const NamespaceString& ns) {
+ // Releasing locks means a new snapshot should be acquired when restored.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ yieldable->yield();
+
+ auto locker = opCtx->lockState();
+ Locker::LockSnapshot snapshot;
+ if (locker->saveLockStateAndUnlock(&snapshot)) {
+
+ // Track the number of yields in CurOp.
+ CurOp::get(opCtx)->yielded();
+
+ auto failPointHang = [opCtx, &ns](FailPoint* fp) {
+ fp->executeIf(
+ [fp](auto&&) {
+ LOGV2(5180600, "Hanging index build during bulk load yield");
+ fp->pauseWhileSet();
+ },
+ [opCtx, &ns](auto&& config) {
+ return config.getStringField("namespace") == ns.ns();
+ });
+ };
+ failPointHang(&hangDuringIndexBuildBulkLoadYield);
+ failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond);
+
+ locker->restoreLockState(opCtx, snapshot);
+ }
+ yieldable->restore();
+}
+
class SortedDataIndexAccessMethod::BulkBuilderImpl final : public IndexAccessMethod::BulkBuilder {
public:
using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>;
@@ -646,9 +690,6 @@ public:
IndexStateInfo persistDataForShutdown() final;
private:
- void _yield(OperationContext* opCtx,
- const Yieldable* yieldable,
- const NamespaceString& ns) const;
void _insertMultikeyMetadataKeysIntoSorter();
Sorter* _makeSorter(
@@ -689,7 +730,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc
size_t maxMemoryUsageBytes,
StringData dbName)
: _iam(iam), _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) {
- indexBulkBuilderSSS.count.addAndFetch(1);
+ countNewBuildInStats();
}
SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAccessMethod* iam,
@@ -702,8 +743,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc
_keysInserted(stateInfo.getNumKeys().value_or(0)),
_isMultiKey(stateInfo.getIsMultikey()),
_indexMultikeyPaths(createMultikeyPaths(stateInfo.getMultikeyPaths())) {
- indexBulkBuilderSSS.count.addAndFetch(1);
- indexBulkBuilderSSS.resumed.addAndFetch(1);
+ countResumedBuildInStats();
}
Status SortedDataIndexAccessMethod::BulkBuilderImpl::insert(
@@ -825,46 +865,16 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::_makeSorter(
StringData dbName,
boost::optional<StringData> fileName,
const boost::optional<std::vector<SorterRange>>& ranges) const {
- return fileName ? Sorter::makeFromExistingRanges(fileName->toString(),
- *ranges,
- makeSortOptions(maxMemoryUsageBytes, dbName),
- BtreeExternalSortComparison(),
- _makeSorterSettings())
- : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName),
- BtreeExternalSortComparison(),
- _makeSorterSettings());
-}
-
-void SortedDataIndexAccessMethod::BulkBuilderImpl::_yield(OperationContext* opCtx,
- const Yieldable* yieldable,
- const NamespaceString& ns) const {
- // Releasing locks means a new snapshot should be acquired when restored.
- opCtx->recoveryUnit()->abandonSnapshot();
- yieldable->yield();
-
- auto locker = opCtx->lockState();
- Locker::LockSnapshot snapshot;
- if (locker->saveLockStateAndUnlock(&snapshot)) {
-
- // Track the number of yields in CurOp.
- CurOp::get(opCtx)->yielded();
-
- auto failPointHang = [opCtx, &ns](FailPoint* fp) {
- fp->executeIf(
- [fp](auto&&) {
- LOGV2(5180600, "Hanging index build during bulk load yield");
- fp->pauseWhileSet();
- },
- [opCtx, &ns](auto&& config) {
- return config.getStringField("namespace") == ns.ns();
- });
- };
- failPointHang(&hangDuringIndexBuildBulkLoadYield);
- failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond);
-
- locker->restoreLockState(opCtx, snapshot);
- }
- yieldable->restore();
+ return fileName
+ ? Sorter::makeFromExistingRanges(
+ fileName->toString(),
+ *ranges,
+ makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings())
+ : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings());
}
Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit(
@@ -979,7 +989,7 @@ Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit(
// Starts yielding locks after the first non-zero 'yieldIterations' inserts.
if (yieldIterations && (i + 1) % yieldIterations == 0) {
- _yield(opCtx, &collection, ns);
+ yield(opCtx, &collection, ns);
}
// If we're here either it's a dup and we're cool with it or the addKey went just fine.
diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h
index 3718040885d..7e3a4dd504a 100644
--- a/src/mongo/db/index/index_access_method.h
+++ b/src/mongo/db/index/index_access_method.h
@@ -40,6 +40,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
+#include "mongo/db/sorter/sorter.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/db/yieldable.h"
@@ -219,6 +220,19 @@ public:
* Persists on disk the keys that have been inserted using this BulkBuilder.
*/
virtual IndexStateInfo persistDataForShutdown() = 0;
+
+ protected:
+ static void countNewBuildInStats();
+ static void countResumedBuildInStats();
+ static SorterFileStats* bulkBuilderFileStats();
+
+ /**
+ * Abandon the current snapshot and release then reacquire locks. Tests that target the
+ * behavior of bulk index builds that yield can use failpoints to stall this yield.
+ */
+ static void yield(OperationContext* opCtx,
+ const Yieldable* yieldable,
+ const NamespaceString& ns);
};
/**