diff options
author | Justin Seyster <justin.seyster@mongodb.com> | 2022-06-29 12:32:12 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-29 20:02:02 +0000 |
commit | cb9472afc30d32d1c18691d64899c1aa72cdc43d (patch) | |
tree | 924020b6f50ccf365926a2420c64d92e5497a34a /src/mongo/db/index | |
parent | f684488a8509f0a3764fa48221499b9cd508c0e1 (diff) | |
download | mongo-cb9472afc30d32d1c18691d64899c1aa72cdc43d.tar.gz |
SERVER-65481 Bulk shredding and loading for column store indexes
Diffstat (limited to 'src/mongo/db/index')
-rw-r--r-- | src/mongo/db/index/SConscript | 43 | ||||
-rw-r--r-- | src/mongo/db/index/column_store_sorter.cpp | 297 | ||||
-rw-r--r-- | src/mongo/db/index/column_store_sorter.h | 134 | ||||
-rw-r--r-- | src/mongo/db/index/column_store_sorter_test.cpp | 140 | ||||
-rw-r--r-- | src/mongo/db/index/columns_access_method.cpp | 119 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.cpp | 110 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.h | 14 |
7 files changed, 764 insertions, 93 deletions
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 9ccff04ff89..f71d78da857 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -84,9 +84,9 @@ env.Library( ], ) -serveronlyEnv = env.Clone() -serveronlyEnv.InjectThirdParty(libraries=['snappy']) -serveronlyEnv.Library( +iamEnv = env.Clone() +iamEnv.InjectThirdParty(libraries=['snappy']) +iamEnv.Library( target="index_access_method", source=[ 'duplicate_key_tracker.cpp', @@ -119,6 +119,22 @@ serveronlyEnv.Library( ], ) +iamEnv.Library( + target="column_store_index", + source=[ + 'column_cell.cpp', + "column_store_sorter.cpp", + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/sorter/sorter_idl', + '$BUILD_DIR/mongo/db/storage/encryption_hooks', + '$BUILD_DIR/mongo/db/storage/storage_options', + '$BUILD_DIR/mongo/s/is_mongos', + '$BUILD_DIR/third_party/shim_snappy', + ], +) + env.Library( target="index_access_methods", source=[ @@ -137,31 +153,26 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/concurrency/exception_util', + '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/fts/base_fts', '$BUILD_DIR/mongo/db/index_names', - 'columnar_index', + 'column_store_index', 'expression_params', 'key_generator', ], ) -env.Library( - target="columnar_index", - source=[ - 'column_cell.cpp', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/base', - ], -) - -env.CppUnitTest( +indexTestEnv = env.Clone() +indexTestEnv.InjectThirdParty(libraries=['snappy']) +indexTestEnv.CppUnitTest( target='db_index_test', source=[ '2d_key_generator_test.cpp', 'btree_key_generator_test.cpp', 'column_cell_test.cpp', 'column_key_generator_test.cpp', + 'column_store_sorter_test.cpp', 'hash_key_generator_test.cpp', 's2_key_generator_test.cpp', 's2_bucket_key_generator_test.cpp', @@ -177,7 +188,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/query/sort_pattern', '$BUILD_DIR/mongo/db/record_id_helpers', - 'columnar_index', + 'column_store_index', 'expression_params', 'key_generator', ], diff --git a/src/mongo/db/index/column_store_sorter.cpp b/src/mongo/db/index/column_store_sorter.cpp new file mode 100644 index 00000000000..f708518a560 --- /dev/null +++ b/src/mongo/db/index/column_store_sorter.cpp @@ -0,0 +1,297 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex + +#include "mongo/platform/basic.h" + +#include "mongo/db/index/column_store_sorter.h" + +namespace mongo { +struct ComparisonForPathAndRid { + int operator()(const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& left, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& right) const { + auto stringComparison = left.first.path.compare(right.first.path); + return (stringComparison != 0) ? stringComparison + : left.first.recordId.compare(right.first.recordId); + } +}; + +bool ColumnStoreSorter::Key::operator<(const Key& other) const { + if (auto cmp = path.compare(other.path); cmp != 0) { + return cmp < 0; + } else { + return recordId < other.recordId; + } +} + +void ColumnStoreSorter::Key::serializeForSorter(BufBuilder& buf) const { + buf.appendStr(path); + recordId.serializeToken(buf); +} + +ColumnStoreSorter::Key ColumnStoreSorter::Key::deserializeForSorter( + BufReader& buf, ColumnStoreSorter::Key::SorterDeserializeSettings) { + // Note: unlike function call parameters, the order of evaluation for initializer + // parameters is defined. + return {buf.readCStr(), RecordId::deserializeToken(buf)}; +} + +void ColumnStoreSorter::Value::serializeForSorter(BufBuilder& buf) const { + buf.appendNum(uint32_t(cell.size())); // Little-endian write + buf.appendBuf(cell.rawData(), cell.size()); +} + +ColumnStoreSorter::Value ColumnStoreSorter::Value::deserializeForSorter( + BufReader& buf, ColumnStoreSorter::Value::SorterDeserializeSettings) { + size_t cellSize = buf.read<LittleEndian<uint32_t>>(); + return Value{buf.readBytes(cellSize)}; +} + +ColumnStoreSorter::ColumnStoreSorter(size_t maxMemoryUsageBytes, + StringData dbName, + SorterFileStats* stats) + : _dbName(dbName.toString()), + _stats(stats), + _maxMemoryUsageBytes(maxMemoryUsageBytes), + _spillFile(std::make_shared<Sorter<Key, Value>::File>(pathForNewSpillFile(), _stats)) {} + +void ColumnStoreSorter::add(PathView path, RecordId recordId, CellView cellContents) { + auto& cellListAtPath = _dataByPath[path]; + if (cellListAtPath.empty()) { + // Track memory usage of this new path. + _memUsed += sizeof(StringMap<CellVector>::value_type) + path.size(); + } + + // The sorter assumes that RecordIds are added in sorted order. + tassert(6548102, + "Out-of-order record during columnar index build", + cellListAtPath.empty() || cellListAtPath.back().first < recordId); + + cellListAtPath.emplace_back(recordId, CellValue(cellContents.rawData(), cellContents.size())); + _memUsed += cellListAtPath.back().first.memUsage() + sizeof(CellValue) + + cellListAtPath.back().second.size(); + if (_memUsed > _maxMemoryUsageBytes) { + spill(); + } +} + +namespace { +std::string tempDir() { + return str::stream() << storageGlobalParams.dbpath << "/_tmp"; +} +} // namespace + +SortOptions ColumnStoreSorter::makeSortOptions(const std::string& dbName, SorterFileStats* stats) { + return SortOptions().TempDir(tempDir()).ExtSortAllowed().FileStats(stats).DBName(dbName); +} + +std::string ColumnStoreSorter::pathForNewSpillFile() { + static AtomicWord<unsigned> fileNameCounter; + static const uint64_t randomSuffix = static_cast<uint64_t>(SecureRandom().nextInt64()); + return str::stream() << tempDir() << "/ext-sort-column-store-index." + << fileNameCounter.fetchAndAdd(1) << "-" << randomSuffix; +} + +void ColumnStoreSorter::spill() { + if (_dataByPath.empty()) { + return; + } + ++_numSpills; + + SortedFileWriter<Key, Value> writer(makeSortOptions(_dbName, _stats), _spillFile, {}); + + // Cells loaded into memory are sorted by record id but not yet sorted by path. We perform that + // sort now, so that we can output cells sorted by (path, rid) for later consumption by our + // standard external merge implementation: SortIteratorInterface<Key, Value>::merge(). + std::vector<const StringMap<CellVector>::value_type*> sortedPathList; + sortedPathList.reserve(_dataByPath.size()); + for (auto&& pathWithCellVector : _dataByPath) { + sortedPathList.push_back(&pathWithCellVector); + } + std::sort(sortedPathList.begin(), sortedPathList.end(), [](auto left, auto right) { + return left->first < right->first; + }); + + size_t currentChunkSize = 0; + for (auto&& pathWithCellVector : sortedPathList) { + auto& [path, cellVector] = *pathWithCellVector; + + size_t cellVectorSize = std::accumulate( + cellVector.begin(), cellVector.end(), 0, [& path = path](size_t sum, auto& ridAndCell) { + return sum + path.size() + ridAndCell.first.memUsage() + ridAndCell.second.size(); + }); + + // Add (path, rid, cell) records to the spill file so that the first cell in each contiguous + // run of cells with the same path lives in its own chunk. E.g.: + // Path1, rid1, Cell contents + // CHUNK BOUNDARY + // Path1, rid2, Cell Contents + // ... + // Path1, ridN, Cell Contents + // CHUNK BOUNDARY + // Path2, rid1, Cell Contents + // CHUNK BOUNDARY + // Path2, rid2, Cell Contents + // ... + // + // During merging, file readers will hold one chunk from each spill file in memory, so + // optimizing chunk size can reduce memory usage during the merge. Merging for a column + // store index is a special case: because the sorter is loaded in RecordId order, all the + // cells from this spill are guaranteed to merge together, with no interleaving cells from + // other spill files. + // + // This layout will result in a merger that holds a single cell from each leg of the merge + // representing the first in a large contiguous range. Once that cell gets picked, the merge + // will consume all chunks at that path in that file before moving on to the next file or + // the next path. + // + // To avoid the pathological case where runs are very short, we don't force a chunk boundary + // when a run of cells would not result in a chunk greater than 1024 bytes. + const size_t kShortChunkThreshold = 1024; + bool writeBoundaryAfterAdd = (currentChunkSize + cellVectorSize) > kShortChunkThreshold; + if (writeBoundaryAfterAdd) { + // Add the chunk boundary just before the first cell with this path name. + writer.writeChunk(); + currentChunkSize = 0; + } + for (auto ridAndCell : cellVector) { + const auto& cell = ridAndCell.second; + currentChunkSize += path.size() + ridAndCell.first.memUsage() + cell.size(); + writer.addAlreadySorted(Key{path, ridAndCell.first}, + Value{CellView{cell.c_str(), cell.size()}}); + + if (writeBoundaryAfterAdd) { + // Add the chunk boundary just after the first cell with this path name, giving it + // its own chunk. + writer.writeChunk(); + writeBoundaryAfterAdd = false; + currentChunkSize = 0; + } + } + } + + _spilledFileIterators.emplace_back(writer.done()); + + _dataByPath.clear(); + _memUsed = 0; +} + +ColumnStoreSorter::Iterator* ColumnStoreSorter::done() { + invariant(!std::exchange(_done, true)); + + if (_spilledFileIterators.size() == 0) { + return inMemoryIterator(); + } + + spill(); + return SortIteratorInterface<Key, Value>::merge( + _spilledFileIterators, makeSortOptions(_dbName, _stats), ComparisonForPathAndRid()); +} + +/** + * This iterator "unwinds" our path -> CellVector mapping into sorted tuples of (path name, + * recordId, cell), with the path name and recordId bundled into a single "key." The unwinding + * proceeds using an outer iterator over the paths and an inner iterator for the current CellVector. + * The outer iterator uses a separate path list that gets sorted when the 'InMemoryIterator' is + * initialized. The inner iterator directly traverses the CellVector, which is already sorted. + */ +class ColumnStoreSorter::InMemoryIterator final : public ColumnStoreSorter::Iterator { +public: + InMemoryIterator(const StringMap<CellVector>& dataByPath) { + // Cells loaded into memory are sorted by record id but now yet by path. Sorting by path + // finalizes the sort algorithm. + _sortedPathList.reserve(dataByPath.size()); + for (const auto& pathWithCellVector : dataByPath) { + _sortedPathList.push_back(&pathWithCellVector); + } + std::sort(_sortedPathList.begin(), _sortedPathList.end(), [](auto left, auto right) { + return left->first < right->first; + }); + + _pathIt = _sortedPathList.begin(); + if (_pathIt != _sortedPathList.end()) { + _cellVectorIt = (*_pathIt)->second.begin(); + } + } + + bool more() final { + return _pathIt != _sortedPathList.end(); + } + + std::pair<Key, Value> next() final { + Key key{(*_pathIt)->first, _cellVectorIt->first}; + + Value contents{_cellVectorIt->second}; + + ++_cellVectorIt; + while (_cellVectorIt == (*_pathIt)->second.end() && ++_pathIt != _sortedPathList.end()) { + _cellVectorIt = (*_pathIt)->second.begin(); + } + + return {key, contents}; + } + + const std::pair<Key, Value>& current() final { + tasserted(ErrorCodes::NotImplemented, + "current() not implemented for ColumnStoreSorter::Iterator"); + } + + void openSource() final {} + + void closeSource() final {} + +private: + std::vector<const StringMap<CellVector>::value_type*> _sortedPathList; + + decltype(_sortedPathList)::const_iterator _pathIt; + CellVector::const_iterator _cellVectorIt; +}; + +ColumnStoreSorter::Iterator* ColumnStoreSorter::inMemoryIterator() const { + return new InMemoryIterator(_dataByPath); +} +} // namespace mongo + +namespace { +/** + * A 'nextFilename()' is required for the below "sorter.cpp" include to compile, but this file does + * not use any of the 'Sorter' classes that call it. + */ +std::string nextFileName() { + MONGO_UNREACHABLE; +} +} // namespace + +#undef MONGO_LOGV2_DEFAULT_COMPONENT +#include "mongo/db/sorter/sorter.cpp" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex +MONGO_CREATE_SORTER(mongo::ColumnStoreSorter::Key, + mongo::ColumnStoreSorter::Value, + mongo::ComparisonForPathAndRid); diff --git a/src/mongo/db/index/column_store_sorter.h b/src/mongo/db/index/column_store_sorter.h new file mode 100644 index 00000000000..833213b3646 --- /dev/null +++ b/src/mongo/db/index/column_store_sorter.h @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/record_id.h" +#include "mongo/db/sorter/sorter.h" +#include "mongo/db/storage/column_store.h" + +namespace mongo { +/** + * Performs the organizing and sorting steps of a column store index bulk build, presenting an + * interface similar to the 'Sorter' interface. The client can add cells with the 'add()' method + * until none remain and then call 'done()' to get an iterator that returns the cells in sorted + * order. + * + * This class assumes that inputs are _already sorted_ by RecordId. Adding out-of-orders cells will + * result in undefined behavior. + * + * Internally, this class maintains a hash table that maps each path to a sorted list of + * (RecordId, CellValue) pairs. Because we use a hash table and not a sorted data structure (like + * std::map), we need to sort the list of paths when finalizing the output or when writing a spill + * file. The total number of cells inserted into this mapping is potentially very large, making it + * preferable to defer the cost of sorting to the end in order to avoid the cost of a binary tree + * traversal for each inserted cell. + */ +class ColumnStoreSorter { +public: + ColumnStoreSorter(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats); + + void add(PathView path, RecordId recordId, CellView cellContents); + + size_t numSpills() const { + return _numSpills; + } + + struct Key { + PathView path; + RecordId recordId; + + struct SorterDeserializeSettings {}; + + bool operator<(const Key& other) const; + void serializeForSorter(BufBuilder& buf) const; + + // Assumes that the source buffer will remain valid for the lifetime of the returned + // ColumnStoreSorter::Key object. + static Key deserializeForSorter(BufReader& buf, SorterDeserializeSettings); + + size_t memUsageForSorter() const { + return sizeof(path) + path.size() + recordId.memUsage(); + } + + Key getOwned() const { + MONGO_UNREACHABLE; + } + }; + + struct Value { + CellView cell; + + struct SorterDeserializeSettings {}; + + void serializeForSorter(BufBuilder& buf) const; + static Value deserializeForSorter(BufReader& buf, SorterDeserializeSettings); + + size_t memUsageForSorter() const { + return sizeof(cell) + cell.size(); + } + + Value getOwned() const { + MONGO_UNREACHABLE; + } + }; + + using Iterator = SortIteratorInterface<Key, Value>; + + Iterator* done(); + +private: + class InMemoryIterator; + + static SortOptions makeSortOptions(const std::string& dbName, SorterFileStats* stats); + static std::string pathForNewSpillFile(); + + void spill(); + + Iterator* inMemoryIterator() const; + + const std::string _dbName; + SorterFileStats* _stats; // Unowned + + const size_t _maxMemoryUsageBytes; + size_t _memUsed = 0; + size_t _numSpills = 0; + + /** + * Mapping from path name to the sorted list of (RecordId, Cell) pairs. + */ + using CellVector = std::vector<std::pair<RecordId, CellValue>>; + StringMap<CellVector> _dataByPath; + + std::shared_ptr<Sorter<Key, Value>::File> _spillFile; + std::vector<std::shared_ptr<Iterator>> _spilledFileIterators; + + bool _done = false; +}; +} // namespace mongo diff --git a/src/mongo/db/index/column_store_sorter_test.cpp b/src/mongo/db/index/column_store_sorter_test.cpp new file mode 100644 index 00000000000..4b407bc1211 --- /dev/null +++ b/src/mongo/db/index/column_store_sorter_test.cpp @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/index/column_store_sorter.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +TEST(ColumnStoreSorter, SortTest) { + // Each entry of the top-level vector contains the field names of a sample document whose + // RecordId is the entry index. No field values are included in this sample data. + std::vector<std::vector<std::string>> sampleData = {{"foo", "bar", "foo.bar", "bar.bar"}, + {"bar", "foo", "bar.bar"}, + {"bar.bar", "foo.bar", "bar", "foo"}, + {"bar", "foo.bar", "baz"}, + {"foo.bar", "bar", "foo"}}; + + // The output of sorting the 'sampleData' field names by (Field name, RecordId). + std::vector<std::pair<StringData, int64_t>> sortedData = {{"bar", 0}, + {"bar", 1}, + {"bar", 2}, + {"bar", 3}, + {"bar", 4}, + {"bar.bar", 0}, + {"bar.bar", 1}, + {"bar.bar", 2}, + {"baz", 3}, + {"foo", 0}, + {"foo", 1}, + {"foo", 2}, + {"foo", 4}, + {"foo.bar", 0}, + {"foo.bar", 2}, + {"foo.bar", 3}, + {"foo.bar", 4}}; + + // ColumnStoreSorter uses the dbpath to store its spill files. + ON_BLOCK_EXIT( + [oldDbPath = storageGlobalParams.dbpath]() { storageGlobalParams.dbpath = oldDbPath; }); + unittest::TempDir tempDir("columnStoreSorterTests"); + storageGlobalParams.dbpath = tempDir.path(); + + // We test two sorters: one that can perform the sort in memory and one that is constrained so + // that it must spill to disk. + + SorterFileStats statsForInMemorySorter; + auto inMemorySorter = std::make_unique<ColumnStoreSorter>( + 1000000 /* maxMemoryUsageBytes */, "dbName", &statsForInMemorySorter); + + SorterFileStats statsForExternalSorter; + auto externalSorter = std::make_unique<ColumnStoreSorter>( + 500 /* maxMemoryUsageBytes */, "dbName", &statsForExternalSorter); + + // First, load documents into each sorter. + for (size_t i = 0; i < sampleData.size(); ++i) { + for (auto&& fieldName : sampleData[i]) { + // Synthesize cell contents based on the field name and RecordId, so that we can test + // that cell contents travel with the (Field name, RecordId) key. The null-byte + // delimiter tests that the sorter correctly stores cells with internal null bytes. + std::string cell = str::stream() << fieldName << "\0" << i; + inMemorySorter->add(fieldName, RecordId(i), cell); + externalSorter->add(fieldName, RecordId(i), cell); + } + } + + // Now sort, iterate the sorted output, and ensure it matches the expected output. + std::unique_ptr<ColumnStoreSorter::Iterator> sortedItInMemory(inMemorySorter->done()); + std::unique_ptr<ColumnStoreSorter::Iterator> sortedItExternal(externalSorter->done()); + for (auto&& expected : sortedData) { + std::string expectedCell = str::stream() << expected.first << "\0" << expected.second; + + { + ASSERT(sortedItInMemory->more()); + auto [columnKey, columnValue] = sortedItInMemory->next(); + + ASSERT_EQ(expected.first, columnKey.path); + ASSERT_EQ(RecordId(expected.second), columnKey.recordId); + ASSERT_EQ(expectedCell, columnValue.cell); + } + + { + ASSERT(sortedItExternal->more()); + auto [columnKey, columnValue] = sortedItExternal->next(); + + ASSERT_EQ(expected.first, columnKey.path); + ASSERT_EQ(RecordId(expected.second), columnKey.recordId); + ASSERT_EQ(expectedCell, columnValue.cell); + } + } + ASSERT(!sortedItInMemory->more()); + ASSERT(!sortedItExternal->more()); + + sortedItInMemory.reset(); + sortedItExternal.reset(); + + // Ensure that statistics for spills and file accesses are as expected. + ASSERT_EQ(0, inMemorySorter->numSpills()); + ASSERT_EQ(4, externalSorter->numSpills()); + + ASSERT_EQ(0, statsForInMemorySorter.opened.load()); + ASSERT_EQ(0, statsForInMemorySorter.closed.load()); + + // The external sorter has opened its spill file but will not close and delete it until it is + // destroyed. + ASSERT_EQ(1, statsForExternalSorter.opened.load()); + ASSERT_EQ(0, statsForExternalSorter.closed.load()); + + inMemorySorter.reset(); + externalSorter.reset(); + + ASSERT_EQ(0, statsForInMemorySorter.closed.load()); + ASSERT_EQ(1, statsForExternalSorter.closed.load()); +} +} // namespace mongo diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp index 87501f61cd5..71d4c996aa7 100644 --- a/src/mongo/db/index/columns_access_method.cpp +++ b/src/mongo/db/index/columns_access_method.cpp @@ -35,9 +35,11 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" #include "mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" +#include "mongo/db/index/column_store_sorter.h" #include "mongo/logv2/log.h" #include "mongo/util/progress_meter.h" @@ -96,23 +98,29 @@ public: private: ColumnStoreAccessMethod* const _columnsAccess; - // For now we'll just collect all the docs to insert before inserting them. - // TODO SERVER-65481 Do an actual optimized bulk insert with sorting. - std::list<BSONObj> _ownedObjects; - std::vector<BsonRecord> _deferredInserts; + + ColumnStoreSorter _sorter; + BufBuilder _cellBuilder; + int64_t _keysInserted = 0; }; ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName) - : _columnsAccess(index) {} + : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) { + countNewBuildInStats(); +} ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, const IndexStateInfo& stateInfo, StringData dbName) - : _columnsAccess(index) {} + : _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()) { + countResumedBuildInStats(); + // TODO SERVER-66925: Add this support. + tasserted(6548103, "No support for resuming interrupted columnstore index builds."); +} Status ColumnStoreAccessMethod::BulkBuilder::insert( OperationContext* opCtx, @@ -123,15 +131,20 @@ Status ColumnStoreAccessMethod::BulkBuilder::insert( const InsertDeleteOptions& options, const std::function<void()>& saveCursorBeforeWrite, const std::function<void()>& restoreCursorAfterWrite) { - // TODO SERVER-65481 Do an actual optimized bulk insert with sorting. - _ownedObjects.push_back(obj.getOwned()); - BsonRecord record; - record.docPtr = &_ownedObjects.back(); - record.id = rid; - _deferredInserts.push_back(record); + column_keygen::visitCellsForInsert( + obj, [&](PathView path, const column_keygen::UnencodedCellView& cell) { + _cellBuilder.reset(); + writeEncodedCell(cell, &_cellBuilder); + _sorter.add(path, rid, CellView(_cellBuilder.buf(), _cellBuilder.len())); + + ++_keysInserted; + }); + return Status::OK(); } +// The "multikey" property does not apply to columnstore indexes, because the array key does not +// represent a field in a document and const MultikeyPaths& ColumnStoreAccessMethod::BulkBuilder::getMultikeyPaths() const { const static MultikeyPaths empty; return empty; @@ -156,21 +169,73 @@ Status ColumnStoreAccessMethod::BulkBuilder::commit(OperationContext* opCtx, int32_t yieldIterations, const KeyHandlerFn& onDuplicateKeyInserted, const RecordIdHandlerFn& onDuplicateRecord) { - static constexpr size_t kBufferBlockSize = 1024; - SharedBufferFragmentBuilder pooledBufferBuilder(kBufferBlockSize); - - WriteUnitOfWork wunit(opCtx); - auto status = _columnsAccess->insert(opCtx, - pooledBufferBuilder, - collection, - _deferredInserts, - InsertDeleteOptions{}, - &_keysInserted); - if (!status.isOK()) { - return status; + Timer timer; + + auto ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx); + + static constexpr char message[] = + "Index Build: inserting keys from external sorter into columnstore index"; + ProgressMeterHolder pm; + { + stdx::unique_lock<Client> lk(*opCtx->getClient()); + pm.set( + CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */)); + } + + auto builder = _columnsAccess->_store->makeBulkBuilder(opCtx); + + int64_t iterations = 0; + boost::optional<ColumnStoreSorter::Key> previousKey; + std::unique_ptr<ColumnStoreSorter::Iterator> it(_sorter.done()); + while (it->more()) { + opCtx->checkForInterrupt(); + + auto columnStoreKeyWithValue = it->next(); + + // In debug mode only, assert that keys are retrieved from the sorter in strictly increasing + // order. + if (kDebugBuild) { + if (previousKey && !(*previousKey < columnStoreKeyWithValue.first)) { + LOGV2_FATAL_NOTRACE(6548100, + "Out-of-order result from sorter for column store bulk loader", + "prevPathName"_attr = previousKey->path, + "prevRecordId"_attr = previousKey->recordId, + "nextPathName"_attr = columnStoreKeyWithValue.first.path, + "nextRecordId"_attr = columnStoreKeyWithValue.first.recordId, + "index"_attr = _columnsAccess->_descriptor->indexName()); + } + previousKey = columnStoreKeyWithValue.first; + } + + try { + writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] { + WriteUnitOfWork wunit(opCtx); + auto& [columnStoreKey, columnStoreValue] = columnStoreKeyWithValue; + builder->addCell( + columnStoreKey.path, columnStoreKey.recordId, columnStoreValue.cell); + wunit.commit(); + }); + } catch (DBException& e) { + return e.toStatus(); + } + + // Yield locks every 'yieldIterations' key insertions. + if (++iterations % yieldIterations == 0) { + yield(opCtx, &collection, ns); + } + + pm.hit(); } - wunit.commit(); + pm.finished(); + + LOGV2(6548101, + "Index build: bulk sorter inserted {keysInserted} keys into index {index} on namespace " + "{namespace} in {duration} seconds", + "keysInserted"_attr = _keysInserted, + "index"_attr = _columnsAccess->_descriptor->indexName(), + logAttrs(ns), + "duration"_attr = Seconds(timer.seconds())); return Status::OK(); } @@ -185,7 +250,7 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, auto cursor = _store->newWriteCursor(opCtx); column_keygen::visitCellsForInsert( bsonRecords, - [&](StringData path, + [&](PathView path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) { if (!rec.ts.isNull()) { @@ -214,7 +279,7 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx, int64_t* keysDeletedOut, CheckRecordId checkRecordId) { auto cursor = _store->newWriteCursor(opCtx); - column_keygen::visitPathsForDelete(obj, [&](StringData path) { + column_keygen::visitPathsForDelete(obj, [&](PathView path) { cursor->remove(path, rid); inc(keysDeletedOut); }); diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 150558bfb69..0144e00d973 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -50,7 +50,6 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/timestamp_block.h" -#include "mongo/db/sorter/sorter.h" #include "mongo/db/storage/execution_context.h" #include "mongo/db/storage/storage_options.h" #include "mongo/logv2/log.h" @@ -128,12 +127,12 @@ bool isMultikeyFromPaths(const MultikeyPaths& multikeyPaths) { [](const MultikeyComponents& components) { return !components.empty(); }); } -SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName) { +SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName, SorterFileStats* stats) { return SortOptions() .TempDir(storageGlobalParams.dbpath + "/_tmp") .ExtSortAllowed() .MaxMemoryUsageBytes(maxMemoryUsageBytes) - .FileStats(&indexBulkBuilderSSS.sorterFileStats) + .FileStats(stats) .DBName(dbName.toString()); } @@ -610,6 +609,51 @@ Ident* SortedDataIndexAccessMethod::getIdentPtr() const { return this->_newInterface.get(); } +void IndexAccessMethod::BulkBuilder::countNewBuildInStats() { + indexBulkBuilderSSS.count.addAndFetch(1); +} + +void IndexAccessMethod::BulkBuilder::countResumedBuildInStats() { + indexBulkBuilderSSS.count.addAndFetch(1); + indexBulkBuilderSSS.resumed.addAndFetch(1); +} + +SorterFileStats* IndexAccessMethod::BulkBuilder::bulkBuilderFileStats() { + return &indexBulkBuilderSSS.sorterFileStats; +} + +void IndexAccessMethod::BulkBuilder::yield(OperationContext* opCtx, + const Yieldable* yieldable, + const NamespaceString& ns) { + // Releasing locks means a new snapshot should be acquired when restored. + opCtx->recoveryUnit()->abandonSnapshot(); + yieldable->yield(); + + auto locker = opCtx->lockState(); + Locker::LockSnapshot snapshot; + if (locker->saveLockStateAndUnlock(&snapshot)) { + + // Track the number of yields in CurOp. + CurOp::get(opCtx)->yielded(); + + auto failPointHang = [opCtx, &ns](FailPoint* fp) { + fp->executeIf( + [fp](auto&&) { + LOGV2(5180600, "Hanging index build during bulk load yield"); + fp->pauseWhileSet(); + }, + [opCtx, &ns](auto&& config) { + return config.getStringField("namespace") == ns.ns(); + }); + }; + failPointHang(&hangDuringIndexBuildBulkLoadYield); + failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond); + + locker->restoreLockState(opCtx, snapshot); + } + yieldable->restore(); +} + class SortedDataIndexAccessMethod::BulkBuilderImpl final : public IndexAccessMethod::BulkBuilder { public: using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>; @@ -646,9 +690,6 @@ public: IndexStateInfo persistDataForShutdown() final; private: - void _yield(OperationContext* opCtx, - const Yieldable* yieldable, - const NamespaceString& ns) const; void _insertMultikeyMetadataKeysIntoSorter(); Sorter* _makeSorter( @@ -689,7 +730,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc size_t maxMemoryUsageBytes, StringData dbName) : _iam(iam), _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) { - indexBulkBuilderSSS.count.addAndFetch(1); + countNewBuildInStats(); } SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAccessMethod* iam, @@ -702,8 +743,7 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc _keysInserted(stateInfo.getNumKeys().value_or(0)), _isMultiKey(stateInfo.getIsMultikey()), _indexMultikeyPaths(createMultikeyPaths(stateInfo.getMultikeyPaths())) { - indexBulkBuilderSSS.count.addAndFetch(1); - indexBulkBuilderSSS.resumed.addAndFetch(1); + countResumedBuildInStats(); } Status SortedDataIndexAccessMethod::BulkBuilderImpl::insert( @@ -825,46 +865,16 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::_makeSorter( StringData dbName, boost::optional<StringData> fileName, const boost::optional<std::vector<SorterRange>>& ranges) const { - return fileName ? Sorter::makeFromExistingRanges(fileName->toString(), - *ranges, - makeSortOptions(maxMemoryUsageBytes, dbName), - BtreeExternalSortComparison(), - _makeSorterSettings()) - : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName), - BtreeExternalSortComparison(), - _makeSorterSettings()); -} - -void SortedDataIndexAccessMethod::BulkBuilderImpl::_yield(OperationContext* opCtx, - const Yieldable* yieldable, - const NamespaceString& ns) const { - // Releasing locks means a new snapshot should be acquired when restored. - opCtx->recoveryUnit()->abandonSnapshot(); - yieldable->yield(); - - auto locker = opCtx->lockState(); - Locker::LockSnapshot snapshot; - if (locker->saveLockStateAndUnlock(&snapshot)) { - - // Track the number of yields in CurOp. - CurOp::get(opCtx)->yielded(); - - auto failPointHang = [opCtx, &ns](FailPoint* fp) { - fp->executeIf( - [fp](auto&&) { - LOGV2(5180600, "Hanging index build during bulk load yield"); - fp->pauseWhileSet(); - }, - [opCtx, &ns](auto&& config) { - return config.getStringField("namespace") == ns.ns(); - }); - }; - failPointHang(&hangDuringIndexBuildBulkLoadYield); - failPointHang(&hangDuringIndexBuildBulkLoadYieldSecond); - - locker->restoreLockState(opCtx, snapshot); - } - yieldable->restore(); + return fileName + ? Sorter::makeFromExistingRanges( + fileName->toString(), + *ranges, + makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()), + BtreeExternalSortComparison(), + _makeSorterSettings()) + : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName, bulkBuilderFileStats()), + BtreeExternalSortComparison(), + _makeSorterSettings()); } Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit( @@ -979,7 +989,7 @@ Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit( // Starts yielding locks after the first non-zero 'yieldIterations' inserts. if (yieldIterations && (i + 1) % yieldIterations == 0) { - _yield(opCtx, &collection, ns); + yield(opCtx, &collection, ns); } // If we're here either it's a dup and we're cool with it or the addKey went just fine. diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index 3718040885d..7e3a4dd504a 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -40,6 +40,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/record_id.h" +#include "mongo/db/sorter/sorter.h" #include "mongo/db/storage/sorted_data_interface.h" #include "mongo/db/yieldable.h" @@ -219,6 +220,19 @@ public: * Persists on disk the keys that have been inserted using this BulkBuilder. */ virtual IndexStateInfo persistDataForShutdown() = 0; + + protected: + static void countNewBuildInStats(); + static void countResumedBuildInStats(); + static SorterFileStats* bulkBuilderFileStats(); + + /** + * Abandon the current snapshot and release then reacquire locks. Tests that target the + * behavior of bulk index builds that yield can use failpoints to stall this yield. + */ + static void yield(OperationContext* opCtx, + const Yieldable* yieldable, + const NamespaceString& ns); }; /** |