author     Erin Zhu <erin.zhu@mongodb.com>                    2022-08-19 13:45:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-08-19 14:33:39 +0000
commit     7d7518830818fd84a13c3a408830c3c5ab01117c (patch)
tree       385031d5aaecc58eea4627c9582e56b3bc85a12f /src/mongo/db/index
parent     83ffa4f1c95242972cc0022b8872aa7486bbb95c (diff)
download   mongo-7d7518830818fd84a13c3a408830c3c5ab01117c.tar.gz
SERVER-68941 Unify common functionality in BulkBuilder implementations
Diffstat (limited to 'src/mongo/db/index')
-rw-r--r--  src/mongo/db/index/bulk_builder_common.h       192
-rw-r--r--  src/mongo/db/index/column_store_sorter.cpp       4
-rw-r--r--  src/mongo/db/index/columns_access_method.cpp   164
-rw-r--r--  src/mongo/db/index/index_access_method.cpp     208
4 files changed, 351 insertions, 217 deletions
diff --git a/src/mongo/db/index/bulk_builder_common.h b/src/mongo/db/index/bulk_builder_common.h
new file mode 100644
index 00000000000..160b962ebd5
--- /dev/null
+++ b/src/mongo/db/index/bulk_builder_common.h
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/curop.h"
+#include "mongo/db/index/index_access_method.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/record_id.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/progress_meter.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+namespace mongo {
+
+extern FailPoint hangIndexBuildDuringBulkLoadPhase;
+extern FailPoint hangIndexBuildDuringBulkLoadPhaseSecond;
+
+/* BulkBuilderCommon uses CRTP to implement a generic loop for draining keys from a bulk builder.
+ * Child classes must implement the following functions:
+ *
+ * Return the inserter that will be used to write keys into the index. Must also initialize
+ * _ns to the namespace string.
+ * Inserter setUpBulkInserter(OperationContext* opCtx, bool dupsAllowed)
+ *
+ * Return an iterator over the sorted keys, using the key type of the child class.
+ * Iterator finalizeSort()
+ *
+ * Check that the current key comes after the previous key in sort order.
+ * void debugEnsureSorted(const Key& data)
+ *
+ * Return true if 'data' is a duplicate, false otherwise. If duplicate checks do not apply,
+ * return false.
+ * bool duplicateCheck(OperationContext* opCtx, const Key& data, bool dupsAllowed,
+ *                     const RecordIdHandlerFn& onDuplicateRecord)
+ *
+ * Write the key out via the inserter.
+ * void insertKey(Inserter& inserter, const Key& data)
+ *
+ * Perform finalizing steps for the key.
+ * Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted, const Key& data, bool isDup)
+ */
+template <class T>
+class BulkBuilderCommon : public IndexAccessMethod::BulkBuilder {
+
+public:
+ using KeyHandlerFn = std::function<Status(const KeyString::Value&)>;
+ using RecordIdHandlerFn = std::function<Status(const RecordId&)>;
+ BulkBuilderCommon(int64_t numKeys, std::string message, std::string indexName)
+ : _keysInserted(numKeys), _progressMessage(message), _indexName(indexName){};
+
+ Status commit(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ bool dupsAllowed,
+ int32_t yieldIterations,
+ const KeyHandlerFn& onDuplicateKeyInserted,
+ const RecordIdHandlerFn& onDuplicateRecord) {
+
+ Timer timer;
+
+ auto builder = static_cast<T*>(this)->setUpBulkInserter(opCtx, dupsAllowed);
+ auto it = static_cast<T*>(this)->finalizeSort();
+
+ ProgressMeterHolder pm;
+ {
+ stdx::unique_lock<Client> lk(*opCtx->getClient());
+ pm.set(CurOp::get(opCtx)->setProgress_inlock(
+ _progressMessage, _keysInserted, 3 /* secondsBetween */));
+ }
+
+ int64_t iterations = 0;
+ while (it->more()) {
+ opCtx->checkForInterrupt();
+
+ auto failPointHang = [opCtx, iterations, &indexName = _indexName](FailPoint* fp) {
+ fp->executeIf(
+ [fp, opCtx, iterations, &indexName](const BSONObj& data) {
+ LOGV2(4924400,
+ "Hanging index build during bulk load phase",
+ "iteration"_attr = iterations,
+ "index"_attr = indexName);
+
+ fp->pauseWhileSet(opCtx);
+ },
+ [iterations, &indexName](const BSONObj& data) {
+ auto indexNames = data.getObjectField("indexNames");
+ return iterations == data["iteration"].numberLong() &&
+ std::any_of(indexNames.begin(),
+ indexNames.end(),
+ [&indexName](const auto& elem) {
+ return indexName == elem.String();
+ });
+ });
+ };
+ failPointHang(&hangIndexBuildDuringBulkLoadPhase);
+ failPointHang(&hangIndexBuildDuringBulkLoadPhaseSecond);
+
+ auto data = it->next();
+ if (kDebugBuild) {
+ static_cast<T*>(this)->debugEnsureSorted(data);
+ }
+
+ // Before attempting to insert, perform a duplicate key check.
+ bool isDup;
+ try {
+ isDup = static_cast<T*>(this)->duplicateCheck(
+ opCtx, data, dupsAllowed, onDuplicateRecord);
+ } catch (DBException& e) {
+ return e.toStatus();
+ }
+
+ if (isDup && !dupsAllowed) {
+ continue;
+ }
+
+ try {
+ writeConflictRetry(opCtx, "addingKey", _ns.ns(), [&] {
+ WriteUnitOfWork wunit(opCtx);
+ static_cast<T*>(this)->insertKey(builder, data);
+ wunit.commit();
+ });
+ } catch (DBException& e) {
+ Status status = e.toStatus();
+ // Duplicates are checked before inserting.
+ invariant(status.code() != ErrorCodes::DuplicateKey);
+ return status;
+ }
+
+ Status status =
+ static_cast<T*>(this)->keyCommitted(onDuplicateKeyInserted, data, isDup);
+ if (!status.isOK())
+ return status;
+
+ // Yield locks every 'yieldIterations' key insertions.
+ if (yieldIterations > 0 && (++iterations % yieldIterations == 0)) {
+ yield(opCtx, &collection, _ns);
+ }
+
+ // If we're here either it's a dup and we're cool with it or the addKey went just
+ // fine.
+ pm.hit();
+ }
+
+ pm.finished();
+
+ LOGV2(20685,
+ "Index build: inserted {bulk_getKeysInserted} keys from external sorter into "
+ "index in "
+ "{timer_seconds} seconds",
+ "Index build: inserted keys from external sorter into index",
+ logAttrs(_ns),
+ "index"_attr = _indexName,
+ "keysInserted"_attr = _keysInserted,
+ "duration"_attr = Milliseconds(Seconds(timer.seconds())));
+ return Status::OK();
+ }
+
+protected:
+ int64_t _keysInserted = 0;
+ std::string _progressMessage;
+ std::string _indexName;
+ NamespaceString _ns;
+};
+}  // namespace mongo
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT
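
The doc comment at the top of bulk_builder_common.h above describes a CRTP contract: commit() is written once in BulkBuilderCommon<T> and reaches the per-key hooks through static_cast<T*>(this), so the shared drain loop (failpoints, progress meter, yielding) is reused while the hook calls are resolved at compile time without virtual dispatch. A minimal standalone sketch of that dispatch pattern follows, using illustrative names (DrainLoopBase, VectorDrainer) that are not part of this patch:

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    // The generic drain loop lives in the base class once; the per-key steps are
    // supplied by the derived class and reached via static_cast<T*>(this).
    template <class T>
    class DrainLoopBase {
    public:
        void drainAll() {
            auto keys = static_cast<T*>(this)->finalizeSort();
            for (const auto& key : keys) {
                static_cast<T*>(this)->insertKey(key);
            }
        }
    };

    // One concrete builder: buffers keys, sorts them, then "inserts" by printing.
    class VectorDrainer : public DrainLoopBase<VectorDrainer> {
    public:
        void add(std::string key) {
            _buffered.push_back(std::move(key));
        }

        std::vector<std::string> finalizeSort() {
            std::sort(_buffered.begin(), _buffered.end());
            return _buffered;
        }

        void insertKey(const std::string& key) {
            std::cout << "inserting " << key << "\n";
        }

    private:
        std::vector<std::string> _buffered;
    };

    int main() {
        VectorDrainer drainer;
        drainer.add("b");
        drainer.add("a");
        drainer.drainAll();  // prints "inserting a" then "inserting b"
        return 0;
    }
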
diff --git a/src/mongo/db/index/column_store_sorter.cpp b/src/mongo/db/index/column_store_sorter.cpp
index 50285f00d74..7e0eb141227 100644
--- a/src/mongo/db/index/column_store_sorter.cpp
+++ b/src/mongo/db/index/column_store_sorter.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex
-#include "mongo/platform/basic.h"
-
#include "mongo/db/index/column_store_sorter.h"
#include <boost/filesystem/operations.hpp>
@@ -117,6 +115,7 @@ ColumnStoreSorter::ColumnStoreSorter(size_t maxMemoryUsageBytes,
_dbName,
range.getChecksum());
});
+ this->_stats.setSpilledRanges(_spilledFileIterators.size());
}
void ColumnStoreSorter::add(PathView path, RowId rowId, CellView cellContents) {
@@ -251,6 +250,7 @@ ColumnStoreSorter::Iterator* ColumnStoreSorter::done() {
}
spill();
+
return SortIteratorInterface<Key, Value>::merge(
_spilledFileIterators, makeSortOptions(_dbName, _fileStats), ComparisonForPathAndRid());
}
diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp
index d1ef1f73b62..b2f9a07a512 100644
--- a/src/mongo/db/index/columns_access_method.cpp
+++ b/src/mongo/db/index/columns_access_method.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/curop.h"
+#include "mongo/db/index/bulk_builder_common.h"
#include "mongo/db/index/column_cell.h"
#include "mongo/db/index/column_key_generator.h"
#include "mongo/db/index/column_store_sorter.h"
@@ -70,7 +71,8 @@ ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice,
}
}
-class ColumnStoreAccessMethod::BulkBuilder final : public IndexAccessMethod::BulkBuilder {
+class ColumnStoreAccessMethod::BulkBuilder final
+ : public BulkBuilderCommon<ColumnStoreAccessMethod::BulkBuilder> {
public:
BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName);
@@ -79,7 +81,6 @@ public:
const IndexStateInfo& stateInfo,
StringData dbName);
-
//
// Generic APIs
//
@@ -99,14 +100,24 @@ public:
int64_t getKeysInserted() const;
- mongo::IndexStateInfo persistDataForShutdown() final;
+ IndexStateInfo persistDataForShutdown() final;
+ std::unique_ptr<ColumnStoreSorter::Iterator> finalizeSort();
- Status commit(OperationContext* opCtx,
- const CollectionPtr& collection,
- bool dupsAllowed,
- int32_t yieldIterations,
- const KeyHandlerFn& onDuplicateKeyInserted,
- const RecordIdHandlerFn& onDuplicateRecord) final;
+ std::unique_ptr<ColumnStore::BulkBuilder> setUpBulkInserter(OperationContext* opCtx,
+ bool dupsAllowed);
+ void debugEnsureSorted(const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data);
+
+ bool duplicateCheck(OperationContext* opCtx,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data,
+ bool dupsAllowed,
+ const RecordIdHandlerFn& onDuplicateRecord);
+
+ void insertKey(std::unique_ptr<ColumnStore::BulkBuilder>& inserter,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data);
+
+ Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data,
+ bool isDup);
private:
ColumnStoreAccessMethod* const _columnsAccess;
@@ -114,13 +125,16 @@ private:
ColumnStoreSorter _sorter;
BufBuilder _cellBuilder;
- int64_t _keysInserted = 0;
+ boost::optional<std::pair<PathValue, RowId>> _previousPathAndRowId;
};
ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index,
size_t maxMemoryUsageBytes,
StringData dbName)
- : _columnsAccess(index),
+ : BulkBuilderCommon(0,
+ "Index Build: inserting keys from external sorter into columnstore index",
+ index->_descriptor->indexName()),
+ _columnsAccess(index),
_sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats(), bulkBuilderTracker()) {
countNewBuildInStats();
}
@@ -129,13 +143,16 @@ ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index
size_t maxMemoryUsageBytes,
const IndexStateInfo& stateInfo,
StringData dbName)
- : _columnsAccess(index),
+ : BulkBuilderCommon(stateInfo.getNumKeys().value_or(0),
+ "Index Build: inserting keys from external sorter into columnstore index",
+ index->_descriptor->indexName()),
+ _columnsAccess(index),
_sorter(maxMemoryUsageBytes,
dbName,
bulkBuilderFileStats(),
stateInfo.getFileName()->toString(),
- *stateInfo.getRanges()),
- _keysInserted(stateInfo.getNumKeys().value_or(0)) {
+ *stateInfo.getRanges(),
+ bulkBuilderTracker()) {
countResumedBuildInStats();
}
@@ -187,84 +204,59 @@ IndexStateInfo ColumnStoreAccessMethod::BulkBuilder::persistDataForShutdown() {
return stateInfo;
}
-Status ColumnStoreAccessMethod::BulkBuilder::commit(OperationContext* opCtx,
- const CollectionPtr& collection,
- bool dupsAllowed,
- int32_t yieldIterations,
- const KeyHandlerFn& onDuplicateKeyInserted,
- const RecordIdHandlerFn& onDuplicateRecord) {
- Timer timer;
-
- auto ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx);
-
- static constexpr char message[] =
- "Index Build: inserting keys from external sorter into columnstore index";
- ProgressMeterHolder pm;
- {
- stdx::unique_lock<Client> lk(*opCtx->getClient());
- pm.set(
- CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */));
- }
+std::unique_ptr<ColumnStoreSorter::Iterator> ColumnStoreAccessMethod::BulkBuilder::finalizeSort() {
+ return std::unique_ptr<ColumnStoreSorter::Iterator>(_sorter.done());
+}
- auto builder = _columnsAccess->_store->makeBulkBuilder(opCtx);
-
- int64_t iterations = 0;
- boost::optional<std::pair<PathValue, RowId>> previousPathAndRowId;
- std::unique_ptr<ColumnStoreSorter::Iterator> it(_sorter.done());
- while (it->more()) {
- opCtx->checkForInterrupt();
-
- auto columnStoreKeyWithValue = it->next();
- const auto& key = columnStoreKeyWithValue.first;
-
- // In debug mode only, assert that keys are retrieved from the sorter in strictly increasing
- // order.
- if (kDebugBuild) {
- if (previousPathAndRowId &&
- !(ColumnStoreSorter::Key{previousPathAndRowId->first,
- previousPathAndRowId->second} < key)) {
- LOGV2_FATAL_NOTRACE(6548100,
- "Out-of-order result from sorter for column store bulk loader",
- "prevPathName"_attr = previousPathAndRowId->first,
- "prevRecordId"_attr = previousPathAndRowId->second,
- "nextPathName"_attr = key.path,
- "nextRecordId"_attr = key.rowId,
- "index"_attr = _columnsAccess->_descriptor->indexName());
- }
+std::unique_ptr<ColumnStore::BulkBuilder> ColumnStoreAccessMethod::BulkBuilder::setUpBulkInserter(
+ OperationContext* opCtx, bool dupsAllowed) {
+ _ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx);
+ return _columnsAccess->_store->makeBulkBuilder(opCtx);
+}
- // It is not safe to safe to directly store the 'key' object, because it includes a
- // PathView, which may be invalid the next time we read it.
- previousPathAndRowId.emplace(key.path, key.rowId);
- }
-
- try {
- writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] {
- WriteUnitOfWork wunit(opCtx);
- auto& [columnStoreKey, columnStoreValue] = columnStoreKeyWithValue;
- builder->addCell(columnStoreKey.path, columnStoreKey.rowId, columnStoreValue.cell);
- wunit.commit();
- });
- } catch (DBException& e) {
- return e.toStatus();
- }
+void ColumnStoreAccessMethod::BulkBuilder::debugEnsureSorted(
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data) {
+ // In debug mode only, assert that keys are retrieved from the sorter in strictly
+ // increasing order.
+ const auto& key = data.first;
+ if (_previousPathAndRowId &&
+ !(ColumnStoreSorter::Key{_previousPathAndRowId->first, _previousPathAndRowId->second} <
+ key)) {
+ LOGV2_FATAL_NOTRACE(6548100,
+ "Out-of-order result from sorter for column store bulk loader",
+ "prevPathName"_attr = _previousPathAndRowId->first,
+ "prevRecordId"_attr = _previousPathAndRowId->second,
+ "nextPathName"_attr = key.path,
+ "nextRecordId"_attr = key.rowId,
+ "index"_attr = _indexName);
+ }
+ // It is not safe to directly store the 'key' object, because it includes a
+ // PathView, which may be invalid the next time we read it.
+ _previousPathAndRowId.emplace(key.path, key.rowId);
+}
- // Yield locks every 'yieldIterations' key insertions.
- if (yieldIterations > 0 && (++iterations % yieldIterations == 0)) {
- yield(opCtx, &collection, ns);
- }
+bool ColumnStoreAccessMethod::BulkBuilder::duplicateCheck(
+ OperationContext* opCtx,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data,
+ bool dupsAllowed,
+ const RecordIdHandlerFn& onDuplicateRecord) {
+ // No duplicates are possible in a columnstore index.
+ return false;
+}
- pm.hit();
- }
+void ColumnStoreAccessMethod::BulkBuilder::insertKey(
+ std::unique_ptr<ColumnStore::BulkBuilder>& inserter,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data) {
- pm.finished();
+ auto& [columnStoreKey, columnStoreValue] = data;
+ inserter->addCell(columnStoreKey.path, columnStoreKey.rowId, columnStoreValue.cell);
+}
- LOGV2(6548101,
- "Index build: bulk sorter inserted {keysInserted} keys into index {index} on namespace "
- "{namespace} in {duration} seconds",
- "keysInserted"_attr = _keysInserted,
- "index"_attr = _columnsAccess->_descriptor->indexName(),
- logAttrs(ns),
- "duration"_attr = Seconds(timer.seconds()));
+Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted(
+ const KeyHandlerFn& onDuplicateKeyInserted,
+ const std::pair<ColumnStoreSorter::Key, ColumnStoreSorter::Value>& data,
+ bool isDup) {
+ // Nothing to do for columnstore indexes.
return Status::OK();
}
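
The debugEnsureSorted implementation above copies key.path and key.rowId into an owned _previousPathAndRowId rather than keeping the sorter key itself, because the key carries a PathView whose backing storage may be reused on the next read. A small standalone illustration of that copy-out-of-a-view idea, with std::string_view standing in for PathView (an assumption made only for this sketch):

    #include <iostream>
    #include <string>
    #include <string_view>

    int main() {
        std::string buffer = "a.b.c";      // storage the non-owning view points into
        std::string_view view = buffer;    // analogous to a PathView

        // Remembering the view keeps a pointer into 'buffer', which the producer
        // is free to overwrite (or reallocate) before the next comparison.
        std::string_view rememberedView = view;

        // Copying into owned storage (analogous to PathValue) is safe to keep.
        std::string rememberedCopy{view};

        buffer.assign("x.y.z");            // producer reuses the buffer

        std::cout << rememberedView << "\n";  // now shows "x.y.z" (and could dangle
                                              // if 'buffer' had reallocated)
        std::cout << rememberedCopy << "\n";  // still the original path, "a.b.c"
        return 0;
    }
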
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index bd12a9add26..f7bba190c2e 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/curop.h"
+#include "mongo/db/index/bulk_builder_common.h"
#include "mongo/db/index/index_build_interceptor.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/jsobj.h"
@@ -665,7 +666,8 @@ void IndexAccessMethod::BulkBuilder::yield(OperationContext* opCtx,
yieldable->restore();
}
-class SortedDataIndexAccessMethod::BulkBuilderImpl final : public IndexAccessMethod::BulkBuilder {
+class SortedDataIndexAccessMethod::BulkBuilderImpl final
+ : public BulkBuilderCommon<SortedDataIndexAccessMethod::BulkBuilderImpl> {
public:
using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>;
@@ -687,19 +689,30 @@ public:
const std::function<void()>& saveCursorBeforeWrite,
const std::function<void()>& restoreCursorAfterWrite) final;
- Status commit(OperationContext* opCtx,
- const CollectionPtr& collection,
- bool dupsAllowed,
- int32_t yieldIterations,
- const KeyHandlerFn& onDuplicateKeyInserted,
- const RecordIdHandlerFn& onDuplicateRecord) final;
-
const MultikeyPaths& getMultikeyPaths() const final;
bool isMultikey() const final;
IndexStateInfo persistDataForShutdown() final;
+ std::unique_ptr<Sorter::Iterator> finalizeSort();
+
+ std::unique_ptr<SortedDataBuilderInterface> setUpBulkInserter(OperationContext* opCtx,
+ bool dupsAllowed);
+
+ void debugEnsureSorted(const Sorter::Data& data);
+
+ bool duplicateCheck(OperationContext* opCtx,
+ const Sorter::Data& data,
+ bool dupsAllowed,
+ const RecordIdHandlerFn& onDuplicateRecord);
+
+ void insertKey(std::unique_ptr<SortedDataBuilderInterface>& inserter, const Sorter::Data& data);
+
+ Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted,
+ const Sorter::Data& data,
+ bool isDup);
+
private:
void _insertMultikeyMetadataKeysIntoSorter();
@@ -713,7 +726,8 @@ private:
SortedDataIndexAccessMethod* _iam;
std::unique_ptr<Sorter> _sorter;
- int64_t _keysInserted = 0;
+
+ KeyString::Value _previousKey;
// Set to true if any document added to the BulkBuilder causes the index to become multikey.
bool _isMultiKey = false;
@@ -740,7 +754,11 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> SortedDataIndexAccessMethod::ini
SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAccessMethod* iam,
size_t maxMemoryUsageBytes,
StringData dbName)
- : _iam(iam), _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) {
+ : BulkBuilderCommon(0,
+ "Index Build: inserting keys from external sorter into index",
+ iam->_descriptor->indexName()),
+ _iam(iam),
+ _sorter(_makeSorter(maxMemoryUsageBytes, dbName)) {
countNewBuildInStats();
}
@@ -748,10 +766,12 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(SortedDataIndexAcc
size_t maxMemoryUsageBytes,
const IndexStateInfo& stateInfo,
StringData dbName)
- : _iam(iam),
+ : BulkBuilderCommon(stateInfo.getNumKeys().value_or(0),
+ "Index Build: inserting keys from external sorter into index",
+ iam->_descriptor->indexName()),
+ _iam(iam),
_sorter(
_makeSorter(maxMemoryUsageBytes, dbName, stateInfo.getFileName(), stateInfo.getRanges())),
- _keysInserted(stateInfo.getNumKeys().value_or(0)),
_isMultiKey(stateInfo.getIsMultikey()),
_indexMultikeyPaths(createMultikeyPaths(stateInfo.getMultikeyPaths())) {
countResumedBuildInStats();
@@ -888,135 +908,65 @@ SortedDataIndexAccessMethod::BulkBuilderImpl::_makeSorter(
_makeSorterSettings());
}
-Status SortedDataIndexAccessMethod::BulkBuilderImpl::commit(
- OperationContext* opCtx,
- const CollectionPtr& collection,
- bool dupsAllowed,
- int32_t yieldIterations,
- const KeyHandlerFn& onDuplicateKeyInserted,
- const RecordIdHandlerFn& onDuplicateRecord) {
-
- Timer timer;
-
- const auto descriptor = _iam->_descriptor;
- auto ns = _iam->_indexCatalogEntry->getNSSFromCatalog(opCtx);
-
+std::unique_ptr<mongo::Sorter<KeyString::Value, mongo::NullValue>::Iterator>
+SortedDataIndexAccessMethod::BulkBuilderImpl::finalizeSort() {
_insertMultikeyMetadataKeysIntoSorter();
- std::unique_ptr<Sorter::Iterator> it(_sorter->done());
-
- static constexpr char message[] = "Index Build: inserting keys from external sorter into index";
- ProgressMeterHolder pm;
- {
- stdx::unique_lock<Client> lk(*opCtx->getClient());
- pm.set(
- CurOp::get(opCtx)->setProgress_inlock(message, _keysInserted, 3 /* secondsBetween */));
- }
-
- auto builder = _iam->getSortedDataInterface()->makeBulkBuilder(opCtx, dupsAllowed);
-
- KeyString::Value previousKey;
-
- for (int64_t i = 0; it->more(); i++) {
- opCtx->checkForInterrupt();
-
- auto failPointHang = [opCtx, i, &indexName = descriptor->indexName()](FailPoint* fp) {
- fp->executeIf(
- [fp, opCtx, i, &indexName](const BSONObj& data) {
- LOGV2(4924400,
- "Hanging index build during bulk load phase",
- "iteration"_attr = i,
- "index"_attr = indexName);
+ return std::unique_ptr<Sorter::Iterator>(_sorter->done());
+}
- fp->pauseWhileSet(opCtx);
- },
- [i, &indexName](const BSONObj& data) {
- auto indexNames = data.getObjectField("indexNames");
- return i == data["iteration"].numberLong() &&
- std::any_of(indexNames.begin(),
- indexNames.end(),
- [&indexName](const auto& elem) {
- return indexName == elem.String();
- });
- });
- };
- failPointHang(&hangIndexBuildDuringBulkLoadPhase);
- failPointHang(&hangIndexBuildDuringBulkLoadPhaseSecond);
-
- // Get the next datum and add it to the builder.
- Sorter::Data data = it->next();
-
- // Assert that keys are retrieved from the sorter in non-decreasing order, but only in debug
- // builds since this check can be expensive.
- int cmpData;
- if (descriptor->unique()) {
- cmpData = (_iam->getSortedDataInterface()->rsKeyFormat() == KeyFormat::Long)
- ? data.first.compareWithoutRecordIdLong(previousKey)
- : data.first.compareWithoutRecordIdStr(previousKey);
- }
+std::unique_ptr<SortedDataBuilderInterface>
+SortedDataIndexAccessMethod::BulkBuilderImpl::setUpBulkInserter(OperationContext* opCtx,
+ bool dupsAllowed) {
+ _ns = _iam->_indexCatalogEntry->getNSSFromCatalog(opCtx);
+ return _iam->getSortedDataInterface()->makeBulkBuilder(opCtx, dupsAllowed);
+}
- if (kDebugBuild && data.first.compare(previousKey) < 0) {
- LOGV2_FATAL_NOTRACE(
- 31171,
- "Expected the next key to be greater than or equal to the previous key",
- "nextKey"_attr = data.first.toString(),
- "previousKey"_attr = previousKey.toString(),
- "index"_attr = descriptor->indexName());
- }
- // Before attempting to insert, perform a duplicate key check.
- bool isDup = (descriptor->unique()) ? (cmpData == 0) : false;
- if (isDup && !dupsAllowed) {
- Status status = _iam->_handleDuplicateKey(opCtx, data.first, onDuplicateRecord);
- if (!status.isOK()) {
- return status;
- }
- continue;
- }
+void SortedDataIndexAccessMethod::BulkBuilderImpl::debugEnsureSorted(const Sorter::Data& data) {
+ if (data.first.compare(_previousKey) < 0) {
+ LOGV2_FATAL_NOTRACE(31171,
+ "Expected the next key to be greater than or equal to the previous key",
+ "nextKey"_attr = data.first.toString(),
+ "previousKey"_attr = _previousKey.toString(),
+ "index"_attr = _indexName);
+ }
+}
- Status status = writeConflictRetry(opCtx, "addingKey", ns.ns(), [&] {
- WriteUnitOfWork wunit(opCtx);
- Status status = builder->addKey(data.first);
- if (!status.isOK()) {
- return status;
- }
+bool SortedDataIndexAccessMethod::BulkBuilderImpl::duplicateCheck(
+ OperationContext* opCtx,
+ const Sorter::Data& data,
+ bool dupsAllowed,
+ const RecordIdHandlerFn& onDuplicateRecord) {
- wunit.commit();
- return Status::OK();
- });
+ auto descriptor = _iam->_descriptor;
- if (!status.isOK()) {
- // Duplicates are checked before inserting.
- invariant(status.code() != ErrorCodes::DuplicateKey);
- return status;
- }
+ bool isDup = false;
+ if (descriptor->unique()) {
+ int cmpData = (_iam->getSortedDataInterface()->rsKeyFormat() == KeyFormat::Long)
+ ? data.first.compareWithoutRecordIdLong(_previousKey)
+ : data.first.compareWithoutRecordIdStr(_previousKey);
+ isDup = (cmpData == 0);
+ }
- previousKey = data.first;
+ // Before attempting to insert, perform a duplicate key check.
+ if (isDup && !dupsAllowed) {
+ uassertStatusOK(_iam->_handleDuplicateKey(opCtx, data.first, onDuplicateRecord));
+ }
+ return isDup;
+}
- if (isDup) {
- status = onDuplicateKeyInserted(data.first);
- if (!status.isOK())
- return status;
- }
+void SortedDataIndexAccessMethod::BulkBuilderImpl::insertKey(
+ std::unique_ptr<SortedDataBuilderInterface>& inserter, const Sorter::Data& data) {
+ uassertStatusOK(inserter->addKey(data.first));
+}
- // Starts yielding locks after the first non-zero 'yieldIterations' inserts.
- if (yieldIterations && (i + 1) % yieldIterations == 0) {
- yield(opCtx, &collection, ns);
- }
+Status SortedDataIndexAccessMethod::BulkBuilderImpl::keyCommitted(
+ const KeyHandlerFn& onDuplicateKeyInserted, const Sorter::Data& data, bool isDup) {
+ _previousKey = data.first;
- // If we're here either it's a dup and we're cool with it or the addKey went just fine.
- pm.hit();
+ if (isDup) {
+ return onDuplicateKeyInserted(data.first);
}
-
- pm.finished();
-
- LOGV2(20685,
- "Index build: inserted {bulk_getKeysInserted} keys from external sorter into index in "
- "{timer_seconds} seconds",
- "Index build: inserted keys from external sorter into index",
- logAttrs(ns),
- "index"_attr = descriptor->indexName(),
- "keysInserted"_attr = _keysInserted,
- "duration"_attr = Milliseconds(Seconds(timer.seconds())));
return Status::OK();
}
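
A final note on the sorted-data path: duplicateCheck above only compares the incoming key against _previousKey, which is sufficient because the external sorter returns keys in sorted order, so equal keys are always adjacent. A standalone sketch of that adjacent-comparison idea over plain (key, recordId) pairs (illustrative types only, not the server's KeyString machinery):

    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
        // Already sorted by key, as the external sorter would hand them back.
        std::vector<std::pair<std::string, long long>> sorted = {
            {"apple", 1}, {"banana", 2}, {"banana", 7}, {"cherry", 3}};

        // Because the input is sorted, any duplicate key is adjacent to its twin,
        // so comparing against the immediately preceding key is enough.
        const std::string* previousKey = nullptr;
        for (const auto& [key, recordId] : sorted) {
            bool isDup = previousKey && key == *previousKey;
            if (isDup) {
                std::cout << "duplicate key '" << key << "' at record " << recordId << "\n";
            }
            previousKey = &key;
        }
        return 0;
    }
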