diff options
author | Erin Zhu <erin.zhu@mongodb.com> | 2022-08-19 13:45:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-19 14:33:39 +0000 |
commit | 7d7518830818fd84a13c3a408830c3c5ab01117c (patch) | |
tree | 385031d5aaecc58eea4627c9582e56b3bc85a12f /src/mongo/db/index | |
parent | 83ffa4f1c95242972cc0022b8872aa7486bbb95c (diff) | |
download | mongo-7d7518830818fd84a13c3a408830c3c5ab01117c.tar.gz |
SERVER-68941 Unify common functionality in BulkBuilder implementations
Diffstat (limited to 'src/mongo/db/index')
-rw-r--r-- | src/mongo/db/index/bulk_builder_common.h | 192 | ||||
-rw-r--r-- | src/mongo/db/index/column_store_sorter.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/index/columns_access_method.cpp | 164 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.cpp | 208 |
4 files changed, 351 insertions, 217 deletions
diff --git a/src/mongo/db/index/bulk_builder_common.h b/src/mongo/db/index/bulk_builder_common.h new file mode 100644 index 00000000000..160b962ebd5 --- /dev/null +++ b/src/mongo/db/index/bulk_builder_common.h @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/curop.h" +#include "mongo/db/index/index_access_method.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/record_id.h" +#include "mongo/logv2/log.h" +#include "mongo/util/progress_meter.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +namespace mongo { + +extern FailPoint hangIndexBuildDuringBulkLoadPhase; +extern FailPoint hangIndexBuildDuringBulkLoadPhaseSecond; + +/* BulkBuilderCommon uses CRTP to implement a generic loop for draining keys from a bulk builder. + * Child classes must implement these functions. + * + * Return inserter that will insert keys into the index + * Also must initialize _ns to the namespace string. + * Inserter setUpBulkInserter(OperationContext* opCtx, bool dupsAllowed) + * + * Return iterator it of the sorted keys for the type of the child class + * Iterator finalizeSort() + * + * Check that current key comes after previous key in sort order. + * void debugEnsureSorted(const Key& data) + * + * Return true if data is a duplicate, false otherwise. If duplicate checks don't apply, return + * false. + * bool duplicateCheck(OperationContext* opCtx, const Key& data, bool dupsAllowed, + * const RecordIdHandlerFn& onDuplicateRecord) + * + * Output key to write cursor. + * void insertKey(Inserter& inserter, const Key& data) + * + * Perform finalizing steps for key. 
+ * Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted, const Key& data, bool isDup) + */ +template <class T> +class BulkBuilderCommon : public IndexAccessMethod::BulkBuilder { + +public: + using KeyHandlerFn = std::function<Status(const KeyString::Value&)>; + using RecordIdHandlerFn = std::function<Status(const RecordId&)>; + BulkBuilderCommon(int64_t numKeys, std::string message, std::string indexName) + : _keysInserted(numKeys), _progressMessage(message), _indexName(indexName){}; + + Status commit(OperationContext* opCtx, + const CollectionPtr& collection, + bool dupsAllowed, + int32_t yieldIterations, + const KeyHandlerFn& onDuplicateKeyInserted, + const RecordIdHandlerFn& onDuplicateRecord) { + + Timer timer; + + auto builder = static_cast<T*>(this)->setUpBulkInserter(opCtx, dupsAllowed); + auto it = static_cast<T*>(this)->finalizeSort(); + + ProgressMeterHolder pm; + { + stdx::unique_lock<Client> lk(*opCtx->getClient()); + pm.set(CurOp::get(opCtx)->setProgress_inlock( + _progressMessage, _keysInserted, 3 /* secondsBetween */)); + } // end of the scope holding the Client lock 'lk' + + int64_t iterations = 0; + while (it->more()) { + opCtx->checkForInterrupt(); + + auto failPointHang = [opCtx, iterations, &indexName = _indexName](FailPoint* fp) { + fp->executeIf( + [fp, opCtx, iterations, &indexName](const BSONObj& data) { + LOGV2(4924400, + "Hanging index build during bulk load phase", + "iteration"_attr = iterations, + "index"_attr = indexName); + + fp->pauseWhileSet(opCtx); + }, + [iterations, &indexName](const BSONObj& data) { + auto indexNames = data.getObjectField("indexNames"); + return iterations == data["iteration"].numberLong() && + std::any_of(indexNames.begin(), + indexNames.end(), + [&indexName](const auto& elem) { + return indexName == elem.String(); + }); + }); + }; + failPointHang(&hangIndexBuildDuringBulkLoadPhase); + failPointHang(&hangIndexBuildDuringBulkLoadPhaseSecond); + + auto data = it->next(); + if (kDebugBuild) { + 
static_cast<T*>(this)->debugEnsureSorted(data); + } + + // Before attempting to insert, perform a duplicate key check. + bool isDup; + try { + isDup = static_cast<T*>(this)->duplicateCheck( + opCtx, data, dupsAllowed, onDuplicateRecord); + } catch (DBException& e) { + return e.toStatus(); + } + + if (isDup && !dupsAllowed) { + continue; + } + + + try { + writeConflictRetry(opCtx, "addingKey", _ns.ns(), [&] { + WriteUnitOfWork wunit(opCtx); + static_cast<T*>(this)->insertKey(builder, data); + wunit.commit(); + }); + } catch (DBException& e) { + Status status = e.toStatus(); + // Duplicates are checked before inserting. + invariant(status.code() != ErrorCodes::DuplicateKey); + return status; + } + + Status status = + static_cast<T*>(this)->keyCommitted(onDuplicateKeyInserted, data, isDup); + if (!status.isOK()) + return status; + + // Yield locks every 'yieldIterations' key insertions. + if (yieldIterations > 0 && (++iterations % yieldIterations == 0)) { + yield(opCtx, &collection, _ns); + } + + // If we're here either it's a dup and we're cool with it or the addKey went just + // fine. 
+ pm.hit(); + } + + pm.finished(); + + LOGV2(20685, + "Index build: inserted {bulk_getKeysInserted} keys from external sorter into " + "index in " + "{timer_seconds} seconds", + "Index build: inserted keys from external sorter into index", + logAttrs(_ns), + "index"_attr = _indexName, + "keysInserted"_attr = _keysInserted, + "duration"_attr = Milliseconds(Seconds(timer.seconds()))); + return Status::OK(); + } + +protected: + int64_t _keysInserted = 0; + std::string _progressMessage; + std::string _indexName; + NamespaceString _ns; +}; +}; // namespace mongo + +#undef MONGO_LOGV2_DEFAULT_COMPONENT diff --git a/src/mongo/db/index/column_store_sorter.cpp b/src/mongo/db/index/column_store_sorter.cpp index 50285f00d74..7e0eb141227 100644 --- a/src/mongo/db/index/column_store_sorter.cpp +++ b/src/mongo/db/index/column_store_sorter.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex -#include "mongo/platform/basic.h" - #include "mongo/db/index/column_store_sorter.h" #include <boost/filesystem/operations.hpp> @@ -117,6 +115,7 @@ ColumnStoreSorter::ColumnStoreSorter(size_t maxMemoryUsageBytes, _dbName, range.getChecksum()); }); + this->_stats.setSpilledRanges(_spilledFileIterators.size()); } void ColumnStoreSorter::add(PathView path, RowId rowId, CellView cellContents) { @@ -251,6 +250,7 @@ ColumnStoreSorter::Iterator* ColumnStoreSorter::done() { } spill(); + return SortIteratorInterface<Key, Value>::merge( _spilledFileIterators, makeSortOptions(_dbName, _fileStats), ComparisonForPathAndRid()); } diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp index d1ef1f73b62..b2f9a07a512 100644 --- a/src/mongo/db/index/columns_access_method.cpp +++ b/src/mongo/db/index/columns_access_method.cpp @@ -37,6 +37,7 @@ #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" +#include "mongo/db/index/bulk_builder_common.h" #include 
"mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" #include "mongo/db/index/column_store_sorter.h" @@ -70,7 +71,8 @@ ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice, } } -class ColumnStoreAccessMethod::BulkBuilder final : public IndexAccessMethod::BulkBuilder { +class ColumnStoreAccessMethod::BulkBuilder final + : public BulkBuilderCommon<ColumnStoreAccessMethod::BulkBuilder> { public: BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName); @@ -79,7 +81,6 @@ public: const IndexStateInfo& stateInfo, StringData dbName); - // // Generic APIs // @@ -99,14 +100,24 @@ public: int64_t getKeysInserted() const; - mongo::IndexStateInfo persistDataForShutdown() final; + IndexStateInfo persistDataForShutdown() final; + std::unique_ptr<ColumnStoreSorter::Iterator> finalizeSort(); - Status commit(OperationContext* opCtx, - const CollectionPtr& collection, - bool dupsAllowed, - int32_t yieldIterations, - const KeyHandlerFn& onDuplicateKeyInserted, - const RecordIdHandlerFn& onDuplicateRecord) final; + std::unique_ptr<ColumnStore::BulkBuilder> setUpBulkInserter(OperationContext* opCtx, + bool dupsAllowed); + void debugEnsureSorted(const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data); + + bool duplicateCheck(OperationContext* opCtx, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data, + bool dupsAllowed, + const RecordIdHandlerFn& onDuplicateRecord); + + void insertKey(std::unique_ptr<ColumnStore::BulkBuilder>& inserter, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data); + + Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data, + bool isDup); private: ColumnStoreAccessMethod* const _columnsAccess; @@ -114,13 +125,16 @@ private: ColumnStoreSorter _sorter; BufBuilder _cellBuilder; - int64_t _keysInserted = 0; + boost::optional<std::pair<PathValue, 
RowId>> _previousPathAndRowId; }; ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName) - : _columnsAccess(index), + : BulkBuilderCommon(0, + "Index Build: inserting keys from external sorter into columnstore index", + index->_descriptor->indexName()), + _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats(), bulkBuilderTracker()) { countNewBuildInStats(); } @@ -129,13 +143,16 @@ ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index size_t maxMemoryUsageBytes, const IndexStateInfo& stateInfo, StringData dbName) - : _columnsAccess(index), + : BulkBuilderCommon(stateInfo.getNumKeys().value_or(0), + "Index Build: inserting keys from external sorter into columnstore index", + index->_descriptor->indexName()), + _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats(), stateInfo.getFileName()->toString(), - *stateInfo.getRanges()), - _keysInserted(stateInfo.getNumKeys().value_or(0)) { + *stateInfo.getRanges(), + bulkBuilderTracker()) { countResumedBuildInStats(); } @@ -187,84 +204,59 @@ IndexStateInfo ColumnStoreAccessMethod::BulkBuilder::persistDataForShutdown() { return stateInfo; } -Status ColumnStoreAccessMethod::BulkBuilder::commit(OperationContext* opCtx, - const CollectionPtr& collection, - bool dupsAllowed, - int32_t yieldIterations, - const KeyHandlerFn& onDuplicateKeyInserted, - const RecordIdHandlerFn& onDuplicateRecord) { - Timer timer; - - auto ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx); - - static constexpr char message[] = - "Index Build: inserting keys from external sorter into columnstore index"; - ProgressMeterHolder pm; - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - pm.set( - CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */)); - } +std::unique_ptr<ColumnStoreSorter::Iterator> ColumnStoreAccessMethod::BulkBuilder::finalizeSort() 
{ + return std::unique_ptr<ColumnStoreSorter::Iterator>(_sorter.done()); +} - auto builder = _columnsAccess->_store->makeBulkBuilder(opCtx); - - int64_t iterations = 0; - boost::optional<std::pair<PathValue, RowId>> previousPathAndRowId; - std::unique_ptr<ColumnStoreSorter::Iterator> it(_sorter.done()); - while (it->more()) { - opCtx->checkForInterrupt(); - - auto columnStoreKeyWithValue = it->next(); - const auto& key = columnStoreKeyWithValue.first; - - // In debug mode only, assert that keys are retrieved from the sorter in strictly increasing - // order. - if (kDebugBuild) { - if (previousPathAndRowId && - !(ColumnStoreSorter::Key{previousPathAndRowId->first, - previousPathAndRowId->second} < key)) { - LOGV2_FATAL_NOTRACE(6548100, - "Out-of-order result from sorter for column store bulk loader", - "prevPathName"_attr = previousPathAndRowId->first, - "prevRecordId"_attr = previousPathAndRowId->second, - "nextPathName"_attr = key.path, - "nextRecordId"_attr = key.rowId, - "index"_attr = _columnsAccess->_descriptor->indexName()); - } +std::unique_ptr<ColumnStore::BulkBuilder> ColumnStoreAccessMethod::BulkBuilder::setUpBulkInserter( + OperationContext* opCtx, bool dupsAllowed) { + _ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx); + return _columnsAccess->_store->makeBulkBuilder(opCtx); +} - // It is not safe to safe to directly store the 'key' object, because it includes a - // PathView, which may be invalid the next time we read it. 
- previousPathAndRowId.emplace(key.path, key.rowId); - } - - try { - writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] { - WriteUnitOfWork wunit(opCtx); - auto& [columnStoreKey, columnStoreValue] = columnStoreKeyWithValue; - builder->addCell(columnStoreKey.path, columnStoreKey.rowId, columnStoreValue.cell); - wunit.commit(); - }); - } catch (DBException& e) { - return e.toStatus(); - } +void ColumnStoreAccessMethod::BulkBuilder::debugEnsureSorted( + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data) { + // In debug mode only, assert that keys are retrieved from the sorter in strictly + // increasing order. + const auto& key = data.first; + if (_previousPathAndRowId && + !(ColumnStoreSorter::Key{_previousPathAndRowId->first, _previousPathAndRowId->second} < + key)) { + LOGV2_FATAL_NOTRACE(6548100, + "Out-of-order result from sorter for column store bulk loader", + "prevPathName"_attr = _previousPathAndRowId->first, + "prevRecordId"_attr = _previousPathAndRowId->second, + "nextPathName"_attr = key.path, + "nextRecordId"_attr = key.rowId, + "index"_attr = _indexName); + } + // It is not safe to directly store the 'key' object, because it includes a + // PathView, which may be invalid the next time we read it. + _previousPathAndRowId.emplace(key.path, key.rowId); +} - // Yield locks every 'yieldIterations' key insertions. 
- if (yieldIterations > 0 && (++iterations % yieldIterations == 0)) { - yield(opCtx, &collection, ns); - } +bool ColumnStoreAccessMethod::BulkBuilder::duplicateCheck( + OperationContext* opCtx, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data, + bool dupsAllowed, + const RecordIdHandlerFn& onDuplicateRecord) { + // no duplicates in a columnstore index + return false; +} - pm.hit(); - } +void ColumnStoreAccessMethod::BulkBuilder::insertKey( + std::unique_ptr<ColumnStore::BulkBuilder>& inserter, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data) { - pm.finished(); + auto& [columnStoreKey, columnStoreValue] = data; + inserter->addCell(columnStoreKey.path, columnStoreKey.rowId, columnStoreValue.cell); +} - LOGV2(6548101, - "Index build: bulk sorter inserted {keysInserted} keys into index {index} on namespace " - "{namespace} in {duration} seconds", - "keysInserted"_attr = _keysInserted, - "index"_attr = _columnsAccess->_descriptor->indexName(), - logAttrs(ns), - "duration"_attr = Seconds(timer.seconds())); +Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted( + const KeyHandlerFn& onDuplicateKeyInserted, + const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data, + bool isDup) { + // nothing to do for columnstore indexes return Status::OK(); } diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index bd12a9add26..f7bba190c2e 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -43,6 +43,7 @@ #include "mongo/db/commands/server_status.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" +#include "mongo/db/index/bulk_builder_common.h" #include "mongo/db/index/index_build_interceptor.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/jsobj.h" @@ -665,7 +666,8 @@ void IndexAccessMethod::BulkBuilder::yield(OperationContext* opCtx, yieldable->restore(); } -class 
SortedDataIndexAccessMethod::BulkBuilderImpl final : public IndexAccessMethod::BulkBuilder { +class SortedDataIndexAccessMethod::BulkBuilderImpl final + : public BulkBuilderCommon<SortedDataIndexAccessMethod::BulkBuilderImpl> { public: using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>; @@ -687,19 +689,30 @@ public: const std::function<void()>& saveCursorBeforeWrite, const std::function<void()>& restoreCursorAfterWrite) final; - Status commit(OperationContext* opCtx, - const CollectionPtr& collection, - bool dupsAllowed, - int32_t yieldIterations, - const KeyHandlerFn& onDuplicateKeyInserted, - const RecordIdHandlerFn& onDuplicateRecord) final; - const MultikeyPaths& getMultikeyPaths() const final; bool isMultikey() const final; IndexStateInfo persistDataForShutdown() final; + std::unique_ptr<Sorter::Iterator> finalizeSort(); + + std::unique_ptr<SortedDataBuilderInterface> setUpBulkInserter(OperationContext* opCtx, + bool dupsAllowed); + + void debugEnsureSorted(const Sorter::Data& data); + + bool duplicateCheck(OperationContext* opCtx, + const Sorter::Data& data, + bool dupsAllowed, + const RecordIdHandlerFn& onDuplicateRecord); + + void insertKey(std::unique_ptr<SortedDataBuilderInterface>& inserter, const Sorter::Data& data); + + Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted, + const Sorter::Data& data, + bool isDup); + private: void _insertMultikeyMetadataKeysIntoSorter(); @@ -713,7 +726,8 @@ private: SortedDataIndexAccessMethod* _iam; std::unique_ptr<Sorter> _sorter; - int64_t _keysInserted = 0; + + KeyString::Value _previousKey; // Set to true if any document added to the BulkBuilder causes the index to become multikey. 
bool _isMultiKey = false; @@ -740,7 +754,11 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> SortedDataIndexAccessMethod::ini SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAccessMethod* iam, size_t maxMemoryUsageBytes, StringData dbName) - : _iam(iam), _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) { + : BulkBuilderCommon(0, + "Index Build: inserting keys from external sorter into index", + iam->_descriptor->indexName()), + _iam(iam), + _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) { countNewBuildInStats(); } @@ -748,10 +766,12 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc size_t maxMemoryUsageBytes, const IndexStateInfo& stateInfo, StringData dbName) - : _iam(iam), + : BulkBuilderCommon(stateInfo.getNumKeys().value_or(0), + "Index Build: inserting keys from external sorter into index", + iam->_descriptor->indexName()), + _iam(iam), _sorter( _makeSorter(maxMemoryUsageBytes, dbName, stateInfo.getFileName(), stateInfo.getRanges())), - _keysInserted(stateInfo.getNumKeys().value_or(0)), _isMultiKey(stateInfo.getIsMultikey()), _indexMultikeyPaths(createMultikeyPaths(stateInfo.getMultikeyPaths())) { countResumedBuildInStats(); @@ -888,135 +908,65 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::_makeSorter( _makeSorterSettings()); } -Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit( - OperationContext* opCtx, - const CollectionPtr& collection, - bool dupsAllowed, - int32_t yieldIterations, - const KeyHandlerFn& onDuplicateKeyInserted, - const RecordIdHandlerFn& onDuplicateRecord) { - - Timer timer; - - const auto descriptor = _iam->_descriptor; - auto ns = _iam->_indexCatalogEntry->getNSSFromCatalog(opCtx); - +std::unique_ptr<mongo::Sorter<KeyString::Value, mongo::NullValue>::Iterator> +SortedDataIndexAccessMethod::BulkBuilderImpl::finalizeSort() { _insertMultikeyMetadataKeysIntoSorter(); - std::unique_ptr<Sorter::Iterator> it(_sorter->done()); - - static constexpr char message[] = 
"Index Build: inserting keys from external sorter into index"; - ProgressMeterHolder pm; - { - stdx::unique_lock<Client> lk(*opCtx->getClient()); - pm.set( - CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */)); - } - - auto builder = _iam->getSortedDataInterface()->makeBulkBuilder(opCtx, dupsAllowed); - - KeyString::Value previousKey; - - for (int64_t i = 0; it->more(); i++) { - opCtx->checkForInterrupt(); - - auto failPointHang = [opCtx, i, &indexName = descriptor->indexName()](FailPoint* fp) { - fp->executeIf( - [fp, opCtx, i, &indexName](const BSONObj& data) { - LOGV2(4924400, - "Hanging index build during bulk load phase", - "iteration"_attr = i, - "index"_attr = indexName); + return std::unique_ptr<Sorter::Iterator>(_sorter->done()); +} - fp->pauseWhileSet(opCtx); - }, - [i, &indexName](const BSONObj& data) { - auto indexNames = data.getObjectField("indexNames"); - return i == data["iteration"].numberLong() && - std::any_of(indexNames.begin(), - indexNames.end(), - [&indexName](const auto& elem) { - return indexName == elem.String(); - }); - }); - }; - failPointHang(&hangIndexBuildDuringBulkLoadPhase); - failPointHang(&hangIndexBuildDuringBulkLoadPhaseSecond); - - // Get the next datum and add it to the builder. - Sorter::Data data = it->next(); - - // Assert that keys are retrieved from the sorter in non-decreasing order, but only in debug - // builds since this check can be expensive. - int cmpData; - if (descriptor->unique()) { - cmpData = (_iam->getSortedDataInterface()->rsKeyFormat() == KeyFormat::Long) - ? 
data.first.compareWithoutRecordIdLong(previousKey) - : data.first.compareWithoutRecordIdStr(previousKey); - } +std::unique_ptr<SortedDataBuilderInterface> +SortedDataIndexAccessMethod::BulkBuilderImpl::setUpBulkInserter(OperationContext* opCtx, + bool dupsAllowed) { + _ns = _iam->_indexCatalogEntry->getNSSFromCatalog(opCtx); + return _iam->getSortedDataInterface()->makeBulkBuilder(opCtx, dupsAllowed); +} - if (kDebugBuild && data.first.compare(previousKey) < 0) { - LOGV2_FATAL_NOTRACE( - 31171, - "Expected the next key to be greater than or equal to the previous key", - "nextKey"_attr = data.first.toString(), - "previousKey"_attr = previousKey.toString(), - "index"_attr = descriptor->indexName()); - } - // Before attempting to insert, perform a duplicate key check. - bool isDup = (descriptor->unique()) ? (cmpData == 0) : false; - if (isDup && !dupsAllowed) { - Status status = _iam->_handleDuplicateKey(opCtx, data.first, onDuplicateRecord); - if (!status.isOK()) { - return status; - } - continue; - } +void SortedDataIndexAccessMethod::BulkBuilderImpl::debugEnsureSorted(const Sorter::Data& data) { + if (data.first.compare(_previousKey) < 0) { + LOGV2_FATAL_NOTRACE(31171, + "Expected the next key to be greater than or equal to the previous key", + "nextKey"_attr = data.first.toString(), + "previousKey"_attr = _previousKey.toString(), + "index"_attr = _indexName); + } +} - Status status = writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] { - WriteUnitOfWork wunit(opCtx); - Status status = builder->addKey(data.first); - if (!status.isOK()) { - return status; - } +bool SortedDataIndexAccessMethod::BulkBuilderImpl::duplicateCheck( + OperationContext* opCtx, + const Sorter::Data& data, + bool dupsAllowed, + const RecordIdHandlerFn& onDuplicateRecord) { - wunit.commit(); - return Status::OK(); - }); + auto descriptor = _iam->_descriptor; - if (!status.isOK()) { - // Duplicates are checked before inserting. 
- invariant(status.code() != ErrorCodes::DuplicateKey); - return status; - } + bool isDup = false; + if (descriptor->unique()) { + int cmpData = (_iam->getSortedDataInterface()->rsKeyFormat() == KeyFormat::Long) + ? data.first.compareWithoutRecordIdLong(_previousKey) + : data.first.compareWithoutRecordIdStr(_previousKey); + isDup = (cmpData == 0); + } - previousKey = data.first; + // Before attempting to insert, perform a duplicate key check. + if (isDup && !dupsAllowed) { + uassertStatusOK(_iam->_handleDuplicateKey(opCtx, data.first, onDuplicateRecord)); + } + return isDup; +} - if (isDup) { - status = onDuplicateKeyInserted(data.first); - if (!status.isOK()) - return status; - } +void SortedDataIndexAccessMethod::BulkBuilderImpl::insertKey( + std::unique_ptr<SortedDataBuilderInterface>& inserter, const Sorter::Data& data) { + uassertStatusOK(inserter->addKey(data.first)); +} - // Starts yielding locks after the first non-zero 'yieldIterations' inserts. - if (yieldIterations && (i + 1) % yieldIterations == 0) { - yield(opCtx, &collection, ns); - } +Status SortedDataIndexAccessMethod::BulkBuilderImpl::keyCommitted( + const KeyHandlerFn& onDuplicateKeyInserted, const Sorter::Data& data, bool isDup) { + _previousKey = data.first; - // If we're here either it's a dup and we're cool with it or the addKey went just fine. - pm.hit(); + if (isDup) { + return onDuplicateKeyInserted(data.first); } - - pm.finished(); - - LOGV2(20685, - "Index build: inserted {bulk_getKeysInserted} keys from external sorter into index in " - "{timer_seconds} seconds", - "Index build: inserted keys from external sorter into index", - logAttrs(ns), - "index"_attr = descriptor->indexName(), - "keysInserted"_attr = _keysInserted, - "duration"_attr = Milliseconds(Seconds(timer.seconds()))); return Status::OK(); } |